zoukankan      html  css  js  c++  java
  • 几种队列

     c#中自带的队列

     使用C#自带的队列,一般会把进行队列监听的代码放于Global.asax之类的文件或寄宿windows服务之中,与应用服务器在同一台,会抢占服务器资源。后面会介绍使用其他分布式队列。

    来一个简单的示例:

    队列帮助类

    namespace queue
    {
        public sealed class QueueHelper
        {
            private static QueueHelper queue;
            Queue<People> queueobj = new Queue<People>();
    
            public static QueueHelper instance
            {
                get
                {
                    if (queue==null)
                    {
                        queue = new QueueHelper();
                    }
                    return queue;
                }
            }
            /// <summary>
            /// 向队列中添加数据
            /// </summary>
            /// <param name="Id"></param>
            /// <param name="title"></param>
            /// <param name="content"></param>
            public void AddQueue(int id,int age, string name)
            {
                queueobj.Enqueue(new People()
                {
                    ID=id,
                    Age = age,
                    Name=name
                });
            }
            /// <summary>
            /// 当前队列数据量
            /// </summary>
            /// <returns></returns>
            public int Count()
            {
                return queueobj.Count();
            }
            //启动一个线程去监听队列
            public void StartQueue()
            {
                Task t = Task.Run(() => {
                    ScanQueue();
                });
            }
            private void ScanQueue()
            {
                while (true)
                {
                    if (queueobj.Count > 0)
                    {
                        try
                        {
                            //从队列中取出  
                            People queueinfo = queueobj.Dequeue();
                            Console.WriteLine($"取得队列:ID={queueinfo.ID},name={queueinfo.Name},age={queueinfo.Age}");
                        }
                        catch (Exception ex)
                        {
                            throw;
                        }
                    }
                    else
                    {
                        Console.WriteLine("没有数据,先休眠5秒在扫描");
                        Thread.Sleep(5000);
                    }               
                }          
            }
        }
    }

    Main方法

        static void Main(string[] args)
            {
                Console.WriteLine("当前队列数:"+ QueueHelper.instance.Count());
                Console.WriteLine("开启线程扫描");
                QueueHelper.instance.StartQueue();
                Thread.Sleep(10000);
                QueueHelper.instance.AddQueue(1,25,"小王");
                QueueHelper.instance.AddQueue(2, 28, "小明");
                QueueHelper.instance.AddQueue(3, 99, "大爷");
                Console.ReadLine();
            }

    运行结果

     redis队列

     对可靠性和稳定性要求不高的应用场景,可以使用redis简单方便的实现

    关于redis队列的实现方式有两种:

    1、生产者消费者模式(List)。

    2、发布者订阅者模式(pub/sub)。

    驱动:StackExchange.Redis   

    以下为第一种方式示例

    RedisHelper帮助类

    public class RedisHelper
        {
            // redis实例
            private static RedisHelper instance = null;
            private IDatabase db;
            private ConnectionMultiplexer redis;
            private IServer redisServer;
            private readonly string _enqueueName = "PersonObj";
            /// <summary>
            /// 静态单例方法
            /// </summary>
            /// <returns></returns>
            public static RedisHelper Get()
            {
                if (instance == null)
                {
                    instance = new RedisHelper();
                }
                return instance;
            }
            /// <summary>
            /// 无参数构造函数
            /// </summary>
            private RedisHelper()
            {
                var redisConnection = "127.0.0.1:6379";
                redis = ConnectionMultiplexer.Connect(redisConnection);
                redisServer = redis.GetServer(redisConnection);
                db = redis.GetDatabase();
            }
            /// <summary>
            /// 入队
            /// </summary>
            /// <typeparam name="T"></typeparam>
            /// <param name="enqueueName">队列名称</param>
            /// <param name="value"></param>
            public void EnqueueItem<T>( T value)
            {
                //序列化
                var valueString = JsonConvert.SerializeObject(value);
                db.ListLeftPushAsync(_enqueueName, valueString);
            }
            /// <summary>
            /// 出队
            /// </summary>
            /// <typeparam name="T"></typeparam>
            /// <param name="enqueueName">队列名称</param>
            /// <returns></returns>
            public T DequeueItem<T>()
            {
                var valueString = db.ListRightPopAsync(_enqueueName).Result;
           //反序列化 T obj
    = JsonConvert.DeserializeObject<T>(valueString); return obj; } /// <summary> /// 当前队列数据量 /// </summary> /// <param name="enqueueName"></param> /// <returns></returns> public long Count() { return db.ListLengthAsync(_enqueueName).Result; } //启动一个线程去监听队列 public void StartQueue() { Task t = Task.Run(() => { ScanQueue(); }); } private void ScanQueue() { while (true) { if (this.Count() > 0) { try { //从队列中取出 Person queueinfo = DequeueItem<Person>(); Console.WriteLine($"取得队列:name={queueinfo.Name},age={queueinfo.Age}"); } catch (Exception ex) { throw; } } else { Console.WriteLine("没有数据,先休眠5秒在扫描"); Thread.Sleep(5000); } } } }

     Main方法:

          static void Main(string[] args)
            {
                Console.WriteLine($"当前队列数{RedisHelper.Get().Count()}");
                Console.WriteLine("开启线程扫描");
                RedisHelper.Get().StartQueue();
                Thread.Sleep(10000);
                RedisHelper.Get().EnqueueItem<Person>(new Person { Name="小王",Age=20});
                RedisHelper.Get().EnqueueItem<Person>(new Person { Name = "小明", Age = 20 });
                Console.WriteLine($"当前队列数{RedisHelper.Get().Count()}");
                Console.ReadLine();
            }

    运行结果

     

    RabbitMQ

     RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java等,且支持AJAX。用于在分布式系统中存储转发消息,具体特点包括易用性、扩展性、高可用性、消息集群等

    关于RabbitMQ的教程可以参考:https://www.cnblogs.com/julyluo/p/6262553.html

    安装教程可以参考:https://www.cnblogs.com/ericli-ericli/p/5902270.html

     RabbitMq默认的监听端口是15672

        public class RabbitmqHelper
        {
            private static RabbitmqHelper instance;
            private readonly ConnectionFactory rabbitMqFactory;
            //交换器名称
            const string ExchangeName = "myExchange";
            //当前消息队列名称
            const string QueueName = "myFirstQueue";
    
            public RabbitmqHelper()
            {
                //没有设置账号密码端口或服务端没有先开启权限会报:None of the specified endpoints were reachable
                rabbitMqFactory = new ConnectionFactory { HostName = "192.168.0.112" };
                rabbitMqFactory.Port = 5672;
                rabbitMqFactory.UserName = "123456";
                rabbitMqFactory.Password = "123456";
            }
            /// <summary>
            /// 静态单例方法
            /// </summary>
            /// <returns></returns>
            public static RabbitmqHelper Get()
            {
                if (instance == null)
                {              
                    instance = new RabbitmqHelper();
                }
                return instance;
            }
    
            /// <summary>
            /// 生产者-发布消息
            /// </summary>
            /// <param name="msg">消息</param>
            public void Enqueue(string msg)
            {
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        //定义交换器
                        channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
                        //持久化一个队列,如果名称相同不会重复创建
                        channel.QueueDeclare(QueueName, true, false, false, null);
                        //定义exchange到queue的binding
                        channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
                        byte[] bytes = Encoding.UTF8.GetBytes(msg);
                        //设置消息持久化
                        IBasicProperties properties = channel.CreateBasicProperties();
                        properties.Persistent = true;
                        channel.BasicPublish("", QueueName, properties, bytes);
                    }
                }
            }
            /// <summary>
            /// 消费者-接收消息-基于订阅模式
            /// </summary>
            /// <param name="msg">消息</param>
            public void Dequeue()
            {
                IConnection conn = rabbitMqFactory.CreateConnection();
                IModel channel = conn.CreateModel();
    
                //持久化一个队列,如果名称相同不会重复创建
                channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);
    
                //告诉broker同一时间只处理一个消息
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
                //因处理性能问题,已被官方弃用
                //var consumer2 = new QueueingBasicConsumer(channel)
                //var msgResponse = consumer.Queue.Dequeue();
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    try
                    {
                        var msgBody = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine($"收到消息:{msgBody}");
    
                        //处理完成,服务端可以删除消息了同时分配新的消息
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine($"出现错误:{e.Message}");
                    }
                };
    
                //noAck设置false,发送消息之后,消息不要主动删除,先等消费者处理完
                channel.BasicConsume(QueueName, false, consumer);
            }
            /// <summary>
            /// 消费者-接收消息-主动拉取
            /// </summary>
            /// <param name="msg">消息</param>
            public void Dequeue2()
            {
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        while (true)
                        {
                            BasicGetResult res = channel.BasicGet(QueueName, false/*noAck*/);
                            if (res != null)
                            {
                                var msg = System.Text.UTF8Encoding.UTF8.GetString(res.Body);
                                Console.WriteLine($"获取到消息:{msg}");
                                channel.BasicAck(res.DeliveryTag, false);
                            }
                        }
                    }
                }
    
    
            }
        }
    View Code
  • 相关阅读:
    泛型约束where条件的使用(可以通过类型参数动态反射创建实例)
    设计模式之java源码-工厂方法模式
    软件定义网络基础---OF-Config协议
    软件定义网络基础---OpenFlow协议
    软件定义网络基础---南向接口协议概述
    软件定义网络基础---OpenFlow流表
    软件定义网络基础---OpenFlow概述
    软件定义网络基础---SDN数据平面
    软件定义网络基础---SDN的核心思想
    MACVLAN虚拟网卡技术
  • 原文地址:https://www.cnblogs.com/qiuguochao/p/9113919.html
Copyright © 2011-2022 走看看