zoukankan      html  css  js  c++  java
  • NetMQ:.NET轻量级消息队列

    前言

    首先我现在是在一家游戏工作做服务端的,这几天我们服务端游戏做了整个底层框架的替换,想必做过游戏的也都知道,在游戏里面会有很多的日志需要记录,量也是比较大的;在没有换框架之前我们存日志和游戏运行都是在一套框架里面的,所以做起来比较冗余,也会给游戏服务器带来比较大的压力;现在在这套框架就是把存日志的单独分开做了起来;最后我们老大就选择了NetMQ, 之前没有接触过NetMq 但是总体也是完成了这个日志服务器的编写,所以在这里也是分享了一下;

    什么是NetMQ

    简单说就是ZeroMQ的.net开源版本 吊炸天的速度你懂的

    下载NetMQ

    Install-Package NetMQ

    Demo

    1) 客户端

    using Games.BaseModel.LogEntities;
    using NetMQ;
    using System;
    using System.IO;
    using System.Runtime.Serialization.Formatters.Binary;
    
    namespace NetMQTest
    {
        class Program
        {
            static void Main(string[] args)
            {
                Run();
                Console.ReadLine();
            }
    
            public static void Run()
            {
                using (var ctx = NetMQContext.Create())
                using (var sender = ctx.CreatePushSocket())
                {
                    sender.Connect("tcp://192.168.1.138:5556");
                    for (int i = 0; i < 10000; i++)
                    {
                        using (var sm = new MemoryStream())
                        {
                            var entityLog = new LoginLog { IP = "192.168.1.1", RoleName = "测试", Acc = "test", FromPlat = "1" };
                            var binaryFormatter = new BinaryFormatter();
                            binaryFormatter.Serialize(sm, entityLog);
                            var content = sm.ToArray();
                            sender.Send(content);
                        }
                        Console.WriteLine(i);
                    }
                }
            }
        }
    }

    2)服务端(这里面是我真是游戏里面所用到的,基本意思和原理是一样的,对于不同的场景需要自己进行修改)

    using Games.BaseModel.LogEntities;
    using Games.DBHandler;
    using Games.DBHandler.Dapper;
    using Games.Model.ConfigEntities;
    using NetMQ;
    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.IO;
    using System.Linq;
    using System.Runtime.Serialization.Formatters.Binary;
    using System.Threading.Tasks;
    
    namespace NetMQ_Code
    {
        public class Program
        {
            private static ServerConfigNew _serverConfig;
    
            private static IDataAccessor _dataAccessor;
    
            private static string _connection;
    
            private static readonly Dictionary<string, IDataAccessor> DicServerConfigNew = new Dictionary<string, IDataAccessor>();
    
            private static void Main(string[] args)
            {
                Console.WriteLine("NetMQ准备启动 . . . ");
                _connection = ConfigurationManager.ConnectionStrings["Config"].ConnectionString;
                Task task = new Task(Function);
                task.Start();
                Console.WriteLine("启动成功!");
                Console.ReadKey();
    
            }
    
            private static void Function()
            {
                using (NetMQContext ctx = NetMQContext.Create())
                {
                    //接收消息的套接字
                    using (var receiver = ctx.CreatePullSocket())
                    {
                        receiver.Bind("tcp://*:5556");
                        //处理任务
                        while (true)
                        {
                            var receivedBytes = receiver.Receive();
                            using (var sm = new MemoryStream(receivedBytes, 0, receivedBytes.Length))
                            {
                                //采用二进制方式
                                var binaryFormatter = new BinaryFormatter();
                                sm.Position = 0;
                                var entity = binaryFormatter.Deserialize(sm) as LogEntity;
                                var serverId = entity.ServerID;
                                if (DicServerConfigNew.ContainsKey(serverId))
                                {
                                    EntitySchemaSet.InitSchema(entity.GetType());
                                    DicServerConfigNew.TryGetValue(serverId, out _dataAccessor);
                                    _dataAccessor.SaveLog(entity);
                                }
                                else
                                {
                                    _serverConfig = DapplerUtil.Instance.Query<ServerConfigNew>(_connection, "SELECT * FROM ServerConfig where Id=" + serverId).FirstOrDefault();
                                    if (_serverConfig != null)
                                    {
                                        EntitySchemaSet.InitSchema(entity.GetType());
                                        using (_dataAccessor = DataAccessorFactory.Instance.CreateAccessor("Game", _serverConfig.GameDBConnection, DbProviderType.MsSql))
                                        {
                                            DicServerConfigNew.Add(serverId, _dataAccessor);
                                            _dataAccessor.SaveLog(entity);
                                        }
                                    }
                                    else
                                    {
                                        Console.WriteLine("serverConfig is null");
                                    }
                                }
                                Console.WriteLine("Insert:" + entity.GetType());
                            }
                        }
                    }
                }
            }
        }
    }

    运行结果

    总结

    NetMQ目前还不支持持久化消息,所以,可靠性不是特别高.消息队列基本的特性都满足,效率也很高.

  • 相关阅读:
    python基础---内置类型
    python基础---内置常量
    python基础---内置函数
    数据增强---CutMix
    数据增强---Mixup
    机器学习笔记---评估方法
    深度学习笔记-上溢和下溢
    线性代数笔记-子空间
    tensorflow2学习笔记---模块、层和模型
    tensorflow2学习笔记---Graph和tf.function
  • 原文地址:https://www.cnblogs.com/tcdave/p/9681100.html
Copyright © 2011-2022 走看看