zoukankan      html  css  js  c++  java
  • RabbitMq 之客户端(publish,subscrbe)

     public static void InitProducerMQ()
            {
                
    // Uri uri = new Uri("amqp://192.168.1.100:5672/");
                Uri uri = new Uri("amqp://localhost:5672/");
                
    string exchange = "ex1";
                
    string exchangeType = "direct";
                
    string routingKey = "m1";
                
    bool persistMode = true;
                ConnectionFactory cf 
    = new ConnectionFactory();

                cf.UserName 
    = "lee";
                cf.Password 
    = "123456";
                cf.VirtualHost 
    = "/";
                cf.RequestedHeartbeat 
    = 0;
                cf.Endpoint 
    = new AmqpTcpEndpoint(uri);
                
    using (IConnection conn = cf.CreateConnection())
                {
                    
    using (IModel ch = conn.CreateModel())
                    {
                        
    if (exchangeType != null)
                        {
                            ch.ExchangeDeclare(exchange, exchangeType);
    //,true,true,false,false, true,null);
                            
    //ch.QueueDeclare("q1", true);//true, true, true, false, false, null);
                            
    //ch.QueueBind("q1", "ex1", "m1", false, null);
                            ch.QueueDeclare("q1"falsefalsetruenull);
                            
    //string queue, string exchange, string routingKey, IDictionary arguments
                            ch.QueueBind("q1""ex1""m1",null);


                            
    // durable: 是否持久,如果是true,则在服务重启之后还是存在,否则不存在
                            
    //exclusive: 仅创建者可以使用的私有队列,断开后自动删除。(如果为true则只有创建者线程才可以使用,如果线程停止则queue也会自动删除) 
                            
    //auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
                            int i = 0;

                           
                            
    while (true)
                            {

                                IMapMessageBuilder b 
    = new MapMessageBuilder(ch);
                                IDictionary target 
    = b.Headers;
                                target[
    "header"= "hello world";
                                IDictionary targetBody 
    = b.Body;
                                targetBody[
    "body"= "lee"+i++.ToString();
                                
    if (persistMode)
                                {
                                    ((IBasicProperties)b.GetContentHeader()).DeliveryMode 
    = 2;
                                }

                                ch.BasicPublish(exchange, routingKey,
                                                           (IBasicProperties)b.GetContentHeader(),
                                                           b.GetContentBody());

                            }
                         }
                    }
                }
            }

    ___________________________________________  客户端publish代码

     public static void InitCustmerMq()
            {
                
    string exchange = "ex1";
                
    string exchangeType = "direct";
                
    string routingKey = "m1";

               
    // string serverAddress = "amqp://192.168.1.100:5672";
                Uri uri = new Uri("amqp://localhost:5672/");
                ConnectionFactory cf 
    = new ConnectionFactory();
                cf.Endpoint 
    = new AmqpTcpEndpoint(uri); 
                cf.UserName 
    = "lee";
                cf.Password 
    = "123456";
                cf.VirtualHost 
    = "/";
                cf.RequestedHeartbeat 
    = 0;
                
                
    using (IConnection conn = cf.CreateConnection())
                {
                    
    using (IModel ch = conn.CreateModel())
                    {
                        
    //普通使用方式BasicGet
                        
    //noAck = true,不需要回复,接收到消息后,queue上的消息就会清除
                        
    //noAck = false,需要回复,接收到消息后,queue上的消息不会被清除,直到调用channel.basicAck(deliveryTag, false); queue上的消息才会被清除 而且,在当前连接断开以前,其它客户端将不能收到此queue上的消息
                       
    // BasicGetResult res = ch.BasicGet("q1", false/*noAck*/);
                        
    //if (res != null)
                        
    //{
                        
    //    bool t = res.Redelivered;
                        
    //    t = true;
                        
    //    Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
                        
    //    ch.BasicAck(res.DeliveryTag, false);
                        
    //}
                        
    //else
                        
    //{
                        
    //    Console.WriteLine("No message!");
                        
    //}
                        
    //return 0;
                        while (true)
                        {
                            BasicGetResult res 
    = ch.BasicGet("q1"false/*noAck*/);
                            
    if (res != null)
                            {
                                
    try
                                {
                                    
    bool t = res.Redelivered;
                                    t 
    = true;
                                    Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
                                    ch.BasicAck(res.DeliveryTag, 
    false);
                                }
                                
    catch { }
                            }
                            
    //else
                            
    //    break;
                        }
                    }
                }
            }

    通常的操作流程是:
    (1) 消费者: 创建信息通道。
    (2) 消费者: 定义消息队列。
    (3) 消费者: 定义特定类型的交换机。
    (4) 消费者: 设定绑定规则 (包括交换机名称、队列名称以及路由键)。
    (5) 消费者: 等待消息。
    (6) 生产者: 创建消息。
    (7) 生产者: 将消息投递给信息通道 (注明接收交换机名称和路由键)。
    (8) 交换机: 获取消息,依据交换机类型决定是否匹配路由规则 (如需匹配,则对比消息路由键和绑定路由键)。
    (9) 消费者: 获取并处理消息,发送反馈。
    (10) 结束: 关闭通道和连接

    ——————————————————————————客户端消费者代码

    队列定义参数:
    durable: 是否持久,如果是true,则在服务重启之后还是存在,否则不存在
    exclusive: 仅创建者可以使用的私有队列,断开后自动删除。(如果为true则只有创建者线程才可以使用,如果线程停止则queue也会自动删除)
    auto_delete: 当所有消费客户端连接断开后,是否自动删除队列。

    交换机定义参数:
    type: 交换机类型,包括 fanout, direct 和 topic。
    auto_delete: 当所有绑定队列都不再使用时,是否自动删除该交换机。

    交换机类型:
    Fanout: 不处理路由键,将消息广播给绑定到该交换机的所有队列。 不论消息的路由关键字是什么,这条消息都会被路由到所有与该交换器绑定的队列中。
    广播式交换器类型的工作方式如下:
    不使用任何参数将消息队列与交换器绑定在一起。 发布者(直接式交换器类型描述中的producer变成了publisher,已经隐含了二种交换器类型的区别)向交换器发送一条消息。 消息被无条件的传递到所有和这个交换器绑定的消息队列中。
     
    Direct: 处理路由键,对消息路径进行全文匹配。消息路由键 "dog" 只能匹配 "dog" 绑定,不匹配 "dog.puppy" 这类绑定。
    通过精确匹配消息的路由关键字,将消息路由到零个或者多个队列中,绑定关键字用来将队列和交换器绑定到一起。这让我们可以构建经典的点对点队列消息传输模型,不过和任何已定义的交换器类型一样,当消息的路由关键字与多个绑定关键字匹配时,消息可能会被发送到多个队列中。

    Topic: 处理路由键,按模式匹配路由键。模式符号 "#" 表示一个或多个单词,"*" 仅匹配一个单词。如 "audit.#" 可匹配 "audit.irs.corporate",但 "audit.*" 只匹配 "audit.irs"。
    主题式交换器类型提供了这样的路由机制:通过消息的路由关键字和绑定关键字的模式匹配,将消息路由到被绑定的队列中。这种路由器类型可以被用来支持经典的发布/订阅消息传输模型——使用主题名字空间作为消息寻址模式,将消息传递给那些部分或者全部匹配主题模式的多个消费者。
    主题交换器类型的工作方式如下:
    绑定关键字用零个或多个标记构成,每一个标记之间用“.”字符分隔。绑定关键字必须用这种形式明确说明,并支持通配符:“*”匹配一个词组,“#”零个或多个词组。
    因此绑定关键字“*.stock.#”匹配路由关键字“usd.stock”和“eur.stock.db”,但是不匹配“stock.nasdaq”。
    这种交换器类型是可选的。

    生产者无需定义队列、交换机和绑定,只需将消息投递给信息通道即可。

    如果定义一durable的queue,那么bind此queue的exchange也必须是durable的


    定义用户权限
    set_permissions [-p vhostpath] [-s scope] {user} {conf} {write} {read}

    vhostpath
    The name of the virtual host to which to grant the user access, defaulting to /.

    scope
    Scope of the permissions: either client (the default) or all. This determines whether permissions are checked for server-generated resource names (all) or only for client-specified resource names (client).

    user
    The name of the user to grant access to the specified virtual host.

    conf
    A regular expression matching resource names for which the user is granted configure permissions.

    write
    A regular expression matching resource names for which the user is granted write permissions.

    read
    A regular expression matching resource names for which the user is granted read permissions.

    Sets user permissions.

    参考链接:http://blog.csdn.net/ykdsg/archive/2010/11/16/6013448.aspx

  • 相关阅读:
    整型数字转utf8
    cmake构建时指定编译器架构(x86 or x64)
    tcp echo server libuv
    VS2015编译boost1.62
    android rom开发
    游戏昵称
    乐观锁和悲观锁
    数据库锁机制
    MySQL事务实现原理
    MySQL事务
  • 原文地址:https://www.cnblogs.com/chenli0513/p/2020526.html
Copyright © 2011-2022 走看看