  • EQueue Tutorial: an Open-Source Message Queue Middleware in C#

    I. Introduction

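    EQueue is an open-source message queue middleware modeled on RocketMQ and compatible with Mono. For background, see the author's article "Sharing an open-source distributed message queue written in C#: equeue". The project is hosted at https://github.com/tangxuehua/equeue; the repository contains the full source code of the queue and examples of how to use it.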

    II. Installing EQueue

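    Producer, Consumer, and Broker can all be deployed in a distributed fashion. Installing EQueue requires .NET 4 and Visual Studio 2010/2012/2013. EQueue is currently a class library, so you have to implement the Broker host yourself. Following the QuickStart sample, create a QuickStart.BrokerServer project, search for equeue through NuGet in Visual Studio, and add it to the project. The host program looks like this: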

    using System;
    using System.Text;
    using ECommon.Autofac;
    using ECommon.Configurations;
    using ECommon.JsonNet;
    using ECommon.Log4Net;
    using EQueue.Broker;
    using EQueue.Configurations;
    using EQueue.Protocols;
    
    namespace QuickStart.BrokerServer
    {
        class Program
        {
            static void Main(string[] args)
            {
                InitializeEQueue();
                var setting = new BrokerSetting();
                setting.NotifyWhenMessageArrived = false;
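                // Run the scheduled message deletion every second (the default is one hour).
                setting.DeleteMessageInterval = 1000;
                new BrokerController(setting).Initialize().Start();
                // Keep the process alive until Enter is pressed.
                Console.ReadLine();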
            }
    
            static void InitializeEQueue()
            {
                Configuration
                    .Create()
                    .UseAutofac()
                    .RegisterCommonComponents()
                    .UseLog4Net()
                    .UseJsonNet()
                    .RegisterEQueueComponents();
            }
        }
    }

    The InitializeEQueue method sets up the EQueue environment, using Autofac as the IoC container and log4net for logging. Let's look at the RegisterEQueueComponents method:

    public static class ConfigurationExtensions
    {
        public static Configuration RegisterEQueueComponents(this Configuration configuration)
        {
            configuration.SetDefault<IAllocateMessageQueueStrategy, AverageAllocateMessageQueueStrategy>();
            configuration.SetDefault<IQueueSelector, QueueHashSelector>();
            configuration.SetDefault<ILocalOffsetStore, DefaultLocalOffsetStore>();
            configuration.SetDefault<IMessageStore, InMemoryMessageStore>();
            configuration.SetDefault<IMessageService, MessageService>();
            configuration.SetDefault<IOffsetManager, InMemoryOffsetManager>();
            return configuration;
        }
    }

    The code registers six components:

    • IAllocateMessageQueueStrategy
    • IQueueSelector
    • ILocalOffsetStore
    • IMessageStore
    • IMessageService
    • IOffsetManager
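
    The name QueueHashSelector suggests that a queue is picked by hashing a routing key. The following is a minimal, self-contained sketch of that idea, not EQueue's actual implementation: the class name QueueHashSelectorSketch, the FNV-1a hash, and the queue count of 4 are all assumptions made for illustration.

    ```csharp
    using System;
    using System.Text;

    public static class QueueHashSelectorSketch
    {
        // FNV-1a over the UTF-8 bytes of the key. A hand-written hash is used
        // because string.GetHashCode() is not guaranteed to be stable across processes.
        public static uint StableHash(string key)
        {
            unchecked
            {
                uint hash = 2166136261;
                foreach (var b in Encoding.UTF8.GetBytes(key))
                {
                    hash ^= b;
                    hash *= 16777619;
                }
                return hash;
            }
        }

        // Map a routing key to a queue index in the range [0, queueCount).
        public static int SelectQueue(string routingKey, int queueCount)
        {
            if (queueCount <= 0) throw new ArgumentOutOfRangeException("queueCount");
            return (int)(StableHash(routingKey) % (uint)queueCount);
        }

        public static void Main()
        {
            // Hypothetical topic with 4 queues; keys 1..8 stand in for routing keys.
            for (var index = 1; index <= 8; index++)
            {
                Console.WriteLine("routing key {0} -> queue {1}", index, SelectQueue(index.ToString(), 4));
            }
        }
    }
    ```

    With a selector of this kind, messages that share a routing key always land in the same queue, which is what makes per-key ordering possible.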

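    The DeleteMessageInterval property sets the interval, in milliseconds, at which EQueue runs its scheduled message deletion; the default is one hour. ProducerSocketSetting and ConsumerSocketSetting set the IP address and port that Producers and Consumers, respectively, use to connect to the Broker; the default ports are 5000 and 5001.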

    public class BrokerSetting
    {
        public SocketSetting ProducerSocketSetting { get; set; }
        public SocketSetting ConsumerSocketSetting { get; set; }
        public bool NotifyWhenMessageArrived { get; set; }
        public int DeleteMessageInterval { get; set; }

        public BrokerSetting()
        {
            ProducerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5000, Backlog = 5000 };
            ConsumerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5001, Backlog = 5000 };
            NotifyWhenMessageArrived = true;
            DeleteMessageInterval = 1000 * 60 * 60;
        }
    }
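
    Because every BrokerSetting property has a public setter, the defaults can be overridden before the Broker starts. The fragment below is a sketch meant to replace the setting lines in Main of QuickStart.BrokerServer; it uses only the members shown above, and the address, ports, and interval are hypothetical example values.

    ```csharp
    // Fragment for Main in QuickStart.BrokerServer (not a standalone program).
    // The address, ports, and interval below are made-up example values.
    var setting = new BrokerSetting
    {
        ProducerSocketSetting = new SocketSetting { Address = "192.168.1.10", Port = 6000, Backlog = 5000 },
        ConsumerSocketSetting = new SocketSetting { Address = "192.168.1.10", Port = 6001, Backlog = 5000 },
        // Run the scheduled message deletion every 10 minutes instead of every hour.
        DeleteMessageInterval = 1000 * 60 * 10
    };
    new BrokerController(setting).Initialize().Start();
    ```

    Producers and Consumers would then need matching BrokerAddress and BrokerPort values in their own settings.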

    Run the project. If output similar to the following appears, the Broker has started successfully:

    2014-03-23 20:10:30,255  INFO BrokerController - Broker started, producer:[169.254.80.80:5000], consumer:[169.254.80.80:5001]

    III. Developing and Testing in Visual Studio

    1. Create a Visual Studio project named QuickStart.ProducerClient, reference EQueue through NuGet, and write the following Producer code:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using ECommon.Autofac;
    using ECommon.Configurations;
    using ECommon.IoC;
    using ECommon.JsonNet;
    using ECommon.Log4Net;
    using ECommon.Scheduling;
    using EQueue.Clients.Producers;
    using EQueue.Configurations;
    using EQueue.Protocols;
    
    namespace QuickStart.ProducerClient
    {
        class Program
        {
            static void Main(string[] args)
            {
                InitializeEQueue();
    
                var scheduleService = ObjectContainer.Resolve<IScheduleService>();
                var producer = new Producer().Start();
                var total = 1000;          // messages sent by each action
                var parallelCount = 10;    // actions run in parallel, so 10,000 messages in total
                var finished = 0;
                var messageIndex = 0;
                var watch = Stopwatch.StartNew();
    
                var action = new Action(() =>
                {
                    for (var index = 1; index <= total; index++)
                    {
                        var message = "message" + Interlocked.Increment(ref messageIndex);
                        producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask =>
                        {
                            var finishedCount = Interlocked.Increment(ref finished);
                            if (finishedCount % 1000 == 0)
                            {
                                Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds));
                            }
                        });
                    }
                });
    
                var actions = new List<Action>();
                for (var index = 0; index < parallelCount; index++)
                {
                    actions.Add(action);
                }
    
                Parallel.Invoke(actions.ToArray());
    
                Console.ReadLine();
            }
    
            static void InitializeEQueue()
            {
                Configuration
                    .Create()
                    .UseAutofac()
                    .RegisterCommonComponents()
                    .UseLog4Net()
                    .UseJsonNet()
                    .RegisterEQueueComponents();
            }
        }
    }

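    A Producer must be initialized by calling Start before it is used, and it only needs to be initialized once. Note: never call Start every time you send a message. By default the Producer connects to port 5000 on the local machine; this can be changed through ProducerSetting, as the following code shows: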

    public class ProducerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int SendMessageTimeoutMilliseconds { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }

        public ProducerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5000;
            SendMessageTimeoutMilliseconds = 1000 * 10;
            UpdateTopicQueueCountInterval = 1000 * 5;
        }
    }

    2. Create a Visual Studio project named QuickStart.ConsumerClient, reference EQueue through NuGet, and write the following Consumer code:

    using System;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using ECommon.Autofac;
    using ECommon.Configurations;
    using ECommon.IoC;
    using ECommon.JsonNet;
    using ECommon.Log4Net;
    using ECommon.Scheduling;
    using EQueue.Broker;
    using EQueue.Clients.Consumers;
    using EQueue.Configurations;
    using EQueue.Protocols;
    
    namespace QuickStart.ConsumerClient
    {
        class Program
        {
            static void Main(string[] args)
            {
                InitializeEQueue();
    
                var messageHandler = new MessageHandler();
                var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);
                var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);
                var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);
                var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);
    
                Console.WriteLine("Start consumer load balance, please wait for a moment.");
                var scheduleService = ObjectContainer.Resolve<IScheduleService>();
                var waitHandle = new ManualResetEvent(false);
                var taskId = scheduleService.ScheduleTask(() =>
                {
                    var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x => x.QueueId);
                    var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x => x.QueueId);
                    var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x => x.QueueId);
                    var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x => x.QueueId);
                    if (c1AllocatedQueueIds.Count() == 1 && c2AllocatedQueueIds.Count() == 1 && c3AllocatedQueueIds.Count() == 1 && c4AllocatedQueueIds.Count() == 1)
                    {
                        Console.WriteLine(string.Format("Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}",
                            string.Join(",", c1AllocatedQueueIds),
                            string.Join(",", c2AllocatedQueueIds),
                            string.Join(",", c3AllocatedQueueIds),
                            string.Join(",", c4AllocatedQueueIds)));
                        waitHandle.Set();
                    }
                }, 1000, 1000);
    
                waitHandle.WaitOne();
                scheduleService.ShutdownTask(taskId);
    
                Console.ReadLine();
            }
    
            static void InitializeEQueue()
            {
                Configuration
                    .Create()
                    .UseAutofac()
                    .RegisterCommonComponents()
                    .UseLog4Net()
                    .UseJsonNet()
                    .RegisterEQueueComponents();
            }
        }
    
        class MessageHandler : IMessageHandler
        {
            private int _handledCount;
    
            public void Handle(QueueMessage message, IMessageContext context)
            {
                var count = Interlocked.Increment(ref _handledCount);
                if (count % 1000 == 0)
                {
                    Console.WriteLine("Total handled {0} messages.", count);
                }
                context.OnMessageHandled(message);
            }
        }
    }
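
    The load-balance check above waits until each of the four consumers in group1 owns exactly one queue, which implies that the topic's queues are divided evenly among the group. A minimal sketch of such an even split follows. It illustrates the idea behind the name AverageAllocateMessageQueueStrategy and is not EQueue's code: the class AverageAllocationSketch, its Allocate signature, and the queue ids 0 to 3 are assumptions.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;

    public static class AverageAllocationSketch
    {
        // Give each consumer a contiguous slice of the sorted queue list.
        // The first (queueCount % consumerCount) consumers get one extra queue.
        public static List<int> Allocate(string currentConsumerId, IList<string> consumerIds, IList<int> queueIds)
        {
            // Every consumer must sort the same way so that all of them agree on the split.
            var sortedConsumers = consumerIds.OrderBy(x => x, StringComparer.Ordinal).ToList();
            var position = sortedConsumers.IndexOf(currentConsumerId);
            if (position < 0) return new List<int>();

            var baseSize = queueIds.Count / sortedConsumers.Count;
            var remainder = queueIds.Count % sortedConsumers.Count;
            var size = baseSize + (position < remainder ? 1 : 0);
            var start = position * baseSize + Math.Min(position, remainder);
            return queueIds.OrderBy(x => x).Skip(start).Take(size).ToList();
        }

        public static void Main()
        {
            // Hypothetical group of four consumers sharing a topic with four queues.
            var consumers = new List<string> { "Consumer1", "Consumer2", "Consumer3", "Consumer4" };
            var queues = new List<int> { 0, 1, 2, 3 };
            foreach (var consumer in consumers)
            {
                var allocated = Allocate(consumer, consumers, queues);
                Console.WriteLine("{0} -> {1}", consumer, string.Join(",", allocated));
            }
        }
    }
    ```

    With four consumers and four queues each consumer receives exactly one queue, which is the condition the scheduled task in the Consumer program waits for.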

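    To the user, it looks as if messages are pushed from the EQueue server to the application client. Internally, however, the Consumer uses long polling to pull messages from the EQueue server and then calls back the user's listener method (IMessageHandler.Handle in the code above). By default the Consumer connects to port 5001 on the local machine; this can be changed through ConsumerSetting, as the following code shows: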

    public class ConsumerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int RebalanceInterval { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }
        public int HeartbeatBrokerInterval { get; set; }
        public int PersistConsumerOffsetInterval { get; set; }
        public PullRequestSetting PullRequestSetting { get; set; }
        public MessageModel MessageModel { get; set; }
        public MessageHandleMode MessageHandleMode { get; set; }

        public ConsumerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5001;
            RebalanceInterval = 1000 * 5;
            HeartbeatBrokerInterval = 1000 * 5;
            UpdateTopicQueueCountInterval = 1000 * 5;
            PersistConsumerOffsetInterval = 1000 * 5;
            PullRequestSetting = new PullRequestSetting();
            MessageModel = MessageModel.Clustering;
            MessageHandleMode = MessageHandleMode.Parallel;
        }
    }
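
    The long-polling pull mentioned earlier can be illustrated without EQueue. In this sketch, which is a stand-in and not the Consumer's real code, an in-memory BlockingCollection plays the Broker: a pull blocks until a message arrives or a timeout elapses, and the consumer loop hands each pulled message to a callback, so the caller sees push-style delivery. All names (LongPollingSketch, TryPull, Publish, StartConsumer) and the 500 ms timeout are hypothetical.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public class LongPollingSketch
    {
        // Stand-in for the Broker's message store.
        private readonly BlockingCollection<string> _brokerQueue = new BlockingCollection<string>();

        public void Publish(string message)
        {
            _brokerQueue.Add(message);
        }

        // "Broker" side: hold the pull request open until a message arrives
        // or the timeout elapses, instead of answering "empty" immediately.
        public bool TryPull(int timeoutMilliseconds, out string message)
        {
            return _brokerQueue.TryTake(out message, timeoutMilliseconds);
        }

        // "Consumer" side: keep pulling and pass each message to the user's callback.
        public Task StartConsumer(Action<string> handler, CancellationToken token)
        {
            return Task.Factory.StartNew(() =>
            {
                while (!token.IsCancellationRequested)
                {
                    string message;
                    if (TryPull(500, out message))
                    {
                        handler(message);
                    }
                }
            }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        public static void Main()
        {
            var sketch = new LongPollingSketch();
            var handled = new CountdownEvent(3);
            var cts = new CancellationTokenSource();
            var consumer = sketch.StartConsumer(m =>
            {
                Console.WriteLine("handled " + m);
                handled.Signal();
            }, cts.Token);

            for (var i = 1; i <= 3; i++)
            {
                sketch.Publish("message" + i);
            }

            handled.Wait();   // all three messages reached the callback
            cts.Cancel();     // the loop exits after its current pull returns
            consumer.Wait();
        }
    }
    ```

    Compared with polling on a fixed short interval, holding the request open keeps delivery latency low while avoiding a stream of empty responses when the queue is idle.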

    EQueue is compatible with Linux/Mono. The result of running it on CentOS 6.4 with Mono 3.2.3:

    [image: run output on CentOS 6.4 / Mono 3.2.3]

    Study notes on the design ideas of Kafka/Metaq

    EQueue: a detailed look at the design of message persistence and message backlog

  • Original article: https://www.cnblogs.com/shanyou/p/3619545.html