Files
hk4e/gs/game/game_local_event_manager.go
2023-04-04 18:27:08 +08:00

208 lines
6.2 KiB
Go

package game
import (
"sort"
"sync"
"time"
"hk4e/common/mq"
"hk4e/gdconf"
"hk4e/gs/model"
"hk4e/pkg/logger"
"github.com/vmihailenco/msgpack/v5"
)
// Local event queue manager.
// Local event IDs. LocalEventManager.LocalEventHandle switches on these
// values to decide how an event is processed.
const (
LoadLoginUserFromDbFinish = iota // callback: a logging-in player's data has finished loading from the database
RunUserCopyAndSave // run one pass that copies online players' in-memory data to the database-writer goroutine
ExitRunUserCopyAndSave // on server shutdown, run the save operation for all players
UserOfflineSaveToDbFinish // an offline player's save has finished
ReloadGameDataConfig // perform a hot reload of the game data config tables
ReloadGameDataConfigFinish // the hot reload of the config tables has finished
)
// UserCopyGoroutineLimit is the batch size used by LocalEventHandle when it
// serializes players: the player list is cut into batches of this many
// players and one goroutine is started per player in a batch. The last batch
// also takes the remainder, so it can be larger than this value.
const (
UserCopyGoroutineLimit = 4
)
// LocalEvent is a single event carried on the LocalEventManager channel.
type LocalEvent struct {
EventId int // one of the local event ID constants; selects the branch in LocalEventHandle
Msg any // event-specific payload; LocalEventHandle type-asserts it for the events that carry one
}
// LocalEventManager owns the channel on which local events are queued and
// provides the handler that processes them.
type LocalEventManager struct {
localEventChan chan *LocalEvent // queue of pending local events
}
// NewLocalEventManager builds a LocalEventManager whose event channel is
// buffered to hold up to 1000 pending events.
func NewLocalEventManager() (r *LocalEventManager) {
	return &LocalEventManager{
		localEventChan: make(chan *LocalEvent, 1000),
	}
}
// GetLocalEventChan returns the manager's event channel. The channel itself
// is returned, not a copy of its contents, so callers can both send events
// to it and receive events from it.
func (l *LocalEventManager) GetLocalEventChan() chan *LocalEvent {
	eventChan := l.localEventChan
	return eventChan
}
// PlayerLastSaveTimeSortList is a list of players that satisfies
// sort.Interface, ordering players by ascending LastSaveTime.
type PlayerLastSaveTimeSortList []*model.Player

// Len returns how many players the list holds.
func (p PlayerLastSaveTimeSortList) Len() int {
	count := len(p)
	return count
}

// Less reports whether the player at index i has a smaller LastSaveTime than
// the player at index j.
func (p PlayerLastSaveTimeSortList) Less(i, j int) bool {
	left, right := p[i], p[j]
	return right.LastSaveTime > left.LastSaveTime
}

