Golang微服務(wù)(三)
分布式鎖
gRPC service層demo router handler如下:
/*
>>>model
type Inventory struct {
BaseModel
Goods int32 `gorm:"type:int;index"`
Stocks int32 `gorm:"type:int"`
Version int32 `gorm:"type:int"`
}
>>>proto源
syntax = "proto3";
import "google/protobuf/empty.proto";
option go_package = ".;proto";
service Inventory {
...
rpc Sell(SellInfo) returns (google.protobuf.Empty);
...
}
message GoodsInv {
int32 goodsID = 1;
int32 num = 2;
}
message SellInfo {
repeated GoodsInv goodsInfo = 1;
}
>>>proto生成的結(jié)構(gòu)
type SellInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
GoodsInfo []*GoodsInv `protobuf:"bytes,1,rep,name=goodsInfo,proto3" json:"goodsInfo,omitempty"`
}
type GoodsInv struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
GoodsID int32 `protobuf:"varint,1,opt,name=goodsID,proto3" json:"goodsID,omitempty"`
Num int32 `protobuf:"varint,2,opt,name=num,proto3" json:"num,omitempty"`
}
*/
func (is *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
tx := global.DB.Begin()
for _, good := range req.GoodsInfo {
var inv model.Inventory
if result := global.DB.Where("goods=?", good.GoodsID).First(&inv); result.RowsAffected < 1 {
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("參數(shù)錯(cuò)誤商品:%d不存在", good.GoodsID))
}
if inv.Stocks < good.Num {
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("商品:%d庫(kù)存不足", good.GoodsID))
}
inv.Stocks -= good.Num
tx.Save(&inv)
}
tx.Commit()
return &emptypb.Empty{}, nil
}
互斥鎖
當(dāng)并發(fā)請(qǐng)求Sell時(shí),可能會(huì)出現(xiàn)庫(kù)存扣減總量與請(qǐng)求扣減總量不一致的情況,這種并發(fā)請(qǐng)求帶來(lái)的問(wèn)題無(wú)法通過(guò)數(shù)據(jù)庫(kù)事務(wù)來(lái)解決,而需要靠鎖,將讀寫過(guò)程串行:
var m sync.Mutex
func (is *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
tx := global.DB.Begin()
m.Lock()
...
m.Lock()
return &emptypb.Empty{}, nil
}
但這樣有個(gè)問(wèn)題:此接口在按照訂單處理預(yù)扣庫(kù)存,即多種商品讀寫在沒(méi)必要得全部串行(只將同商品的數(shù)據(jù)修改串行即可確保數(shù)據(jù)安全)
悲觀鎖
mysql的for update語(yǔ)句會(huì)有一些特性:在索引列for update會(huì)給滿足條件的記錄做行鎖,在非索引列or update時(shí)會(huì)升級(jí)為表鎖,但是只針對(duì)更新語(yǔ)句,如果沒(méi)有符合條件的語(yǔ)句,則不會(huì)鎖表。commit后釋放鎖。
所以可以修改為如下悲觀鎖:
func (is *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
tx := global.DB.Begin()
for _, good := range req.GoodsInfo {
var inv model.Inventory
// 只需在此處使用gorm的Clauses
if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("goods=?", good.GoodsID).First(&inv); result.RowsAffected < 1 {
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("參數(shù)錯(cuò)誤商品:%d不存在", good.GoodsID))
}
if inv.Stocks < good.Num {
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("商品:%d庫(kù)存不足", good.GoodsID))
}
inv.Stocks -= good.Num
tx.Save(&inv)
}
tx.Commit()
return &emptypb.Empty{}, nil
}
悲觀鎖并不完全反對(duì)并發(fā),很多情況下只是行鎖,對(duì)于正常的select語(yǔ)句也不會(huì)造成影響。但悲觀鎖的性能確實(shí)不盡如人意。
樂(lè)觀鎖
樂(lè)觀鎖本質(zhì)上是保證數(shù)據(jù)一致性的一種解決方案,優(yōu)點(diǎn)是在沒(méi)有讓數(shù)據(jù)庫(kù)加鎖的前提下避免了數(shù)據(jù)不一致的問(wèn)題:通過(guò)在記錄中增加版本號(hào)字段,在并發(fā)讀取了同一條原紀(jì)錄且嘗試同時(shí)將新的數(shù)據(jù)保存至原紀(jì)錄時(shí)確保只有一次保存成功并保證其他保存動(dòng)作全部失敗。
將以上代碼修改為樂(lè)觀鎖:
func (is *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
tx := global.DB.Begin()
for _, good := range req.GoodsInfo {
var inv model.Inventory
for {
if result := global.DB.Where("goods=?", good.GoodsID).First(&inv); result.RowsAffected < 1 {
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("參數(shù)錯(cuò)誤商品:%d不存在", good.GoodsID))
}
if inv.Stocks < good.Num {
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("商品:%d庫(kù)存不足", good.GoodsID))
}
if result := tx.Model(&model.Inventory{}).Select("Stocks", "Version").Where("goods = ? AND version = ?", inv.Goods, inv.Version).Updates(model.Inventory{
Stocks: inv.Stocks - good.Num,
Version: inv.Version + 1,
}); result.RowsAffected > 0 {
break
}
}
}
tx.Commit()
return &emptypb.Empty{}, nil
}
基于Redis的分布式鎖
通過(guò)對(duì)redis指定key的查詢,不同服務(wù)可以共享同一把鎖,另外redis可以提供包括setnx在內(nèi)的一些命令來(lái)實(shí)現(xiàn)指定key的get&set的原子操作,用來(lái)完成鎖的查詢、獲取、釋放等。
保證互斥性:原子操作
防死鎖常用操作邏輯鏈:防死鎖-->設(shè)置超時(shí)-->防止超時(shí)影響正常業(yè)務(wù)邏輯完整執(zhí)行-->設(shè)置延時(shí)-->防止某種服務(wù)卡住導(dǎo)致無(wú)限申請(qǐng)延時(shí)
安全性:value值與goroutine綁定(genValueFunc: genValue),只有持有鎖的goroutine可以刪除key-value
集群?jiǎn)栴}:redlock,m.actOnPoolsAsync,不分主從的redis集群,通過(guò)獲取過(guò)半redis實(shí)例的鎖來(lái)確定當(dāng)前goroutine在所有redis實(shí)例上的持有,未能獲取過(guò)半實(shí)例的goroutine釋放已經(jīng)獲取的實(shí)例上的鎖并進(jìn)入輪詢拿鎖。
相對(duì)于樂(lè)觀鎖,分布式鎖工作的條件更苛刻。
func (is *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
rs := redsync.New(global.Pool)
tx := global.DB.Begin()
for _, good := range req.GoodsInfo {
var inv model.Inventory
mutex := rs.NewMutex(fmt.Sprintf("goods_%d", good.GoodsID), redsync.WithExpiry(6*time.Second))
if err := mutex.Lock(); err != nil {
return nil, status.Errorf(codes.Internal, "獲取分布式鎖異常")
}
if result := global.DB.Where(&model.Inventory{Goods: good.GoodsID}).First(&inv); result.RowsAffected < 1 {
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("參數(shù)錯(cuò)誤商品:%d不存在", good.GoodsID))
}
if inv.Stocks < good.Num {
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("商品:%d庫(kù)存不足", good.GoodsID))
}
inv.Stocks -= good.Num
tx.Save(&inv)
if ok, err := mutex.Unlock(); !ok || err != nil {
return nil, status.Errorf(codes.Internal, "釋放分布式鎖異常")
}
}
tx.Commit()
return &emptypb.Empty{}, nil
}
浙公網(wǎng)安備 33010602011771號(hào)