zoukankan      html  css  js  c++  java
  • RabbitMQ 笔记

    1:发送

    Uri uri = new Uri("amqp://10.0.4.85:5672/");

    ConnectionFactory factory = new ConnectionFactory();
    factory.UserName = "abc";
    factory.Password = "abcdef";
    factory.VirtualHost = "dnt_mq";
    factory.RequestedHeartbeat = 0;
    factory.Endpoint = new AmqpTcpEndpoint(uri);

    string exchange = "ex1";
    string exchangeType = "direct";
    string routingKey = "Q1";

    using (IConnection conn = factory.CreateConnection())
    {
    using (IModel ch = conn.CreateModel())
    {
    if (exchangeType != null)
    {
    ch.ExchangeDeclare(exchange, exchangeType);//,true,true,false,false, true,null);
    ch.QueueDeclare("MyFirstQueue", true, true, true, null);
    ch.QueueBind("MyFirstQueue", exchange, routingKey);
    }
    string jsonStr = "abc";//JsonConvert.SerializeObject(requestMsg);
    byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);

    IBasicProperties properties = ch.CreateBasicProperties();
    properties.DeliveryMode = 2;

    ch.BasicPublish(exchange, routingKey, properties, bytes);

    /*
    * channel.queueBind("queueName", "exchangeName", "routingKey.*");
    * channel.basicPublish("exchangeName", "routingKey.one", properties, bytes);
    *
    * channel.exchangeDeclare("exchangeName", "fanout"); //direct fanout topic
    * channel.basicPublish("exchangeName", "", properties, bytes);
    * */
    }
    }

    2 接收
    try
    {
    ConnectionFactory factory = new ConnectionFactory();
    factory.HostName = "";
    factory.Port = 456;
    factory.UserName = "";
    factory.Password = "";
    using (IConnection conn = factory.CreateConnection())
    {
    using (IModel channel = conn.CreateModel())
    {
    //在MQ上定义一个持久化队列,如果名称相同不会重复创建
    channel.QueueDeclare("MyFirstQueue", true, false, false, null);

    //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
    channel.BasicQos(0, 1, false);

    Console.WriteLine("Listening...");

    //在队列上定义一个消费者
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
    //消费队列,并设置应答模式为程序主动应答
    channel.BasicConsume("MyFirstQueue", false, consumer);

    while (true)
    {
    //阻塞函数,获取队列中的消息
    BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    byte[] bytes = ea.Body;
    string str = Encoding.UTF8.GetString(bytes);
    //RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
    Console.WriteLine("HandleMsg:" + "".ToString());
    //回复确认
    channel.BasicAck(ea.DeliveryTag, false);
    }
    }
    }
    }
    catch (Exception e1)
    {
    Console.WriteLine(e1.ToString());
    }
    Console.ReadLine();

    }

  • 相关阅读:
    angular2^ typescript 将 文件和Json数据 合并发送到服务器(1.客户端处理)
    错误的尝试:回射程序改进2
    XML Schema笔记
    回射程序改进1
    DTD笔记
    XML语法笔记
    判断IPv6地址合法性
    线程相关函数(POSIX线程):
    使用string实现一个用于储存那些太大而无法使用 long long 的数
    基本SCTP套接字编程常用函数
  • 原文地址:https://www.cnblogs.com/yufan27209/p/4208304.html
Copyright © 2011-2022 走看看