From ddecfdea12a2172250ac82901678b171864149c5 Mon Sep 17 00:00:00 2001 From: flswld Date: Sun, 12 Feb 2023 23:47:44 +0800 Subject: [PATCH] =?UTF-8?q?1.=E7=A6=BB=E7=BA=BF=E7=8E=A9=E5=AE=B6=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=8A=A0=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81=E6=93=8D?= =?UTF-8?q?=E4=BD=9C=202.=E4=BF=AE=E5=A4=8D=E8=81=8A=E5=A4=A9=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=E9=94=99=E4=B9=B1=203.=E4=BF=AE=E5=A4=8D=E9=87=8D?= =?UTF-8?q?=E5=90=AF=E6=9C=8D=E5=8A=A1=E5=99=A8=E5=90=8E=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E7=99=BB=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/mq/nats.go | 1 + gate/net/kcp_connect_manager.go | 4 +-- gate/net/session.go | 8 ++++- gdconf/game_data_config.go | 2 +- gdconf/scene_detail.go | 2 +- gs/dao/player_mongo.go | 21 ++--------- gs/dao/player_redis.go | 63 +++++++++++++++++++++++++++++++++ gs/game/game_manager.go | 10 +++++- gs/game/player_social.go | 4 +-- gs/game/user_manager.go | 24 +++++++++---- gs/game/world_manager.go | 4 +-- 11 files changed, 108 insertions(+), 35 deletions(-) diff --git a/common/mq/nats.go b/common/mq/nats.go index 95728fd6..b443c05a 100644 --- a/common/mq/nats.go +++ b/common/mq/nats.go @@ -341,6 +341,7 @@ func (m *MessageQueue) gateTcpMqConn(gateServerConnAddrMap map[string]bool) { _, exist := gateServerConnAddrMap[gateServerAddr] // GATE连接已存在 if exist { + logger.Info("gate tcp mq conn already exist addr: %v", gateServerAddr) continue } addr, err := net.ResolveTCPAddr("tcp4", gateServerAddr) diff --git a/gate/net/kcp_connect_manager.go b/gate/net/kcp_connect_manager.go index d3d70281..527be454 100644 --- a/gate/net/kcp_connect_manager.go +++ b/gate/net/kcp_connect_manager.go @@ -23,12 +23,12 @@ import ( const ( ConnSynPacketFreqLimit = 100 // 连接建立握手包每秒发包频率限制 - RecvPacketFreqLimit = 100 // 客户端上行每秒发包频率限制 + RecvPacketFreqLimit = 200 // 客户端上行每秒发包频率限制 SendPacketFreqLimit = 1000 // 服务器下行每秒发包频率限制 PacketMaxLen = 343 * 1024 // 最大应用层包长度 ConnRecvTimeout = 30 // 收包超时时间 秒 ConnSendTimeout = 10 // 发包超时时间 秒 - MaxClientConnNumLimit = 2 // 最大客户端连接数限制 + MaxClientConnNumLimit = 1000 // 最大客户端连接数限制 ) var CLIENT_CONN_NUM int32 = 0 // 当前客户端连接数 diff --git a/gate/net/session.go b/gate/net/session.go index 43274293..c2e39fef 100644 --- a/gate/net/session.go +++ b/gate/net/session.go @@ -295,7 +295,9 @@ func (k *KcpConnectManager) sendMsgHandle() { GameMsg: gameMsg, }) case mq.ServerUserOnlineStateChangeNotify: - // 收到GS玩家离线完成通知 唤醒存在的顶号登录流程 + // 收到GS玩家离线完成通知 + logger.Debug("global player online state change, uid: %v, online: %v, gs appid: %v", + serverMsg.UserId, serverMsg.IsOnline, netMsg.OriginServerAppId) if serverMsg.IsOnline { k.globalGsOnlineMapLock.Lock() k.globalGsOnlineMap[serverMsg.UserId] = netMsg.OriginServerAppId @@ -308,6 +310,8 @@ func (k *KcpConnectManager) sendMsgHandle() { if !exist { continue } + // 唤醒存在的顶号登录流程 + logger.Info("awake interrupt login, uid: %v", serverMsg.UserId) kickFinishNotifyChan <- true } } @@ -400,6 +404,7 @@ func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session userId: uid, kickFinishNotifyChan: kickFinishNotifyChan, } + logger.Info("run local interrupt login wait, uid: %v", uid) <-kickFinishNotifyChan } k.globalGsOnlineMapLock.RLock() @@ -421,6 +426,7 @@ func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session userId: uid, kickFinishNotifyChan: kickFinishNotifyChan, } + logger.Info("run global interrupt login wait, uid: %v", uid) <-kickFinishNotifyChan } // 关联玩家uid和连接信息 diff --git a/gdconf/game_data_config.go b/gdconf/game_data_config.go index 3d3b8331..2aa63fbd 100644 --- a/gdconf/game_data_config.go +++ b/gdconf/game_data_config.go @@ -114,7 +114,7 @@ func (g *GameDataConfig) load() { g.loadSceneData() // 场景 g.loadScenePoint() // 场景传送点 g.loadSceneTagData() // 场景地图图标 - g.loadScene() // 场景详情 + g.loadSceneDetail() // 场景详情LUA配置数据 g.loadWorldAreaData() // 世界区域 g.loadGatherData() // 采集物 g.loadFetterData() // 角色资料解锁 diff --git a/gdconf/scene_detail.go b/gdconf/scene_detail.go index a8c949ef..08926a07 100644 --- a/gdconf/scene_detail.go +++ b/gdconf/scene_detail.go @@ -131,7 +131,7 @@ func (g *GameDataConfig) loadGroup(group *Group, block *Block, sceneId int32, bl block.groupMapLoadLock.Unlock() } -func (g *GameDataConfig) loadScene() { +func (g *GameDataConfig) loadSceneDetail() { g.SceneDetailMap = make(map[int32]*SceneDetail) sceneLuaPrefix := g.luaPrefix + "scene/" for _, sceneData := range g.SceneDataMap { diff --git a/gs/dao/player_mongo.go b/gs/dao/player_mongo.go index 14b86cc4..e5ac0e7b 100644 --- a/gs/dao/player_mongo.go +++ b/gs/dao/player_mongo.go @@ -249,22 +249,7 @@ func (d *Dao) QueryChatMsgListByUid(uid uint32) ([]*model.ChatMsg, error) { result := make([]*model.ChatMsg, 0) find, err := db.Find( context.TODO(), - bson.D{{"ToUid", uid}}, - ) - if err != nil { - return nil, err - } - for find.Next(context.TODO()) { - item := new(model.ChatMsg) - err = find.Decode(item) - if err != nil { - return nil, err - } - result = append(result, item) - } - find, err = db.Find( - context.TODO(), - bson.D{{"Uid", uid}}, + bson.D{{"$or", []bson.D{{{"ToUid", uid}}, {{"Uid", uid}}}}}, ) if err != nil { return nil, err @@ -282,7 +267,7 @@ func (d *Dao) QueryChatMsgListByUid(uid uint32) ([]*model.ChatMsg, error) { func (d *Dao) ReadAndUpdateChatMsgByUid(uid uint32, targetUid uint32) error { db := d.db.Collection("chat_msg") - _, err := db.UpdateOne( + _, err := db.UpdateMany( context.TODO(), bson.D{{"ToUid", uid}, {"Uid", targetUid}}, bson.D{{"$set", bson.D{{"IsRead", true}}}}, @@ -290,7 +275,7 @@ func (d *Dao) ReadAndUpdateChatMsgByUid(uid uint32, targetUid uint32) error { if err != nil { return err } - _, err = db.UpdateOne( + _, err = db.UpdateMany( context.TODO(), bson.D{{"Uid", uid}, {"ToUid", targetUid}}, bson.D{{"$set", bson.D{{"IsRead", true}}}}, diff --git a/gs/dao/player_redis.go b/gs/dao/player_redis.go index 194c8fef..1fa71a6d 100644 --- a/gs/dao/player_redis.go +++ b/gs/dao/player_redis.go @@ -14,12 +14,20 @@ import ( "github.com/vmihailenco/msgpack/v5" ) +// RedisPlayerKeyPrefix key前缀 const RedisPlayerKeyPrefix = "HK4E" +// GetRedisPlayerKey 获取玩家数据key func (d *Dao) GetRedisPlayerKey(userId uint32) string { return RedisPlayerKeyPrefix + ":USER:" + strconv.Itoa(int(userId)) } +// GetRedisPlayerLockKey 获取玩家分布式锁key +func (d *Dao) GetRedisPlayerLockKey(userId uint32) string { + return RedisPlayerKeyPrefix + ":USER_LOCK:" + strconv.Itoa(int(userId)) +} + +// GetRedisPlayer 获取玩家数据 func (d *Dao) GetRedisPlayer(userId uint32) *model.Player { startTime := time.Now().UnixNano() playerDataLz4, err := d.redis.Get(context.TODO(), d.GetRedisPlayerKey(userId)).Result() @@ -54,6 +62,7 @@ func (d *Dao) GetRedisPlayer(userId uint32) *model.Player { return player } +// SetRedisPlayer 写入玩家数据 func (d *Dao) SetRedisPlayer(player *model.Player) { playerData, err := msgpack.Marshal(player) if err != nil { @@ -91,9 +100,63 @@ func (d *Dao) SetRedisPlayer(player *model.Player) { logger.Debug("set player to redis cost time: %v ns", costTime) } +// SetRedisPlayerList 批量写入玩家数据 func (d *Dao) SetRedisPlayerList(playerList []*model.Player) { // TODO 换成redis批量命令执行 for _, player := range playerList { d.SetRedisPlayer(player) } } + +// 基于redis的玩家离线数据分布式锁实现 + +const ( + MaxLockAliveTime = 10000 // 单个锁的最大存活时间 毫秒 + LockRetryWaitTime = 50 // 同步加锁重试间隔时间 毫秒 + MaxLockRetryTimes = 2 // 同步加锁最大重试次数 +) + +// DistLock 加锁并返回是否成功 +func (d *Dao) DistLock(userId uint32) bool { + result, err := d.redis.SetNX(context.TODO(), + d.GetRedisPlayerLockKey(userId), + time.Now().UnixMilli(), + time.Millisecond*time.Duration(MaxLockAliveTime)).Result() + if err != nil { + logger.Error("redis lock setnx error: %v", err) + return false + } + return result +} + +// DistLockSync 加锁同步阻塞直到成功或超时 +func (d *Dao) DistLockSync(userId uint32) bool { + for i := 0; i < MaxLockRetryTimes; i++ { + result, err := d.redis.SetNX(context.TODO(), + d.GetRedisPlayerLockKey(userId), + time.Now().UnixMilli(), + time.Millisecond*time.Duration(MaxLockAliveTime)).Result() + if err != nil { + logger.Error("redis lock setnx error: %v", err) + return false + } + if result == true { + break + } + time.Sleep(time.Millisecond * time.Duration(LockRetryWaitTime)) + } + return true +} + +// DistUnlock 解锁 +func (d *Dao) DistUnlock(userId uint32) { + result, err := d.redis.Del(context.TODO(), d.GetRedisPlayerLockKey(userId)).Result() + if err != nil { + logger.Error("redis lock del error: %v", err) + return + } + if result == 0 { + logger.Error("redis lock del result is fail") + return + } +} diff --git a/gs/game/game_manager.go b/gs/game/game_manager.go index c58254db..3c763b42 100644 --- a/gs/game/game_manager.go +++ b/gs/game/game_manager.go @@ -263,10 +263,18 @@ func (g *GameManager) Close() { Msg: saveUserIdList, } <-EXIT_SAVE_FIN_CHAN - // 单纯的告诉网关下线玩家 + // 告诉网关下线玩家并全服广播玩家离线 userList := USER_MANAGER.GetAllOnlineUserList() for _, player := range userList { g.KickPlayer(player.PlayerID, kcp.EnetServerShutdown) + MESSAGE_QUEUE.SendToAll(&mq.NetMsg{ + MsgType: mq.MsgTypeServer, + EventId: mq.ServerUserOnlineStateChangeNotify, + ServerMsg: &mq.ServerMsg{ + UserId: player.PlayerID, + IsOnline: false, + }, + }) } time.Sleep(time.Second) } diff --git a/gs/game/player_social.go b/gs/game/player_social.go index 30e57cc8..53251b61 100644 --- a/gs/game/player_social.go +++ b/gs/game/player_social.go @@ -269,7 +269,7 @@ func (g *GameManager) AskAddFriendReq(player *model.Player, payloadMsg pb.Messag }) } else { // 全服离线玩家 - targetPlayer, _, _ := USER_MANAGER.LoadGlobalPlayer(targetUid) + targetPlayer := USER_MANAGER.LoadTempOfflineUser(targetUid, true) if targetPlayer == nil { logger.Error("apply add friend target player is nil, uid: %v", targetUid) return @@ -362,7 +362,7 @@ func (g *GameManager) DealAddFriendReq(player *model.Player, payloadMsg pb.Messa }) } else { // 全服离线玩家 - targetPlayer, _, _ := USER_MANAGER.LoadGlobalPlayer(targetUid) + targetPlayer := USER_MANAGER.LoadTempOfflineUser(targetUid, true) if targetPlayer == nil { logger.Error("apply add friend target player is nil, uid: %v", targetUid) return diff --git a/gs/game/user_manager.go b/gs/game/user_manager.go index 8cfeea57..01eb19ec 100644 --- a/gs/game/user_manager.go +++ b/gs/game/user_manager.go @@ -268,7 +268,7 @@ func (u *UserManager) SetRemoteUserOnlineState(userId uint32, isOnline bool, app } } -// GetRemoteOnlineUserList 获取指定数量的远程在线玩家 +// GetRemoteOnlineUserList 获取指定数量的远程在线玩家 玩家数据只读禁止修改 func (u *UserManager) GetRemoteOnlineUserList(total int) map[uint32]*model.Player { if total > 50 { return nil @@ -276,7 +276,7 @@ func (u *UserManager) GetRemoteOnlineUserList(total int) map[uint32]*model.Playe onlinePlayerMap := make(map[uint32]*model.Player) count := 0 for userId := range u.remotePlayerMap { - player := u.LoadTempOfflineUser(userId) + player := u.LoadTempOfflineUser(userId, false) if player == nil { continue } @@ -289,7 +289,7 @@ func (u *UserManager) GetRemoteOnlineUserList(total int) map[uint32]*model.Playe return onlinePlayerMap } -// LoadGlobalPlayer 加载并返回一个全服玩家及其在线状态 +// LoadGlobalPlayer 加载并返回一个全服玩家及其在线状态 玩家数据只读禁止修改 // 参见LoadTempOfflineUser说明 func (u *UserManager) LoadGlobalPlayer(userId uint32) (player *model.Player, online bool, remote bool) { online = u.GetUserOnlineState(userId) @@ -304,14 +304,14 @@ func (u *UserManager) LoadGlobalPlayer(userId uint32) (player *model.Player, onl if online { if remote { // 远程在线玩家 为了简化实现流程 直接加载数据库临时档 - player = u.LoadTempOfflineUser(userId) + player = u.LoadTempOfflineUser(userId, false) } else { // 本地在线玩家 player = u.GetOnlineUser(userId) } } else { // 全服离线玩家 - player = u.LoadTempOfflineUser(userId) + player = u.LoadTempOfflineUser(userId, false) } return player, online, remote } @@ -320,12 +320,20 @@ func (u *UserManager) LoadGlobalPlayer(userId uint32) (player *model.Player, onl // LoadTempOfflineUser 加载临时离线玩家 // 正常情况速度较快可以同步阻塞调用 -func (u *UserManager) LoadTempOfflineUser(userId uint32) *model.Player { +func (u *UserManager) LoadTempOfflineUser(userId uint32, lock bool) *model.Player { player := u.GetOnlineUser(userId) if player != nil && player.Online { logger.Error("not allow get a online player as offline player, uid: %v", userId) return nil } + if lock { + // 加离线玩家数据分布式锁 + ok := u.dao.DistLockSync(userId) + if !ok { + logger.Error("lock redis offline player data error, uid: %v", userId) + return nil + } + } player = u.LoadUserFromRedisSync(userId) if player == nil { // 玩家可能不存在于redis 尝试从db查询出来然后写入redis @@ -349,10 +357,12 @@ func (u *UserManager) LoadTempOfflineUser(userId uint32) *model.Player { } // SaveTempOfflineUser 保存临时离线玩家 -// 如果在调用LoadTempOfflineUser后修改了离线玩家数据 则必须立即调用此函数回写 +// 如果调用LoadTempOfflineUser获取了离线玩家数据 则必须在逻辑完成后立即调用此函数回写并解锁 func (u *UserManager) SaveTempOfflineUser(player *model.Player) { // 主协程同步写入redis u.SaveUserToRedisSync(player) + // 解离线玩家数据分布式锁 + u.dao.DistUnlock(player.PlayerID) // 另一个协程异步的写回db playerData, err := msgpack.Marshal(player) if err != nil { diff --git a/gs/game/world_manager.go b/gs/game/world_manager.go index 46ef4704..d5e0b68f 100644 --- a/gs/game/world_manager.go +++ b/gs/game/world_manager.go @@ -15,8 +15,8 @@ import ( const ( ENTITY_NUM_UNLIMIT = false // 是否不限制场景内实体数量 - ENTITY_MAX_SEND_NUM = 200 // 场景内最大实体数量 - MAX_MULTIPLAYER_WORLD_NUM = 2 // 本服务器最大多人世界数量 + ENTITY_MAX_SEND_NUM = 300 // 场景内最大实体数量 + MAX_MULTIPLAYER_WORLD_NUM = 10 // 本服务器最大多人世界数量 ) type WorldManager struct {