zoukankan      html  css  js  c++  java
  • rabbitmq基本使用

    生产者:

    public class RabbitMqHelper {
            public static void BasicPublish<T> (T t) where T : class {
                var factory = new ConnectionFactory () { HostName = "192.168.1.7" }; //, Port = 5672 };
                using (var connection = factory.CreateConnection ())
                using (var channel = connection.CreateModel ()) {
                    channel.QueueDeclare (queue: "hello",
                        durable : true,//声明持久化queue
                        exclusive : false,
                        autoDelete : false,
                        arguments : null);
    
                    var body = t.ToBytes ();
    
                    var properties = channel.CreateBasicProperties ();
                    properties.Persistent = true;//使用持久化消息
    
                    channel.BasicPublish (exchange: "",
                        routingKey: "hello",
                        basicProperties : properties,
                        body : body);
                    Console.WriteLine (" [x] Sent {0}", t.ToJsonString ());
                }
            }
        }

    消费者:

    class Program {
            private static object lockObj = new object ();
    
            private static Random random = new Random ();
    
            static void Main (string[] args) {
                Console.Title = "Consumer";
                var factory = new ConnectionFactory () { HostName = "192.168.1.7" };
                using (var connection = factory.CreateConnection ())
                using (var channel = connection.CreateModel ()) {
                    channel.QueueDeclare (queue: "hello",
                        durable : true,
                        exclusive : false,
                        autoDelete : false,
                        arguments : null);
    
              //设置channel给每个consumer只推一个消息, 在没有接收到consumer的ack之前, 不再给这个消费者推消息 channel.BasicQos (prefetchSize:
    0, prefetchCount: 1, global: false); Console.WriteLine (" [*] Waiting for messages."); var consumer = new EventingBasicConsumer (channel); consumer.Received += (sender, e) => { try { Console.WriteLine ($"Handle msg {e.ConsumerTag}"); Thread.Sleep (random.Next (100)); var body = e.Body; var message = Encoding.UTF8.GetString (body); lock (lockObj) { File.AppendAllText ("./test.json", $"{message}{Environment.NewLine}"); } Console.WriteLine ($" [{message}] handling finished!");

                   //消息处理完后, 发送ack给channel, 以便channel给它发下一个消息来处理 channel.BasicAck (deliveryTag: e.DeliveryTag, multiple:
    false); } catch (Exception ex) { System.Console.WriteLine (ex.Message);
                  //消息处理失败的话, 发送nack给channel, 以但重新入队重新处理 channel.BasicNack (deliveryTag: e.DeliveryTag, multiple:
    false, requeue: true); } }; channel.BasicConsume (queue: "hello", autoAck : false,//不自动ack, 如果自动ack的话, rabbitmq会往死里把消息往外推, 不管消费者是否处理得了, 消息刚刚到mq里, 马上就发往消费者了, 岂不是事实上的没有起到mq的作用了 consumer : consumer); Console.WriteLine (" Press [enter] to exit."); Console.ReadLine (); } } }
  • 相关阅读:
    加载web项目时报的错误:Tomcat version 6.0 only supports J2EE 1.2, 1.3, 1.4, and Java EE 5 Web modul
    js修改title
    14.Android UiAutomator 图像处理
    13.UiAutomator 辅助APK的使用
    12.UiAutomator 获取系统信息
    11.UiAutomator 相关JAVA知识
    10.Android UiAutomator Junit 断言函数的使用
    面向对象基本关键词的解释
    Java图形界面——Border
    java文本编辑器v2.0 图形用户界面
  • 原文地址:https://www.cnblogs.com/lihan829/p/11141242.html
Copyright © 2011-2022 走看看