Commit 6523c96a authored by zhengyaoqiu's avatar zhengyaoqiu

feat(colorway): 数据库 缓存模块

parent 136f12ab
package cache
import (
"context"
"encoding/json"
model "git.chillcy.com/golang/colorway/internal/pkg/database/mongo/model/base"
"go.mongodb.org/mongo-driver/bson"
primitive "go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"log"
"sync"
)
type Cache[T any] interface {
Filter(data ChangeStreamData) bson.M
DocumentEqual(a, b T) bool
GetAllDocuments() []T
}
type Base[T any] struct {
baseModel model.Model[T]
documents []T
rwMutex *sync.RWMutex
}
func NewBase[T any](baseModel model.Model[T]) *Base[T] {
return &Base[T]{baseModel: baseModel}
}
func (receiver *Base[T]) StartCache(ctx context.Context, cache Cache[T]) error {
documents, _, err := receiver.baseModel.Find(ctx, bson.M{})
if err != nil {
return err
}
receiver.setAllDocuments(documents)
go func() {
for {
changeStream, err := receiver.baseModel.Watch(ctx, mongo.Pipeline{})
if err != nil {
log.Println(err)
continue
}
for changeStream.Next(context.TODO()) {
var changeStreamData ChangeStreamData
changeBody := changeStream.Current.String()
err = json.Unmarshal([]byte(changeBody), &changeStreamData)
if err != nil {
log.Println(err)
continue
}
filter := cache.Filter(changeStreamData)
document, err := receiver.baseModel.FindOne(ctx, filter)
if err != nil {
log.Println(err)
continue
}
documents := receiver.GetAllDocuments()
for i := range documents {
if cache.DocumentEqual(documents[i], document) {
documents[i] = document
}
}
receiver.setAllDocuments(documents)
}
err = changeStream.Close(context.TODO())
if err != nil {
log.Println(err)
continue
}
}
}()
return nil
}
func (receiver *Base[T]) GetAllDocuments() []T {
receiver.rwMutex.RLock()
defer receiver.rwMutex.RUnlock()
tmp := make([]T, len(receiver.documents))
copy(tmp, receiver.documents)
return tmp
}
func (receiver *Base[T]) setAllDocuments(documents []T) {
receiver.rwMutex.Lock()
defer receiver.rwMutex.Unlock()
receiver.documents = documents
}
func (receiver *Base[T]) Filter(data ChangeStreamData) bson.M {
id, err := primitive.ObjectIDFromHex(data.DocumentKey.Id.Oid)
if err != nil {
return nil
}
return bson.M{"_id": id}
}
type ChangeStreamData struct {
DocumentKey struct {
Id struct {
Oid string `json:"$oid"`
} `json:"_id"`
} `json:"documentKey"`
}
...@@ -14,6 +14,7 @@ type Model[T any] interface { ...@@ -14,6 +14,7 @@ type Model[T any] interface {
UpdateOne(ctx context.Context, filter bson.M, update bson.D, updateOptions ...*options.UpdateOptions) (*mongo.UpdateResult, error) UpdateOne(ctx context.Context, filter bson.M, update bson.D, updateOptions ...*options.UpdateOptions) (*mongo.UpdateResult, error)
Traverse(ctx context.Context, filter bson.M, f func(document T) (bool, error), findOptions ...*options.FindOptions) error Traverse(ctx context.Context, filter bson.M, f func(document T) (bool, error), findOptions ...*options.FindOptions) error
BulkWrite(ctx context.Context, writeModels []mongo.WriteModel, opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) BulkWrite(ctx context.Context, writeModels []mongo.WriteModel, opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error)
Watch(ctx context.Context, pipeline any, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error)
} }
type Base[T any] struct { type Base[T any] struct {
...@@ -70,3 +71,7 @@ func (receiver *Base[T]) Traverse(ctx context.Context, filter bson.M, f func(doc ...@@ -70,3 +71,7 @@ func (receiver *Base[T]) Traverse(ctx context.Context, filter bson.M, f func(doc
func (receiver *Base[T]) BulkWrite(ctx context.Context, writeModels []mongo.WriteModel, opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) { func (receiver *Base[T]) BulkWrite(ctx context.Context, writeModels []mongo.WriteModel, opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) {
return receiver.model.BulkWrite(ctx, writeModels, opts...) return receiver.model.BulkWrite(ctx, writeModels, opts...)
} }
func (receiver *Base[T]) Watch(ctx context.Context, pipeline any, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) {
return receiver.model.Watch(ctx, pipeline, opts...)
}
package model package model
import "github.com/zeromicro/go-zero/core/stores/mon" import (
"git.chillcy.com/golang/colorway/internal/pkg/database/mongo/cache"
"github.com/zeromicro/go-zero/core/stores/mon"
)
var _ SitePlatformPriceConfigModel = (*customSitePlatformPriceConfigModel)(nil) var _ SitePlatformPriceConfigModel = (*customSitePlatformPriceConfigModel)(nil)
...@@ -9,17 +12,25 @@ type ( ...@@ -9,17 +12,25 @@ type (
// and implement the added methods in customSitePlatformPriceConfigModel. // and implement the added methods in customSitePlatformPriceConfigModel.
SitePlatformPriceConfigModel interface { SitePlatformPriceConfigModel interface {
sitePlatformPriceConfigModel sitePlatformPriceConfigModel
cache.Cache[SitePlatformPriceConfig]
} }
customSitePlatformPriceConfigModel struct { customSitePlatformPriceConfigModel struct {
*defaultSitePlatformPriceConfigModel *defaultSitePlatformPriceConfigModel
*cache.Base[SitePlatformPriceConfig]
} }
) )
// NewSitePlatformPriceConfigModel returns a model for the mongo. // NewSitePlatformPriceConfigModel returns a model for the mongo.
func NewSitePlatformPriceConfigModel(url, db, collection string) SitePlatformPriceConfigModel { func NewSitePlatformPriceConfigModel(url, db, collection string) SitePlatformPriceConfigModel {
conn := mon.MustNewModel(url, db, collection) conn := mon.MustNewModel(url, db, collection)
defaultModel := newDefaultSitePlatformPriceConfigModel(conn)
return &customSitePlatformPriceConfigModel{ return &customSitePlatformPriceConfigModel{
defaultSitePlatformPriceConfigModel: newDefaultSitePlatformPriceConfigModel(conn), defaultSitePlatformPriceConfigModel: defaultModel,
Base: cache.NewBase[SitePlatformPriceConfig](defaultModel),
} }
} }
func (receiver *customSitePlatformPriceConfigModel) DocumentEqual(a, b SitePlatformPriceConfig) bool {
return a.ID.Hex() == b.ID.Hex()
}
...@@ -12,12 +12,12 @@ type sitePlatformPriceConfigModel interface { ...@@ -12,12 +12,12 @@ type sitePlatformPriceConfigModel interface {
type defaultSitePlatformPriceConfigModel struct { type defaultSitePlatformPriceConfigModel struct {
conn *mon.Model conn *mon.Model
model.Model[SitePlatformPriceConfig] *model.Base[SitePlatformPriceConfig]
} }
func newDefaultSitePlatformPriceConfigModel(conn *mon.Model) *defaultSitePlatformPriceConfigModel { func newDefaultSitePlatformPriceConfigModel(conn *mon.Model) *defaultSitePlatformPriceConfigModel {
return &defaultSitePlatformPriceConfigModel{ return &defaultSitePlatformPriceConfigModel{
conn: conn, conn: conn,
Model: model.NewBase[SitePlatformPriceConfig](conn), Base: model.NewBase[SitePlatformPriceConfig](conn),
} }
} }
package model package model
import ( import (
"git.chillcy.com/golang/colorway/internal/pkg/platform" "git.chillcy.com/golang/chillcy/pkg/currency"
"git.chillcy.com/golang/colorway/internal/pkg/store" "git.chillcy.com/golang/colorway/internal/pkg/database/mongo/types"
"time" "time"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
...@@ -11,14 +11,49 @@ import ( ...@@ -11,14 +11,49 @@ import (
type SitePlatformPriceConfig struct { type SitePlatformPriceConfig struct {
ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"` ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"`
Site string `json:"site" bson:"site"` Site string `json:"site" bson:"site"`
Seller Seller `json:"seller" bson:"seller"` // 销售平台 Seller types.Seller `json:"seller" bson:"seller"` // 销售平台
Enable bool `json:"enable" bson:"enable"` // 是否启用 Enable bool `json:"enable" bson:"enable"` // 是否启用
embed.RoutePriceConfig `bson:",inline"` // 路线价格配置 RoutePriceConfig `bson:",inline"` // 路线价格配置
UpdateAt time.Time `bson:"updateAt,omitempty" json:"updateAt,omitempty"` UpdateAt time.Time `bson:"updateAt,omitempty" json:"updateAt,omitempty"`
CreateAt time.Time `bson:"createAt,omitempty" json:"createAt,omitempty"` CreateAt time.Time `bson:"createAt,omitempty" json:"createAt,omitempty"`
} }
type Seller struct { type RoutePriceConfig struct {
Platform platform.Platform `json:"platform" bson:"platform"` // 销售平台 Route string `json:"route" bson:"route"` // 路线
Store store.Store `json:"store" bson:"store"` // 销售平台具体店铺 Currency currency.Currency `json:"currency" bson:"currency"` // 结算币种
TaxRate float64 `json:"taxRate" bson:"taxRate"` // 税率
MarginConfig MarginConfig `json:"marginConfig" bson:"marginConfig"` // 加价率配置
FreightConfig FreightConfig `json:"freightConfig" bson:"freightConfig"` // 商家运费配置,默认是 0
ExpressConfig ExpressConfig `json:"expressConfig" bson:"expressConfig"` // 跨境快递配置,默认是0
}
type ExpressConfig struct {
Express float64 `json:"express" bson:"express"` // 跨境快递费
ExpressStyle ExpressStyle `json:"expressStyle" bson:"expressStyle"` // 跨境快递费计算方式
}
type ExpressStyle string
type FreightConfig struct {
Freight float64 `json:"freight" bson:"freight"` // 商家运费
FreightStyle FreightStyle `json:"freightStyle" bson:"freightStyle" ` // 商家运费计算方式
OverFreight int `json:"overFreight" bson:"overFreight"` // 计算方式是满免的时候会用到
OverFreightCurrency currency.Currency `json:"overFreightCurrency" bson:"overFreightCurrency"` // 计算方式是满免的时候会用到
}
type FreightStyle string
type MarginConfig struct {
MarginStyle MarginStyle `json:"marginStyle" bson:"marginStyle"` // 加价方式
Margin float64 `json:"margin" bson:"margin"` // 加价率,加价方式为固定加价时使用
Ladders []Ladder `json:"ladders" bson:"ladders"` // 阶梯加价,加价方式为阶梯加价时使用
}
type MarginStyle string
type Ladder struct {
Min float64 `json:"min" bson:"min"` // 最小值
Max float64 `json:"max" bson:"max"` // 最大值
Margin float64 `json:"margin" bson:"margin"` // 加价率
LastOne bool `bson:"-"`
} }
...@@ -14,12 +14,12 @@ type {{.lowerType}}Model interface{ ...@@ -14,12 +14,12 @@ type {{.lowerType}}Model interface{
type default{{.Type}}Model struct { type default{{.Type}}Model struct {
conn {{if .Cache}}*monc.Model{{else}}*mon.Model{{end}} conn {{if .Cache}}*monc.Model{{else}}*mon.Model{{end}}
model.Model[{{.Type}}] model.Base[{{.Type}}]
} }
func newDefault{{.Type}}Model(conn {{if .Cache}}*monc.Model{{else}}*mon.Model{{end}}) *default{{.Type}}Model { func newDefault{{.Type}}Model(conn {{if .Cache}}*monc.Model{{else}}*mon.Model{{end}}) *default{{.Type}}Model {
return &default{{.Type}}Model{ return &default{{.Type}}Model{
conn: conn, conn: conn,
Model: model.NewBase[{{.Type}}](conn), Base: model.NewBase[{{.Type}}](conn),
} }
} }
\ No newline at end of file
package types
import (
"git.chillcy.com/golang/colorway/internal/pkg/platform"
"git.chillcy.com/golang/colorway/internal/pkg/store"
)
type Seller struct {
Platform platform.Platform `json:"platform" bson:"platform"` // 销售平台
Store store.Store `json:"store" bson:"store"` // 销售平台具体店铺
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment