创建NetClient纤程并发送消息

登录的入口在类LoginHelper中,只有一个方法

public static class LoginHelper
{
    public static async ETTask Login(Scene root, string account, string password)
    {
        root.RemoveComponent<ClientSenderComponent>();
        
        ClientSenderComponent clientSenderComponent = root.AddComponent<ClientSenderComponent>();
        
        long playerId = await clientSenderComponent.LoginAsync(account, password);

        await ETTask.CompletedTask;
    }
}

很明显,核心逻辑都在组件ClientSenderComponent中,这里传入的Scene是MainScene,但是如我们之前所说,客户端和服务器的交互逻辑其实是在NetClientScene中进行的

所以我们详细了解一下这个组件

[ComponentOf(typeof(Scene))]
public class ClientSenderComponent: Entity, IAwake, IDestroy
{
    public int fiberId;

    public ActorId netClientActorId;
}
  • fiberID 所属纤程ID
  • netClientActorId NetClient的ActorID,如上一章所说,一个ActorID可以锁定唯一的一个Unit

根据ET的规则,组件的具体逻辑都会在相应的System中,这个组件对应的就是ClientSenderComponentSystem,具体看一下LoginAsync方法

public static async ETTask<long> LoginAsync(this ClientSenderComponent self, string account, string password)
{
    self.fiberId = await FiberManager.Instance.Create(SchedulerType.ThreadPool, 0, SceneType.NetClient, "");
    self.netClientActorId = new ActorId(self.Fiber().Process, self.fiberId);

    Main2NetClient_Login main2NetClientLogin = Main2NetClient_Login.Create();
    main2NetClientLogin.OwnerFiberId = self.Fiber().Id;
    main2NetClientLogin.Account = account;
    main2NetClientLogin.Password = password;
    NetClient2Main_Login response = await self.Root().GetComponent<ProcessInnerSender>().Call(self.netClientActorId, main2NetClientLogin) as NetClient2Main_Login;
    return response.PlayerId;
}

我们创建了一个新的Fiber,然后用这个Fiber锁定唯一一个ActorId,前面说过ActorID=FiberID+InstanceID,这里没有传入InstanceID,为什么?

其实,在Fiber的构造函数中可以发现,Fiber对应的Scene的InstanceId默认是1

internal Fiber(int id, int zone, SceneType sceneType, string name)
{
 	// 这里第三个参数就是Scene的InstanceId,默认是1
    this.Root = new Scene(this, id, 1, sceneType, name);
}

同时,再观察ActorId的构造函数,可以发现ActorId默认的InstanceId也是1

public ActorId(int process, int fiber)
{
    this.Address = new Address(process, fiber);
    this.InstanceId = 1;
}

所以,上面这个netClientActorId也很明显了,它对应的Unit就是这个Fiber下对应的Scene

然后就是创建一条网络消息,并发往目标Unit

上一章说过,不同Fiber间通讯是通过组件ProcessInnerSender进行的,这里需要知道目标Unit的ActorId,其实也就是NetClientFiber对应的Scene

NetClient消息处理

然后我们可以找到这样一个类

[MessageHandler(SceneType.NetClient)]
public class Main2NetClient_LoginHandler : MessageHandler<Scene, Main2NetClient_Login, NetClient2Main_Login>
{
    ...
}

这里有几个重点

  • 由于这个一个客户端内部消息,所以继承自MessageHandler<A,B,C>

    • A 消息处理的Fiber拥有的Scene
    • B 发送消息类型
    • C 回复消息类型
  • 这个消息对应的proto定义如下,必须是IRequest/IResponse(内部消息)

    // ResponseType NetClient2Main_Login
    message Main2NetClient_Login // IRequest
    {
    	int32 RpcId = 1;// 必须
          
    	int32 OwnerFiberId = 2;
    	string Account = 3;		// 账号
    	string Password = 4; 	// 密码
    }
      
    message NetClient2Main_Login // IResponse
    {
    	int32 RpcId = 1;// 必须
    	int32 Error = 2;// 必须
    	string Message = 3;// 必须
      
    	int64 PlayerId = 4;
    }
    
  • 你必须知道这个消息是由哪个Fiber处理的,需要加上对应的标签 [MessageHandler(SceneType.NetClient)]

接下来我们具体看一下这个消息处理的内容

软路由地址、网关负载地址

这一步是通过RouterManager获取软路由的地址

