zoukankan      html  css  js  c++  java
  • RabbitMQ系列——(1)生产者消费者(点对点)

    引用

    1. 生产者

    using RabbitMQMsgProducer.MessageProducer;
    using System;
    
    namespace RabbitMQMsgProducer
    {
        /// <summary>
        /// Console entry point for the producer side of the point-to-point demo.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Calls <c>ProducerMsg.Send()</c> and afterwards waits for a line of
            /// console input. Any exception raised along the way is reported by
            /// printing its message to the console.
            /// </summary>
            /// <param name="args">Command-line arguments; not read by this method.</param>
            static void Main(string[] args)
            {
                try
                {
                    // Producer/consumer, point-to-point.
                    ProducerMsg.Send();

                    Console.ReadLine();
                }
                catch (Exception failure)
                {
                    Console.WriteLine(failure.Message);
                }
            }
        }
    }
    using RabbitMQ.Client;
    using System;
    using System.Text;
    using System.Threading;

    namespace RabbitMQMsgProducer.MessageProducer
    {
        /// <summary>
        /// Producer side of the point-to-point demo: declares a durable queue and a
        /// durable direct exchange, binds them with an empty routing key, and then
        /// publishes one persistent text message per second.
        /// </summary>
        public class ProducerMsg
        {
            /// <summary>
            /// Connects to the broker on localhost with the guest account, makes sure
            /// the queue, exchange and binding exist, and publishes timestamped
            /// messages in an endless loop.
            /// </summary>
            /// <remarks>
            /// This method contains no exit condition: it only ends when an exception
            /// is thrown, which is not caught here and therefore propagates to the caller.
            /// </remarks>
            public static void Send()
            {
                Console.ForegroundColor = ConsoleColor.Yellow;

                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.HostName = "localhost"; // server address
                connectionFactory.UserName = "guest"; // user name
                connectionFactory.Password = "guest"; // password

                string queueName = "ProducerMsgQueue";
                string exchangeName = "ProducerMsgExchange";

                using (IConnection connection = connectionFactory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        // Declare the queue.
                        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                        // Declare the exchange.
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        // Bind the queue to the exchange.
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: string.Empty, arguments: null);
                        Console.WriteLine("ProducerMsg 准备就绪,可以发消息了。");

                        // The properties are identical for every message, so build them once.
                        IBasicProperties basicProperties = channel.CreateBasicProperties();
                        basicProperties.Persistent = true; // mark messages as persistent

                        while (true)
                        {
                            // "HH" is the 24-hour clock; "hh" without an AM/PM designator
                            // would make afternoon timestamps ambiguous.
                            string msg = $"消息内容{DateTime.Now:yyyy-MM-dd HH:mm:ss fff}";
                            byte[] body = Encoding.UTF8.GetBytes(msg);
                            channel.BasicPublish(exchange: exchangeName, routingKey: string.Empty, basicProperties: basicProperties, body: body);
                            Console.WriteLine($"the message : {msg} over.");
                            Thread.Sleep(1000);
                        }
                    }
                }
            }
        }
    }

    2. 消费者

    using RabbitMQMsgConsumer001.MessageConsumer;
    using System;
    
    namespace RabbitMQMsgConsumer001
    {
        /// <summary>
        /// Console entry point for consumer 001 of the point-to-point demo.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Calls <c>ConsumerMsg.Receive()</c> and, once it returns, waits for a
            /// line of console input. Any exception raised along the way is reported
            /// by printing its message to the console.
            /// </summary>
            /// <param name="args">Command-line arguments; not read by this method.</param>
            static void Main(string[] args)
            {
                try
                {
                    // Producer/consumer, point-to-point.
                    ConsumerMsg.Receive();

                    Console.ReadLine();
                }
                catch (Exception failure)
                {
                    Console.WriteLine(failure.Message);
                }
            }
        }
    }
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Text;
    
    namespace RabbitMQMsgConsumer001.MessageConsumer
    {
        /// <summary>
        /// Consumer side of the point-to-point demo: subscribes to the queue the
        /// producer publishes to and prints every message body it receives.
        /// </summary>
        public class ConsumerMsg
        {
            /// <summary>
            /// Connects to the broker on localhost with the guest account, declares the
            /// queue, exchange and binding, registers an event-based consumer with
            /// automatic acknowledgement, and then waits for a line of console input
            /// before the channel and connection are disposed.
            /// </summary>
            /// <remarks>
            /// Exceptions thrown while declaring or subscribing are caught here and
            /// only their message is printed. Exceptions from creating the connection
            /// or the channel are outside that try block and reach the caller.
            /// </remarks>
            public static void Receive()
            {
                Console.ForegroundColor = ConsoleColor.Blue;

                const string queueName = "ProducerMsgQueue";
                const string exchangeName = "ProducerMsgExchange";

                var factory = new ConnectionFactory
                {
                    HostName = "localhost", // server address
                    UserName = "guest",     // user name
                    Password = "guest"      // password
                };

                using (IConnection connection = factory.CreateConnection())
                using (IModel channel = connection.CreateModel())
                {
                    try
                    {
                        // Declare and bind here as well, in case they do not exist yet.
                        // Queue declaration.
                        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                        // Exchange declaration.
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        // Queue binding.
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: string.Empty, arguments: null);

                        var listener = new EventingBasicConsumer(channel);
                        listener.Received += (sender, delivery) =>
                        {
                            // Decode the payload as UTF-8 text and echo it.
                            string text = Encoding.UTF8.GetString(delivery.Body.ToArray());
                            Console.WriteLine($"the consumer 001 received : {text} over.");
                        };

                        // autoAck is true, so the handler sends no explicit acknowledgement.
                        channel.BasicConsume(queue: queueName, autoAck: true, consumer: listener);
                        Console.WriteLine("ConsumerMsg 准备就绪,可以接收消息了。");

                        // Stay inside the using scope until a line is entered on the console.
                        Console.ReadLine();
                    }
                    catch (Exception error)
                    {
                        Console.WriteLine(error.Message);
                    }
                }
            }
        }
    }

    3. 效果

     

  • 相关阅读:
    Docker-compose编排微服务顺序启动解决方案
    在笔记本上使用virtualbox搭建lvs dr 实验遇到的问题
    MongoDB安装配置(RedHat/CentOS)
    wget 用法
    Ubuntu查看crontab运行日志
    CentOS-6.5安装Zabbix 3.0.4
    centos 6 安装vsftpd与PAM虚拟用户
    python socket常用接口说明
    cmake 构建工程
    std::vector的下标访问和迭代器访问的效率
  • 原文地址:https://www.cnblogs.com/Fletcher/p/14138426.html
Copyright © 2011-2022 走看看