服务器玩家在线信息同步

This commit is contained in:
flswld
2023-03-17 14:30:49 +08:00
parent 658b577c20
commit 7de1d2e765
19 changed files with 286 additions and 77 deletions

View File

@@ -8,6 +8,7 @@ import (
"syscall"
"time"
"hk4e/anticheat/handle"
"hk4e/common/config"
"hk4e/common/mq"
"hk4e/common/rpc"
@@ -63,6 +64,8 @@ func Run(ctx context.Context, configFile string) error {
messageQueue := mq.NewMessageQueue(api.ANTICHEAT, APPID, client)
defer messageQueue.Close()
handle.NewHandle(messageQueue)
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {

View File

@@ -0,0 +1,71 @@
package handle
import (
"hk4e/common/mq"
"hk4e/gate/kcp"
"hk4e/node/api"
"hk4e/protocol/cmd"
"hk4e/protocol/proto"
pb "google.golang.org/protobuf/proto"
)
type Handle struct {
messageQueue *mq.MessageQueue
}
func NewHandle(messageQueue *mq.MessageQueue) (r *Handle) {
r = new(Handle)
r.messageQueue = messageQueue
return r
}
func (h *Handle) run() {
go func() {
for {
netMsg := <-h.messageQueue.GetNetMsg()
if netMsg.MsgType != mq.MsgTypeGame {
continue
}
if netMsg.EventId != mq.NormalMsg {
continue
}
if netMsg.OriginServerType != api.GATE {
continue
}
gameMsg := netMsg.GameMsg
switch gameMsg.CmdId {
case cmd.CombatInvocationsNotify:
h.CombatInvocationsNotify(gameMsg.UserId, netMsg.OriginServerAppId, gameMsg.PayloadMessage)
}
}
}()
}
func (h *Handle) CombatInvocationsNotify(userId uint32, gateAppId string, payloadMsg pb.Message) {
req := payloadMsg.(*proto.CombatInvocationsNotify)
for _, entry := range req.InvokeList {
switch entry.ArgumentType {
case proto.CombatTypeArgument_ENTITY_MOVE:
entityMoveInfo := new(proto.EntityMoveInfo)
err := pb.Unmarshal(entry.CombatData, entityMoveInfo)
if err != nil {
continue
}
if entityMoveInfo.MotionInfo.Pos.Y > 3000.0 {
h.KickPlayer(userId, gateAppId)
}
}
}
}
func (h *Handle) KickPlayer(userId uint32, gateAppId string) {
h.messageQueue.SendToGate(gateAppId, &mq.NetMsg{
MsgType: mq.MsgTypeConnCtrl,
EventId: mq.KickPlayerNotify,
ConnCtrlMsg: &mq.ConnCtrlMsg{
KickUserId: userId,
KickReason: kcp.EnetServerKillClient,
},
})
}

View File

@@ -69,7 +69,7 @@ func NewMessageQueue(serverType string, appId string, rpcClient *rpc.Client) (r
r.rpcClient = rpcClient
if serverType == api.GATE {
go r.runGateTcpMqServer()
} else {
} else if serverType == api.GS || serverType == api.ANTICHEAT || serverType == api.PATHFINDING {
go r.runGateTcpMqClient()
}
go r.recvHandler()

View File

@@ -111,6 +111,8 @@ func (k *KcpConnectManager) run() {
go k.sendMsgHandle()
go k.acceptHandle(listener)
go k.gateNetInfo()
k.syncGlobalGsOnlineMap()
go k.autoSyncGlobalGsOnlineMap()
}
func (k *KcpConnectManager) Close() {
@@ -268,7 +270,7 @@ func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) {
}
}
// Session 连接会话结构
// Session 连接会话结构 只允许定义并发安全或者简单的基础数据结构
type Session struct {
conn *kcp.UDPSession
connState uint8
@@ -467,3 +469,28 @@ func (k *KcpConnectManager) DeleteSession(convId uint64, userId uint32) {
delete(k.sessionUserIdMap, userId)
k.sessionMapLock.Unlock()
}
func (k *KcpConnectManager) autoSyncGlobalGsOnlineMap() {
ticker := time.NewTicker(time.Second * 60)
for {
<-ticker.C
k.syncGlobalGsOnlineMap()
}
}
func (k *KcpConnectManager) syncGlobalGsOnlineMap() {
rsp, err := k.discovery.GetGlobalGsOnlineMap(context.TODO(), nil)
if err != nil {
logger.Error("get global gs online map error: %v", err)
return
}
copyMap := make(map[uint32]string)
for k, v := range rsp.GlobalGsOnlineMap {
copyMap[k] = v
}
copyMapLen := len(copyMap)
k.globalGsOnlineMapLock.Lock()
k.globalGsOnlineMap = copyMap
k.globalGsOnlineMapLock.Unlock()
logger.Info("sync global gs online map finish, len: %v", copyMapLen)
}

View File

@@ -85,7 +85,7 @@ func Run(ctx context.Context, configFile string) error {
messageQueue := mq.NewMessageQueue(api.GS, APPID, client)
defer messageQueue.Close()
gameManager := game.NewGameManager(db, messageQueue, GSID, APPID, mainGsAppid.AppId)
gameManager := game.NewGameManager(db, messageQueue, GSID, APPID, mainGsAppid.AppId, client.Discovery)
defer gameManager.Close()
// natsrpc server

View File

@@ -6,6 +6,7 @@ import (
"time"
"hk4e/common/mq"
"hk4e/common/rpc"
"hk4e/gate/kcp"
"hk4e/gdconf"
"hk4e/gs/dao"
@@ -46,6 +47,7 @@ var ONLINE_PLAYER_NUM int32 = 0 // 当前在线玩家数
var SELF *model.Player
type GameManager struct {
discovery *rpc.DiscoveryClient // node服务器客户端
dao *dao.Dao
snowflake *alg.SnowflakeWorker
gsId uint32
@@ -54,8 +56,9 @@ type GameManager struct {
ai *model.Player // 本服的Ai玩家对象
}
func NewGameManager(dao *dao.Dao, messageQueue *mq.MessageQueue, gsId uint32, gsAppid string, mainGsAppid string) (r *GameManager) {
func NewGameManager(dao *dao.Dao, messageQueue *mq.MessageQueue, gsId uint32, gsAppid string, mainGsAppid string, discovery *rpc.DiscoveryClient) (r *GameManager) {
r = new(GameManager)
r.discovery = discovery
r.dao = dao
MESSAGE_QUEUE = messageQueue
r.snowflake = alg.NewSnowflakeWorker(int64(gsId))

View File

@@ -240,14 +240,16 @@ func (g *GameManager) LoginNotify(userId uint32, player *model.Player, clientSeq
g.SendMsg(cmd.QuestListNotify, userId, clientSeq, g.PacketQuestListNotify(player))
// g.GCGLogin(player) // 发送GCG登录相关的通知包
playerLoginRsp := &proto.PlayerLoginRsp{
IsUseAbilityHash: true,
AbilityHashCode: 0,
GameBiz: "hk4e_global",
IsScOpen: false,
RegisterCps: "mihoyo",
CountryCode: "CN",
Birthday: "2000-01-01",
TotalTickTime: 0.0,
IsUseAbilityHash: true,
AbilityHashCode: 0,
IsEnableClientHashDebug: true,
IsScOpen: false,
ScInfo: []byte{},
TotalTickTime: 0.0,
GameBiz: "hk4e_global",
RegisterCps: "mihoyo",
CountryCode: "US",
Birthday: "2000-01-01",
}
g.SendMsg(cmd.PlayerLoginRsp, userId, clientSeq, playerLoginRsp)
}

View File

@@ -1,6 +1,8 @@
package game
import (
"context"
"sync"
"time"
"hk4e/gs/dao"
@@ -19,10 +21,11 @@ import (
// 玩家定时保存 写入db和redis
type UserManager struct {
dao *dao.Dao // db对象
playerMap map[uint32]*model.Player // 内存玩家数据
saveUserChan chan *SaveUserData // 用于主协程发送玩家数据给定时保存协程
remotePlayerMap map[uint32]string // 远程玩家 key:userId value:玩家所在gs的appid
dao *dao.Dao // db对象
playerMap map[uint32]*model.Player // 内存玩家数据
saveUserChan chan *SaveUserData // 用于主协程发送玩家数据给定时保存协程
remotePlayerMap map[uint32]string // 远程玩家 key:userId value:玩家所在gs的appid
remotePlayerMapLock sync.RWMutex
}
func NewUserManager(dao *dao.Dao) (r *UserManager) {
@@ -32,6 +35,8 @@ func NewUserManager(dao *dao.Dao) (r *UserManager) {
r.saveUserChan = make(chan *SaveUserData) // 无缓冲区chan 避免主协程在写入时被迫加锁
r.remotePlayerMap = make(map[uint32]string)
go r.saveUserHandle()
r.syncRemotePlayerMap()
go r.autoSyncRemotePlayerMap()
return r
}
@@ -251,8 +256,39 @@ func (u *UserManager) ChangeUserDbState(player *model.Player, state int) {
// 远程玩家相关操作
func (u *UserManager) autoSyncRemotePlayerMap() {
ticker := time.NewTicker(time.Second * 60)
for {
<-ticker.C
u.syncRemotePlayerMap()
}
}
func (u *UserManager) syncRemotePlayerMap() {
rsp, err := GAME_MANAGER.discovery.GetGlobalGsOnlineMap(context.TODO(), nil)
if err != nil {
logger.Error("get global gs online map error: %v", err)
return
}
copyMap := make(map[uint32]string)
for k, v := range rsp.GlobalGsOnlineMap {
player, exist := u.playerMap[k]
if exist && player.Online {
continue
}
copyMap[k] = v
}
copyMapLen := len(copyMap)
u.remotePlayerMapLock.Lock()
u.remotePlayerMap = copyMap
u.remotePlayerMapLock.Unlock()
logger.Info("sync remote player map finish, len: %v", copyMapLen)
}
func (u *UserManager) GetRemoteUserOnlineState(userId uint32) bool {
u.remotePlayerMapLock.RLock()
_, exist := u.remotePlayerMap[userId]
u.remotePlayerMapLock.RUnlock()
if !exist {
return false
} else {
@@ -261,7 +297,9 @@ func (u *UserManager) GetRemoteUserOnlineState(userId uint32) bool {
}
func (u *UserManager) GetRemoteUserGsAppId(userId uint32) string {
u.remotePlayerMapLock.RLock()
appId, exist := u.remotePlayerMap[userId]
u.remotePlayerMapLock.RUnlock()
if !exist {
return ""
} else {
@@ -270,12 +308,14 @@ func (u *UserManager) GetRemoteUserGsAppId(userId uint32) string {
}
func (u *UserManager) SetRemoteUserOnlineState(userId uint32, isOnline bool, appId string) {
u.remotePlayerMapLock.Lock()
if isOnline {
u.remotePlayerMap[userId] = appId
} else {
delete(u.remotePlayerMap, userId)
u.DeleteUser(userId)
}
u.remotePlayerMapLock.Unlock()
}
// GetRemoteOnlineUserList 获取指定数量的远程在线玩家 玩家数据只读禁止修改
@@ -285,16 +325,22 @@ func (u *UserManager) GetRemoteOnlineUserList(total int) map[uint32]*model.Playe
}
onlinePlayerMap := make(map[uint32]*model.Player)
count := 0
userIdList := make([]uint32, 0)
u.remotePlayerMapLock.RLock()
for userId := range u.remotePlayerMap {
userIdList = append(userIdList, userId)
count++
if count >= total {
break
}
}
u.remotePlayerMapLock.RUnlock()
for _, userId := range userIdList {
player := u.LoadTempOfflineUser(userId, false)
if player == nil {
continue
}
onlinePlayerMap[player.PlayerID] = player
count++
if count >= total {
break
}
}
return onlinePlayerMap
}

View File

@@ -21,6 +21,8 @@ service Discovery {
rpc GetAllGateServerInfoList (NullMsg) returns (GateServerInfoList) {}
// 获取主游戏服务器的appid
rpc GetMainGameServerAppId (NullMsg) returns (GetMainGameServerAppIdRsp) {}
// 获取全服玩家GS在线列表
rpc GetGlobalGsOnlineMap (NullMsg) returns (GetGlobalGsOnlineMapRsp) {}
}
message NullMsg {
@@ -84,3 +86,7 @@ message GateServerInfo {
message GateServerInfoList {
repeated GateServerInfo gate_server_info_list = 1;
}
message GetGlobalGsOnlineMapRsp {
map<uint32, string> GlobalGsOnlineMap = 1;
}

View File

@@ -5,4 +5,5 @@ const (
GS = "GS"
ANTICHEAT = "ANTICHEAT"
PATHFINDING = "PATHFINDING"
NODE = "NODE"
)

View File

@@ -8,6 +8,8 @@ import (
"syscall"
"hk4e/common/config"
"hk4e/common/mq"
"hk4e/node/api"
"hk4e/node/service"
"hk4e/pkg/logger"
@@ -30,7 +32,12 @@ func Run(ctx context.Context, configFile string) error {
return err
}
defer conn.Close()
s, err := service.NewService(conn)
// 只用来监听全服广播
messageQueue := mq.NewMessageQueue(api.NODE, "node", nil)
defer messageQueue.Close()
s, err := service.NewService(conn, messageQueue)
if err != nil {
return err
}

View File

@@ -50,9 +50,11 @@ type ServerInstance struct {
}
type DiscoveryService struct {
regionEc2b *random.Ec2b // 全局区服密钥信息
serverInstanceMap map[string]*sync.Map // 全部服务器实例集合 key:服务器类型 value:服务器实例集合 -> key:appid value:服务器实例
serverAppIdMap *sync.Map // 服务器appid集合 key:appid value:是否存在
regionEc2b *random.Ec2b // 全局区服密钥信息
serverInstanceMap map[string]*sync.Map // 全部服务器实例集合 key:服务器类型 value:服务器实例集合 -> key:appid value:服务器实例
serverAppIdMap *sync.Map // 服务器appid集合 key:appid value:是否存在
globalGsOnlineMap map[uint32]string
globalGsOnlineMapLock sync.RWMutex
}
func NewDiscoveryService() *DiscoveryService {
@@ -65,6 +67,7 @@ func NewDiscoveryService() *DiscoveryService {
r.serverInstanceMap[api.ANTICHEAT] = new(sync.Map)
r.serverInstanceMap[api.PATHFINDING] = new(sync.Map)
r.serverAppIdMap = new(sync.Map)
r.globalGsOnlineMap = make(map[uint32]string)
go r.removeDeadServer()
return r
}
@@ -281,6 +284,19 @@ func (s *DiscoveryService) GetMainGameServerAppId(ctx context.Context, req *api.
}, nil
}
// GetGlobalGsOnlineMap 获取全服玩家GS在线列表
func (s *DiscoveryService) GetGlobalGsOnlineMap(ctx context.Context, req *api.NullMsg) (*api.GetGlobalGsOnlineMapRsp, error) {
copyMap := make(map[uint32]string)
s.globalGsOnlineMapLock.RLock()
for k, v := range s.globalGsOnlineMap {
copyMap[k] = v
}
s.globalGsOnlineMapLock.RUnlock()
return &api.GetGlobalGsOnlineMapRsp{
GlobalGsOnlineMap: copyMap,
}, nil
}
func (s *DiscoveryService) getRandomServerInstance(instMap *sync.Map) *ServerInstance {
instList := make(ServerInstanceSortList, 0)
instMap.Range(func(key, value any) bool {

View File

@@ -1,6 +1,7 @@
package service
import (
"hk4e/common/mq"
"hk4e/node/api"
"github.com/byebyebruce/natsrpc"
@@ -9,9 +10,11 @@ import (
)
type Service struct {
messageQueue *mq.MessageQueue
discoveryService *DiscoveryService
}
func NewService(conn *nats.Conn) (*Service, error) {
func NewService(conn *nats.Conn, messageQueue *mq.MessageQueue) (*Service, error) {
enc, err := nats.NewEncodedConn(conn, protobuf.PROTOBUF_ENCODER)
if err != nil {
return nil, err
@@ -25,8 +28,36 @@ func NewService(conn *nats.Conn) (*Service, error) {
if err != nil {
return nil, err
}
return &Service{}, nil
s := &Service{
messageQueue: messageQueue,
discoveryService: discoveryService,
}
go s.BroadcastReceiver()
return s, nil
}
func (s *Service) Close() {
}
func (s *Service) BroadcastReceiver() {
for {
netMsg := <-s.messageQueue.GetNetMsg()
if netMsg.MsgType != mq.MsgTypeServer {
continue
}
if netMsg.EventId != mq.ServerUserOnlineStateChangeNotify {
continue
}
if netMsg.OriginServerType != api.GS {
continue
}
serverMsg := netMsg.ServerMsg
s.discoveryService.globalGsOnlineMapLock.Lock()
if serverMsg.IsOnline {
s.discoveryService.globalGsOnlineMap[serverMsg.UserId] = netMsg.OriginServerAppId
} else {
delete(s.discoveryService.globalGsOnlineMap, serverMsg.UserId)
}
s.discoveryService.globalGsOnlineMapLock.Unlock()
}
}

View File

@@ -25,34 +25,32 @@ func NewHandle(messageQueue *mq.MessageQueue) (r *Handle) {
}
func (h *Handle) run() {
for i := 0; i < 1; i++ {
go func() {
for {
netMsg := <-h.messageQueue.GetNetMsg()
if netMsg.MsgType != mq.MsgTypeGame {
continue
}
if netMsg.EventId != mq.NormalMsg {
continue
}
if netMsg.OriginServerType != api.GATE {
continue
}
gameMsg := netMsg.GameMsg
switch gameMsg.CmdId {
case cmd.QueryPathReq:
h.QueryPath(gameMsg.UserId, netMsg.OriginServerAppId, gameMsg.PayloadMessage)
case cmd.ObstacleModifyNotify:
h.ObstacleModifyNotify(gameMsg.UserId, netMsg.OriginServerAppId, gameMsg.PayloadMessage)
}
go func() {
for {
netMsg := <-h.messageQueue.GetNetMsg()
if netMsg.MsgType != mq.MsgTypeGame {
continue
}
}()
}
if netMsg.EventId != mq.NormalMsg {
continue
}
if netMsg.OriginServerType != api.GATE {
continue
}
gameMsg := netMsg.GameMsg
switch gameMsg.CmdId {
case cmd.QueryPathReq:
h.QueryPath(gameMsg.UserId, netMsg.OriginServerAppId, gameMsg.PayloadMessage)
case cmd.ObstacleModifyNotify:
h.ObstacleModifyNotify(gameMsg.UserId, netMsg.OriginServerAppId, gameMsg.PayloadMessage)
}
}
}()
}
// SendMsg 发送消息给客户端
func (h *Handle) SendMsg(cmdId uint16, userId uint32, gateAppId string, payloadMsg pb.Message) {
if userId < 100000000 || payloadMsg == nil {
if payloadMsg == nil {
return
}
gameMsg := new(mq.GameMsg)

View File

@@ -1,7 +1,7 @@
package handle
import (
"hk4e/pathfinding/pfalg"
"hk4e/pkg/alg"
"hk4e/pkg/logger"
"hk4e/protocol/cmd"
"hk4e/protocol/proto"
@@ -9,15 +9,15 @@ import (
pb "google.golang.org/protobuf/proto"
)
func (h *Handle) ConvPbVecToMeshVec(pbVec *proto.Vector) pfalg.MeshVector {
return pfalg.MeshVector{
func (h *Handle) ConvPbVecToMeshVec(pbVec *proto.Vector) alg.MeshVector {
return alg.MeshVector{
X: int16(pbVec.X),
Y: int16(pbVec.Y),
Z: int16(pbVec.Z),
}
}
func (h *Handle) ConvMeshVecToPbVec(meshVec pfalg.MeshVector) *proto.Vector {
func (h *Handle) ConvMeshVecToPbVec(meshVec alg.MeshVector) *proto.Vector {
return &proto.Vector{
X: float32(meshVec.X),
Y: float32(meshVec.Y),
@@ -25,15 +25,15 @@ func (h *Handle) ConvMeshVecToPbVec(meshVec pfalg.MeshVector) *proto.Vector {
}
}
func (h *Handle) ConvPbVecListToMeshVecList(pbVecList []*proto.Vector) []pfalg.MeshVector {
ret := make([]pfalg.MeshVector, 0)
func (h *Handle) ConvPbVecListToMeshVecList(pbVecList []*proto.Vector) []alg.MeshVector {
ret := make([]alg.MeshVector, 0)
for _, pbVec := range pbVecList {
ret = append(ret, h.ConvPbVecToMeshVec(pbVec))
}
return ret
}
func (h *Handle) ConvMeshVecListToPbVecList(meshVecList []pfalg.MeshVector) []*proto.Vector {
func (h *Handle) ConvMeshVecListToPbVecList(meshVecList []alg.MeshVector) []*proto.Vector {
ret := make([]*proto.Vector, 0)
for _, meshVec := range meshVecList {
ret = append(ret, h.ConvMeshVecToPbVec(meshVec))
@@ -45,7 +45,7 @@ func (h *Handle) QueryPath(userId uint32, gateAppId string, payloadMsg pb.Messag
req := payloadMsg.(*proto.QueryPathReq)
logger.Debug("query path req: %v, uid: %v, gateAppId: %v", req, userId, gateAppId)
var ok = false
var path []pfalg.MeshVector = nil
var path []alg.MeshVector = nil
for _, destinationPos := range req.DestinationPos {
ok, path = h.worldStatic.Pathfinding(h.ConvPbVecToMeshVec(req.SourcePos), h.ConvPbVecToMeshVec(destinationPos))
if ok {

View File

@@ -1,7 +0,0 @@
package pfalg
type MeshVector struct {
X int16
Y int16
Z int16
}

View File

@@ -5,18 +5,18 @@ import (
"encoding/gob"
"os"
"hk4e/pathfinding/pfalg"
"hk4e/pkg/alg"
"hk4e/pkg/logger"
)
type WorldStatic struct {
// x y z -> if terrain exist
terrain map[pfalg.MeshVector]bool
terrain map[alg.MeshVector]bool
}
func NewWorldStatic() (r *WorldStatic) {
r = new(WorldStatic)
r.terrain = make(map[pfalg.MeshVector]bool)
r.terrain = make(map[alg.MeshVector]bool)
return r
}
@@ -52,7 +52,7 @@ func (w *WorldStatic) SaveTerrain() bool {
}
func (w *WorldStatic) GetTerrain(x int16, y int16, z int16) (exist bool) {
pos := pfalg.MeshVector{
pos := alg.MeshVector{
X: x,
Y: y,
Z: z,
@@ -62,7 +62,7 @@ func (w *WorldStatic) GetTerrain(x int16, y int16, z int16) (exist bool) {
}
func (w *WorldStatic) SetTerrain(x int16, y int16, z int16) {
pos := pfalg.MeshVector{
pos := alg.MeshVector{
X: x,
Y: y,
Z: z,
@@ -70,13 +70,13 @@ func (w *WorldStatic) SetTerrain(x int16, y int16, z int16) {
w.terrain[pos] = true
}
func (w *WorldStatic) Pathfinding(startPos pfalg.MeshVector, endPos pfalg.MeshVector) (bool, []pfalg.MeshVector) {
bfs := pfalg.NewBFS()
func (w *WorldStatic) Pathfinding(startPos alg.MeshVector, endPos alg.MeshVector) (bool, []alg.MeshVector) {
bfs := alg.NewBFS()
bfs.InitMap(
w.terrain,
startPos,
endPos,
100,
0,
)
pathVectorList := bfs.Pathfinding()
if pathVectorList == nil {

View File

@@ -1,8 +1,6 @@
package pfalg
package alg
import (
"hk4e/pkg/alg"
)
// 广度优先搜索寻路
const (
NODE_NONE = iota
@@ -166,7 +164,7 @@ func (b *BFS) GetPath() []*PathNode {
}
func (b *BFS) Pathfinding() []MeshVector {
queue := alg.NewALQueue[*PathNode]()
queue := NewALQueue[*PathNode]()
b.startPathNode.visit = true
queue.EnQueue(b.startPathNode)
for queue.Len() > 0 {

View File

@@ -65,3 +65,10 @@ func Vector3CrossProd(v1 *Vector3, v2 *Vector3) *Vector3 {
v3.Z = v1.X*v2.Y - v2.X*v1.Y
return v3
}
// MeshVector 网格向量
type MeshVector struct {
X int16
Y int16
Z int16
}