protected override async ETTask Run(Scene root, Main2NetClient_Login request, NetClient2Main_Login response)
{
    string account = request.Account;
    string password = request.Password;

    root.RemoveComponent<RouterAddressComponent>();
    RouterAddressComponent routerAddressComponent = root.AddComponent<RouterAddressComponent, string, int>(ConstValue.RouterHttpHost, ConstValue.RouterHttpPort);
    await routerAddressComponent.Init();
    IPEndPoint realmAddress = routerAddressComponent.GetRealmAddress(account);
        
	...
}

添加组件时我们需要指定RouterManager的IP地址和端口号,定义在常量中了,默认的话是本机地址

public static partial class ConstValue
{
    public const string RouterHttpHost = "127.0.0.1";
    
    public const int RouterHttpPort = 30300;
    
    public const int SessionTimeoutTime = 30 * 1000;
}

随后初始化

public static async ETTask Init(this RouterAddressComponent self)
{
    self.RouterManagerIPAddress = NetworkHelper.GetHostAddress(self.RouterManagerHost);
    await self.GetAllRouter();
}

private static async ETTask GetAllRouter(this RouterAddressComponent self)
{
    string url = $"http://{self.RouterManagerHost}:{self.RouterManagerPort}/get_router?v={RandomGenerator.RandUInt32()}";
    Log.Debug($"start get router info: {url}");
    string routerInfo = await HttpClientHelper.Get(url);
    Log.Debug($"recv router info: {routerInfo}");

    HttpGetRouterResponse httpGetRouterResponse = JsonHelper.ToObject<HttpGetRouterResponse>(routerInfo);
    self.Info = httpGetRouterResponse;
    Log.Debug($"start get router info finish: {JsonHelper.ToJson(httpGetRouterResponse)}");

    // 打乱顺序
    RandomGenerator.BreakRank(self.Info.Routers);
	// 每过10分钟,重新获取一次
    self.WaitTenMinGetAllRouter().Coroutine();
}

这里也是比较好理解的,通过一个Http请求,向RouterManager获取所有的软路由信息,网关负载服信息,这两个东西其实都在Http回复的数据结构内了

public partial class HttpGetRouterResponse
{
    public List<string> Realms { get; set; } = new();

    public List<string> Routers { get; set; } = new();
}

随后我们随机我们Realm服(网关负载服)的地址,其实是通过account取模来获取的

public static IPEndPoint GetRealmAddress(this RouterAddressComponent self, string account)
{
    int v = account.Mode(self.Info.Realms.Count);
    string address = self.Info.Realms[v];
    string[] ss = address.Split(':');
    IPAddress ipAddress = IPAddress.Parse(ss[0]);
    return new IPEndPoint(ipAddress, int.Parse(ss[1]));
}

服务器消息处理

请求网关

上文已经获取了Realm服的地址了,现在我们就可以向他请求分配网关了

protected override async ETTask Run(Scene root, Main2NetClient_Login request, NetClient2Main_Login response)
{
    string account = request.Account;
    string password = request.Password;

    ...

    IPEndPoint realmAddress = routerAddressComponent.GetRealmAddress(account);

    R2C_Login r2CLogin;
    using (Session session = await netComponent.CreateRouterSession(realmAddress, account, password))
    {
        C2R_Login c2RLogin = C2R_Login.Create();
        c2RLogin.Account = account;
        c2RLogin.Password = password;
        r2CLogin = (R2C_Login)await session.Call(c2RLogin);
    }
}

之前的文章说过,所有CS消息都必须通过软路由转发,不过现在看来好像只获取了Realm服的地址,并没有对软路由地址进行什么处理?

别急,上面11行,我们想要请求一个网关地址,就需要和Realm服通讯,此时我们观察扩展方法CreateRouterSession

这个方法会创建一个Session,所谓Session就是CS之间通讯的桥梁/手机,客户端和服务器各自持有一部,那是如何创建的呢?

public static async ETTask<Session> CreateRouterSession(this NetComponent netComponent, IPEndPoint address, string account, string password)
{
    uint localConn = (uint)(account.GetLongHashCode() ^ password.GetLongHashCode() ^ RandomGenerator.RandUInt32());
    (uint recvLocalConn, IPEndPoint routerAddress) = await GetRouterAddress(netComponent, address, localConn, 0);

    if (recvLocalConn == 0)
    {
        throw new Exception($"get router fail: {netComponent.Root().Id} {address}");
    }
    
    Log.Info($"get router: {recvLocalConn} {routerAddress}");

    Session routerSession = netComponent.Create(routerAddress, address, recvLocalConn);
    routerSession.AddComponent<PingComponent>();
    routerSession.AddComponent<RouterCheckComponent>();
    
    return routerSession;
}

先忽略recvLocalConn参数,routerAddress差不多也是随机一个软路由的地址,之后就是创建了一个Session,需要两个地址,第一个是软路由地址,第二个是Realm服地址

返回Session后,我们对Session发起登录请求

Fiber与Invoke

再创建Fiber时,会调用的一种类似虚函数的机制

public async ETTask<int> Create(SchedulerType schedulerType, int fiberId, int zone, SceneType sceneType, string name)
{
    try
    {
		...

        fiber.ThreadSynchronizationContext.Post(async () =>
        {
            try
            {
                 // 根据Fiber的SceneType分发Init,必须在Fiber线程中执行
                 // Invoke跟Publish的区别(特别注意)
        		// Invoke类似函数,必须有被调用方,否则异常,调用者跟被调用者属于同一模块,比如MoveComponent中的Timer计时器,调用跟被调用的代码均属于移动模块
        		// 既然Invoke跟函数一样,那么为什么不使用函数呢? 因为有时候不方便直接调用,比如Config加载,在客户端跟服务端加载方式不一样。比如TimerComponent需要根据Id分发
        		// 注意,不要把Invoke当函数使用,这样会造成代码可读性降低,能用函数不要用Invoke
        		// publish是事件,抛出去可以没人订阅,调用者跟被调用者属于两个模块,比如任务系统需要知道道具使用的信息,则订阅道具使用事件
                await EventSystem.Instance.Invoke<FiberInit, ETTask>((long)sceneType, new FiberInit() {Fiber = fiber});
                tcs.SetResult(true);
            }
            catch (Exception e)
            {
                ...
            }
        });

        ...
    }
    catch (Exception e)
    {
        ...
    }
}

比如Main Fiber,就会对应这样一个类,在创建Main Fiber时,会执行Handle方法

[Invoke((long)SceneType.Main)]
public class FiberInit_Main: AInvokeHandler<FiberInit, ETTask>
{
    public override async ETTask Handle(FiberInit fiberInit)
    {
        Scene root = fiberInit.Fiber.Root;
       
        await EventSystem.Instance.PublishAsync(root, new EntryEvent1());
        await EventSystem.Instance.PublishAsync(root, new EntryEvent2());
        await EventSystem.Instance.PublishAsync(root, new EntryEvent3());
    }
}

对于Realm Fiber而言,他会进行如下初始化,添加了很多必要的组件在主Scene身上

[Invoke((long)SceneType.Realm)]
public class FiberInit_Realm: AInvokeHandler<FiberInit, ETTask>
{
    public override async ETTask Handle(FiberInit fiberInit)
    {
        Scene root = fiberInit.Fiber.Root;
        root.AddComponent<MailBoxComponent, MailBoxType>(MailBoxType.UnOrderedMessage);
        root.AddComponent<TimerComponent>();
        root.AddComponent<CoroutineLockComponent>();
        root.AddComponent<ProcessInnerSender>();
        root.AddComponent<MessageSender>();
        StartSceneConfig startSceneConfig = StartSceneConfigCategory.Instance.Get(root.Fiber.Id);
        root.AddComponent<NetComponent, IPEndPoint, NetworkProtocol>(startSceneConfig.InnerIPPort, NetworkProtocol.UDP);
        root.AddComponent<DBManagerComponent>();
        await ETTask.CompletedTask;
    }
}

处理登录请求

[MessageSessionHandler(SceneType.Realm)]
public class C2R_LoginHandler : MessageSessionHandler<C2R_Login, R2C_Login>
{
    protected override async ETTask Run(Session session, C2R_Login request, R2C_Login response)
    {
        if (string.IsNullOrEmpty(request.Account) || string.IsNullOrEmpty(request.Password))
        {
            response.Error = ErrorCode.ERR_LoginInfoEmpty;
            CloseSession(session).Coroutine();
            return;
        }
        ...
    }

    private async ETTask CloseSession(Session session)
    {
        await session.Root().GetComponent<TimerComponent>().WaitAsync(1000);
        session.Dispose();
    }
}

与直接客户端内部通讯的Handler基本相同,不一样的地方在于,CS直接的通讯消息,必须继承MessageSessionHandler

注意这里的Session,不是客户端那一侧的,之前说过了CS直接通讯,各自都有各自的手机,所以这个Session是服务器这一侧对应的那一部手机,并且这个Session是隶属于Realm Fiber下的

数据库相关