// Swap exchanges the players at indexes i and j.
func (p PlayerLastSaveTimeSortList) Swap(i, j int) {
	held := p[i]
	p[i] = p[j]
	p[j] = held
}
// LocalEventHandle processes a single local event, choosing the behavior by
// localEvent.EventId:
//
//   - LoadLoginUserFromDbFinish: Msg is asserted to *PlayerLoginInfo and its
//     fields are passed on to GAME.OnLogin.
//   - RunUserCopyAndSave: online players are serialized with msgpack in
//     batches, smallest LastSaveTime first, and the resulting bytes are sent
//     to the save channel. No new batch is started once the pass has been
//     running for more than 10ms; the remaining players are left for a later
//     pass.
//   - ExitRunUserCopyAndSave: same as RunUserCopyAndSave, but the 10ms budget
//     is not applied so that every player is serialized, the message is
//     flagged with exitSave, and afterwards the calling goroutine is blocked
//     forever.
//   - UserOfflineSaveToDbFinish: Msg is asserted to *PlayerOfflineInfo; the
//     user is deleted from USER_MANAGER, an offline notification is sent to
//     all servers and, if the player is changing game server, the gate is
//     notified as well.
//   - ReloadGameDataConfig: reloads the game data config in a new goroutine
//     and then queues ReloadGameDataConfigFinish.
//   - ReloadGameDataConfigFinish: swaps in the reloaded config and reloads the
//     scene block AOI map.
//
// The Msg type assertions are unchecked and panic if the payload has a
// different type. Events with any other EventId are ignored.
func (l *LocalEventManager) LocalEventHandle(localEvent *LocalEvent) {
	switch localEvent.EventId {
	case LoadLoginUserFromDbFinish:
		playerLoginInfo := localEvent.Msg.(*PlayerLoginInfo)
		GAME.OnLogin(playerLoginInfo.UserId, playerLoginInfo.ClientSeq, playerLoginInfo.GateAppId, playerLoginInfo.Player, playerLoginInfo.JoinHostUserId)
	case ExitRunUserCopyAndSave, RunUserCopyAndSave:
		// Time budget for one periodic pass. It is deliberately not applied
		// to the exit save: that pass is the last chance to persist players,
		// so cutting it short would drop the data of everyone not reached.
		const copyTimeBudget = 10 * time.Millisecond
		isExitSave := localEvent.EventId == ExitRunUserCopyAndSave
		startTime := time.Now()
		playerList := make(PlayerLastSaveTimeSortList, 0)
		for _, player := range USER_MANAGER.GetAllOnlineUserList() {
			if player.PlayerID < PlayerBaseUid {
				continue
			}
			playerList = append(playerList, player)
		}
		// Smallest LastSaveTime first, so players skipped by an over-budget
		// pass are at the front of the next one.
		sort.Stable(playerList)
		// Take a serialized copy of the data to avoid concurrent access.
		insertPlayerList := make([][]byte, 0)
		updatePlayerList := make([][]byte, 0)
		saveCount := 0
		times := len(playerList) / UserCopyGoroutineLimit
		if times == 0 && len(playerList) > 0 {
			times = 1
		}
		for index := 0; index < times; index++ {
			totalCostTime := time.Since(startTime)
			if !isExitSave && totalCostTime > copyTimeBudget {
				// Total cost exceeded 10ms: abort this round of saving.
				logger.Info("user copy loop overtime exit, total cost time: %v ns", totalCostTime.Nanoseconds())
				break
			}
			// Serialize player data concurrently, one batch at a time. The
			// last batch runs to the end of the list and so also covers the
			// remainder of the division above.
			oncePlayerListEndIndex := len(playerList)
			if index < times-1 {
				oncePlayerListEndIndex = (index + 1) * UserCopyGoroutineLimit
			}
			oncePlayerList := playerList[index*UserCopyGoroutineLimit : oncePlayerListEndIndex]
			var playerDataMapLock sync.Mutex
			playerDataMap := make(map[uint32][]byte, len(oncePlayerList))
			var wg sync.WaitGroup
			for _, player := range oncePlayerList {
				wg.Add(1)
				go func(player *model.Player) {
					defer wg.Done()
					playerData, err := msgpack.Marshal(player)
					if err != nil {
						logger.Error("marshal player data error: %v", err)
						return
					}
					playerDataMapLock.Lock()
					playerDataMap[player.PlayerID] = playerData
					playerDataMapLock.Unlock()
				}(player)
			}
			wg.Wait()
			for _, player := range oncePlayerList {
				playerData, exist := playerDataMap[player.PlayerID]
				if !exist {
					// Marshalling failed for this player; already logged.
					continue
				}
				// NOTE(review): UnixMilli() does not fit in uint32, so the
				// LastSaveTime values below are truncated and wrap around;
				// the ordering used by the sort above can be wrong across a
				// wrap. Confirm whether LastSaveTime should be widened.
				switch player.DbState {
				case model.DbNone:
					// Nothing to persist.
				case model.DbInsert:
					insertPlayerList = append(insertPlayerList, playerData)
					player.DbState = model.DbNormal
					player.LastSaveTime = uint32(time.Now().UnixMilli())
					saveCount++
				case model.DbDelete:
					USER_MANAGER.DeleteUser(player.PlayerID)
				case model.DbNormal:
					updatePlayerList = append(updatePlayerList, playerData)
					player.LastSaveTime = uint32(time.Now().UnixMilli())
					saveCount++
				}
			}
		}
		USER_MANAGER.GetSaveUserChan() <- &SaveUserData{
			insertPlayerList: insertPlayerList,
			updatePlayerList: updatePlayerList,
			exitSave:         isExitSave,
		}
		costTime := time.Since(startTime).Nanoseconds()
		logger.Info("run save user copy cost time: %v ns, save user count: %v", costTime, saveCount)
		if isExitSave {
			// Block the main goroutine here: no further messages or tasks
			// are processed after the exit save has been handed off.
			select {}
		}
	case UserOfflineSaveToDbFinish:
		playerOfflineInfo := localEvent.Msg.(*PlayerOfflineInfo)
		USER_MANAGER.DeleteUser(playerOfflineInfo.Player.PlayerID)
		MESSAGE_QUEUE.SendToAll(&mq.NetMsg{
			MsgType: mq.MsgTypeServer,
			EventId: mq.ServerUserOnlineStateChangeNotify,
			ServerMsg: &mq.ServerMsg{
				UserId:   playerOfflineInfo.Player.PlayerID,
				IsOnline: false,
			},
		})
		if playerOfflineInfo.ChangeGsInfo.IsChangeGs {
			gsAppId := USER_MANAGER.GetRemoteUserGsAppId(playerOfflineInfo.ChangeGsInfo.JoinHostUserId)
			MESSAGE_QUEUE.SendToGate(playerOfflineInfo.Player.GateAppId, &mq.NetMsg{
				MsgType: mq.MsgTypeServer,
				EventId: mq.ServerUserGsChangeNotify,
				ServerMsg: &mq.ServerMsg{
					UserId:          playerOfflineInfo.Player.PlayerID,
					GameServerAppId: gsAppId,
					JoinHostUserId:  playerOfflineInfo.ChangeGsInfo.JoinHostUserId,
				},
			})
			logger.Info("user change gs notify to gate, uid: %v, gate appid: %v, gs appid: %v, host uid: %v",
				playerOfflineInfo.Player.PlayerID, playerOfflineInfo.Player.GateAppId, gsAppId, playerOfflineInfo.ChangeGsInfo.JoinHostUserId)
		}
	case ReloadGameDataConfig:
		// Reload in a separate goroutine. If the reload panics, the panic is
		// logged and no ReloadGameDataConfigFinish event is queued.
		go func() {
			defer func() {
				if err := recover(); err != nil {
					logger.Error("reload game data config error: %v", err)
				}
			}()
			gdconf.ReloadGameDataConfig()
			LOCAL_EVENT_MANAGER.localEventChan <- &LocalEvent{
				EventId: ReloadGameDataConfigFinish,
			}
		}()
	case ReloadGameDataConfigFinish:
		gdconf.ReplaceGameDataConfig()
		startTime := time.Now()
		WORLD_MANAGER.LoadSceneBlockAoiMap()
		costTime := time.Since(startTime).Nanoseconds()
		logger.Info("run [LoadSceneBlockAoiMap], cost time: %v ns", costTime)
	}
}