zoukankan      html  css  js  c++  java
  • RabbitMQ 消息框架&列信队列1

    RabbitMQ 消息队列框架

    引用包

    RabbitMQ.Client

    简单是在控制台操作,net里可以放在后台操作

    方式一:

    发送端:配置队列,并创建消费队列,配置多个消费队列轮流接收消息

            static void Main(string[] args)
            {
    
                var RAB_FACTORY = new ConnectionFactory();
                RAB_FACTORY.HostName = "127.0.0.1";
                RAB_FACTORY.Port = 5672;
                RAB_FACTORY.UserName = USER_NAME;
                RAB_FACTORY.Password = PASSWORD;
    
                using (var connection = RAB_FACTORY.CreateConnection())
                {
                    using (var delayChannel = connection.CreateModel())
                    {    //延时队列连接通道
                        var consumerChannel = connection.CreateModel();//消费队列连接通道
                        consumerChannel.ExchangeDeclare(EXCHANGENAME, "direct");//创建交换器
                        Dictionary<string, object> arg = new Dictionary<string, object>();
                        //配置死信交换器
                        arg.Add("x-dead-letter-exchange", EXCHANGENAME); //交换器名称
                                                                         //死信交换路由key (交换器可以将死信交换到很多个其他的消费队列,可以用不同的路由key 来将死信路由到不同的消费队列去)
                        arg.Add("x-dead-letter-routing-key", ROUTINGKEY);
                        delayChannel.QueueDeclare(DELAY_QUEUE_NAME, true, false, false, arg);
    
                        /**
                         直接在发送端创建接收的消费队列
                         */
                        consumerChannel.QueueDeclare(CONSUME_QUEUE_NAME, true, false, false, null);
                        //参数1:绑定的队列名  参数2:绑定至哪个交换器  参数3:绑定路由key
                        consumerChannel.QueueBind(CONSUME_QUEUE_NAME, EXCHANGENAME, ROUTINGKEY);
                        //最多接受条数 0为无限制,每次消费消息数(根据实际场景设置),true=作用于整channel,false=作用于具体的消费者
                        consumerChannel.BasicQos(0, 1, false);  //这里设置在多个消息者轮留接收消息
    
                        while (true)
                        {
                            var input = Console.ReadLine();
                            if (input == "exit")
                            {
                                break;
                            }
                            var body = Encoding.UTF8.GetBytes(input + "");
                            var properties = delayChannel.CreateBasicProperties();
                            properties.DeliveryMode = 2;
                            properties.Expiration = "1000"; //设置消息的过期时间
                            delayChannel.BasicPublish("", DELAY_QUEUE_NAME, properties, body);
                        }
                    }
                }
    
            }
    View Code

     接收端:

        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory();
                factory.HostName = "127.0.0.1";
                factory.UserName = "admin";
                factory.Password = "admin";
    
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //创建队列
                        channel.QueueDeclare("consume-queue-orderOverTime", true, false, false, null);
                        //绑定消费者到哪个队列
                        var consumer = new EventingBasicConsumer(channel);
                        channel.BasicConsume("consume-queue-orderOverTime", false, consumer);
                        consumer.Received += (model, ea) =>
                        {
                            //Thread.Sleep(1000 * 2);
                            var body = ea.Body.ToArray();
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine("已接收: {0}", message);
                            //设置手动确认
                            channel.BasicAck(ea.DeliveryTag, false);
                        };
                        //手动确认OK
                        consumer.HandleBasicConsumeOk("ok");
                        Console.ReadLine();
                    }
                }
            }
        }
    View Code

    方式二:

    发送端:

        class Program
        {
            static void Main(string[] args)
            {
                //发送者
                Console.WriteLine("Start");
                IConnectionFactory conFactory = new ConnectionFactory//创建连接工厂对象
                {
                    HostName = "127.0.0.1",//IP地址
                    Port = 5672,//端口号
                    UserName = "admin",//用户账号
                    Password = "admin"//用户密码
                };
                using (IConnection con = conFactory.CreateConnection())//创建连接对象
                {
                    using (IModel channel = con.CreateModel())//创建连接会话对象
                    {
                        String queueName = String.Empty;
                        if (args.Length > 0)
                            queueName = args[0];
                        else
                            queueName = "queue1";
                        //声明一个队列
                        channel.QueueDeclare(
                          queue: queueName,//消息队列名称
                          durable: false,//是否缓存
                          exclusive: false,
                          autoDelete: false,
                          arguments: null
                           );
                        while (true)
                        {
                            Console.WriteLine("消息内容:");
                            String message = Console.ReadLine();
                            //消息内容
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            //发送消息
                            channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                            Console.WriteLine("成功发送消息:" + message);
                        }
                    }
                }
            }
        }
    View Code

    接收端:

        class Program
        {
            static void Main(string[] args)
            {
                //接收者
                Console.WriteLine("Start");
                IConnectionFactory connFactory = new ConnectionFactory//创建连接工厂对象
                {
                    HostName = "127.0.0.1",//IP地址
                    Port = 5672,//端口号
                    UserName = "admin",//用户账号
                    Password = "admin"//用户密码
                };
                using (IConnection conn = connFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
                        String queueName = String.Empty;
                        if (args.Length > 0)
                            queueName = args[0];
                        else
                            queueName = "queue1";
                        //声明一个队列
                        channel.QueueDeclare(
                          queue: queueName,//消息队列名称
                          durable: false,//是否缓存
                          exclusive: false,
                          autoDelete: false,
                          arguments: null
                           );
                        //创建消费者对象
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            Thread.Sleep(1000 * 5);
    
                            byte[] message = ea.Body;//接收到的消息
    
                            channel.BasicAck(ea.DeliveryTag, false);
                            Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message));
                        };
                        //消费者开启监听
                        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                        Console.ReadKey();
                    }
                }
            }
        }
    View Code
  • 相关阅读:
    路径问题
    移动端推荐使用
    js获取各种宽高方法
    html 符号大全
    bzoj4923 K小值查询
    bzoj3781 小B的询问
    bzoj1799 [Ahoi2009]self 同类分布
    bzoj2005 [Noi2010]能量采集
    bzoj4039 集会
    bzoj2516 电梯
  • 原文地址:https://www.cnblogs.com/myfqm/p/13030401.html
Copyright © 2011-2022 走看看