[FriendOf(typeof(AccountInfo))]
[MessageSessionHandler(SceneType.Realm)]
public class C2R_LoginHandler : MessageSessionHandler<C2R_Login, R2C_Login>
{
    protected override async ETTask Run(Session session, C2R_Login request, R2C_Login response)
    {
	  ...

       var dbComponent = session.Root().GetComponent<DBManagerComponent>().GetZoneDB(session.Zone());
       var infos = await dbComponent.Query<AccountInfo>(info => info.Account == request.Account);

       if (infos.Count <= 0)
       {
          var accountInfosComponent = session.GetComponent<AccountInfosComponent>() ??
              session.AddComponent<AccountInfosComponent>();
          var accountInfo = accountInfosComponent.AddChild<AccountInfo>();
          accountInfo.Account = request.Account;
          accountInfo.Password = request.Password;
          await dbComponent.Save(accountInfo);
       }
       else
       {
          var accountInfo = infos[0];
          if (accountInfo.Password!=request.Password)
          {
             response.Error = ErrorCode.ERR_LoginPasswordError;
             CloseSession(session).Coroutine();
             return;
          }
       }
    }
	...
}

逻辑上是比较简单的

这里session隶属于Realm服,而上面有说过,Realm服创建的时候,会添加DBManagerComponent,自然这里也能拿得到

然后查询数据库,看看是否创建过账号,如果创建过,比对一下密码,否则就新增一个账号信息,AccountInfo和对应的Component比较简单就不展开了

这里只有一个注意点,我们通过DBManager获取数据库时,需要传入区服ID,这很正常,不同区服的数据库肯定不一样,那么session的区服是多少呢?自然等于Realm服的区服,那是多少呢?

定义在配置表中,是一区

image-20240508010201629

但是区服又是如何与数据库关联的呢?我们进入方法看看

public static DBComponent GetZoneDB(this DBManagerComponent self, int zone)
{
    DBComponent dbComponent = self.DBComponents[zone];
    if (dbComponent != null)
    {
        return dbComponent;
    }
	// 关键
    StartZoneConfig startZoneConfig = StartZoneConfigCategory.Instance.Get(zone);
    if (startZoneConfig.DBConnection == "")
    {
        throw new Exception($"zone: {zone} not found mongo connect string");
    }

    dbComponent = self.AddChild<DBComponent, string, string, int>(startZoneConfig.DBConnection, startZoneConfig.DBName, zone);
    self.DBComponents[zone] = dbComponent;
    return dbComponent;
}

关键是查询了StartZoneConfig配置,看一下这个配置就一目了然了

image-20240508010347695

1对应的就是游戏服

处理网关请求

[FriendOf(typeof(AccountInfo))]
[MessageSessionHandler(SceneType.Realm)]
public class C2R_LoginHandler : MessageSessionHandler<C2R_Login, R2C_Login>
{
    protected override async ETTask Run(Session session, C2R_Login request, R2C_Login response)
    {
        ...

        // 随机分配一个Gate
        StartSceneConfig config = RealmGateAddressHelper.GetGate(session.Zone(), request.Account);
       
        // 向gate请求一个key,客户端可以拿着这个key连接gate
        R2G_GetLoginKey r2GGetLoginKey = R2G_GetLoginKey.Create();
        r2GGetLoginKey.Account = request.Account;
        G2R_GetLoginKey g2RGetLoginKey = (G2R_GetLoginKey) await session.Fiber().Root.GetComponent<MessageSender>().Call(
            config.ActorId, r2GGetLoginKey);

        response.Address = config.InnerIPPort.ToString();
        response.Key = g2RGetLoginKey.Key;
        response.GateId = g2RGetLoginKey.GateId;

        CloseSession(session).Coroutine();
    }
    ...
}

GetGates就是在StartSceneConfig里随机获得一个Gate配置

随后Realm服向Gate服发送网关请求

为什么需要使用组件MessageSender?

在最开始Main向NetClient发送消息时,我们直接使用ProcessInnerSender发送了消息,那是因为Main和NetClient必然属于同一个经常,这是客户端的2个Fiber,我们是确定的

但这里是服务器内部通讯,我们不确定两个Fiber是否在同一个进程,如果不在一起,那就需要通过NetInnerFiber转发

MessageSender其实就帮我们处理了这部分逻辑,他会判断应该使用ProcessInner还是NetInner

至于这里Gate服的ActorId,也是根据配表中的ID以及进程ID创建的

[MessageHandler(SceneType.Gate)]
public class R2G_GetLoginKeyHandler : MessageHandler<Scene, R2G_GetLoginKey, G2R_GetLoginKey>
{
    protected override async ETTask Run(Scene scene, R2G_GetLoginKey request, G2R_GetLoginKey response)
    {
       long key = RandomGenerator.RandInt64();
       scene.GetComponent<GateSessionKeyComponent>().Add(key, request.Account);
       response.Key = key;
       response.GateId = scene.Id;
       await ETTask.CompletedTask;
    }
}

消息处理也很简单,就是随机了一个Long,并保存下来

请求登录网关

获取网关key之后,我们回到客户端侧,通过key直接登录网关

protected override async ETTask Run(Scene root, Main2NetClient_Login request, NetClient2Main_Login response)
{
	...

    R2C_Login r2CLogin;
	...

    // 创建一个gate Session,并且保存到SessionComponent中
    Session gateSession = await netComponent.CreateRouterSession(NetworkHelper.ToIPEndPoint(r2CLogin.Address),
                                                                 account, password);
    gateSession.AddComponent<ClientSessionErrorComponent>();
    root.AddComponent<SessionComponent>().Session = gateSession;
    C2G_LoginGate c2GLoginGate = C2G_LoginGate.Create();
    c2GLoginGate.Key = r2CLogin.Key;
    c2GLoginGate.GateId = r2CLogin.GateId;
    G2C_LoginGate g2CLoginGate = (G2C_LoginGate)await gateSession.Call(c2GLoginGate);

    response.PlayerId = g2CLoginGate.PlayerId;

    await ETTask.CompletedTask;
}

我们创建了一个软路由与Gate服的Session,并保存到SessionComponent中,然后发送了一条登录网关请求

[MessageSessionHandler(SceneType.Gate)]
public class C2G_LoginGateHandler : MessageSessionHandler<C2G_LoginGate, G2C_LoginGate>
{
    protected override async ETTask Run(Session session, C2G_LoginGate request, G2C_LoginGate response)
    {
        Scene root = session.Root();
        string account = root.GetComponent<GateSessionKeyComponent>().Get(request.Key);
        if (account == null)
        {
            response.Error = ErrorCore.ERR_ConnectGateKeyError;
            response.Message = "Gate key验证失败!";
            return;
        }
        
        session.RemoveComponent<SessionAcceptTimeoutComponent>();

        PlayerComponent playerComponent = root.GetComponent<PlayerComponent>();
        Player player = playerComponent.GetByAccount(account);
        if (player == null)
        {
            player = playerComponent.AddChild<Player, string>(account);
            playerComponent.Add(player);
            PlayerSessionComponent playerSessionComponent = player.AddComponent<PlayerSessionComponent>();
            playerSessionComponent.AddComponent<MailBoxComponent, MailBoxType>(MailBoxType.GateSession);
            await playerSessionComponent.AddLocation(LocationType.GateSession);

            player.AddComponent<MailBoxComponent, MailBoxType>(MailBoxType.UnOrderedMessage);
            await player.AddLocation(LocationType.Player);

            session.AddComponent<SessionPlayerComponent>().Player = player;
            playerSessionComponent.Session = session;
        }
        else
        {
            PlayerSessionComponent playerSessionComponent = player.GetComponent<PlayerSessionComponent>();
            playerSessionComponent.Session = session;
        }

        response.PlayerId = player.Id;
        await ETTask.CompletedTask;
    }
}

首先是验证一下请求的key是否合法,然后很关键,需要删除组件SessionAcceptTimeoutComponent

这个组件会在Session创建时默认被添加上去,作用就是在5秒后断开这个Session,对于大部分短连接,是OK的,但这里我们需要长期保持链接,所以我们要主动删除这个组件

然后添加一个PlayerComponent

[ChildOf(typeof(PlayerComponent))]
public sealed class Player : Entity, IAwake<string>
{
    public string Account { get; set; }
}
[ComponentOf(typeof(Scene))]
public class PlayerComponent : Entity, IAwake, IDestroy
{
    public Dictionary<string, EntityRef<Player>> dictionary = new Dictionary<string, EntityRef<Player>>();
}

获取Player也很简单,单纯就是从字典里去拿

如果存在创建好的Player,那就更简单了,我们把Gate服的Session保存到这个Player身上

如果不存在,那么我们首先创建一个对于的Player,接下来的几个步骤有点绕

  • 给Player添加PlayerSession组件,并挂载MailBox,这样这个组件就能够进行网络通讯了,然后把Session保存到这个组件上
  • 同样的,我们给Player也挂载MailBox
  • 反过来,又给Session添加了SessionPlayer组件,把Session和Player关联到一起

通过以上三个步骤,我们的Player,Session,PlayerSessionComponent,三者都被关联到了一起

最后返回Player的ID,不是账号,是EntityID

至此,整个登录流程就结束了