zoukankan      html  css  js  c++  java
  • ActiveMQ基础教程(三):C#连接使用ActiveMQ消息队列

      接上一篇:ActiveMQ基础教程(二):安装与配置(单机与集群)

      安装部署好集群环境:192.168.209.133:61616,192.168.209.134:61616,192.168.209.135:61616

      因为ActiveMQ的集群模式是一种master-slave模式,master节点对外提供服务,slave节点只做数据同步备份,当master节点挂了,slave就会成为master从而继续对外提供服务,以此实现高可用。

      下面介绍C#连接使用ActiveMQ

      前言

      C#连接ActiveMQ一般使用Apache.NMS.ActiveMQ工具包,这个工具包可以在Nuget上获取:Apache.NMS.ActiveMQ

      

      吐槽:当前Apache.NMS.ActiveMQ最新版本是1.8.0,更新时间是2021-1-4,算是刚更新不久,但是上一个版本1.7.2的更新时间是2016-12-15,五六年不更新,到底是放弃治疗了还是ActiveMQ太优秀了呢?

      当然Apache.NMS.ActiveMQ也是开源的,不过它依赖于NMS规范,所以想看源码的朋友最好是看这两个开源项目:

      Apache.NMS.ActiveMQ(1.8.0):https://archive.apache.org/dist/activemq/apache-nms-activemq/1.8.0/

      Apache.NMS(1.8.0):https://archive.apache.org/dist/activemq/apache-nms-api/1.8.0/  

      Apache.NMS.ActiveMQ一般的使用步骤是:

      1、通过连接信息(连接字符串)来创建创建NMSConnectionFacotry或者特定环境下的ConnectionFacotry

      2、使用ConnectionFacotry创建Connection,并启动Connection(类似于ADO.net中的DbConnection)

      3、使用Connection开启一个Session,开启Session时可以指定一个默认的通道Destination(对于ActiveMQ而言,Destination可以认为是Queue或者Topic)

      4、如果是发送消息,使用Session创建Producer,接着使用Session创建Message,也可以自行实例化创建,再使用Producer发送Message,发送时可以指定通道Destination

      5、如果是消费消息,使用Session在指定的通道Destination创建Consumer(可创建持久化的Consumer),消费可以是阻塞消费(同步消费),也可以是异步消费

      6、使用完毕之后,关闭释放Connection,Session等资源(如果是异步消费,则不需要释放)

      连接(IConnection)与会话(ISeesion)

      其实NMS是ActiveMQ官方制定的.net连接使用ActiveMQ的规范,java也有个JMS。

      接下来说的就是NMS在ActiveMQ方面的使用了。

      创建连接会话的个一个简单的例子:  

        string brokerUri = "activemq:failover:(tcp://192.168.209.133:61616,tcp://192.168.209.134:61616,tcp://192.168.209.135:61616)?randomize=false";
        IConnectionFactory factory = new NMSConnectionFactory(brokerUri);
        IConnection connection = factory.CreateConnection();
        connection.Start();
        ISession session = connection.CreateSession();

      1、brokerUri是NSM的连接信息,作用类似于DbConnection的连接字符串,不过,对于ActiveMQ而言,单机和集群环境下它们是不一样的:

      如果是单机模式下,brokerUri格式如下:  

        // activemq:tcp://host:port?name1=value1&name2=value2
        string brokerUri = "activemq:tcp://192.168.209.133:61616?alwaysSessionAsync=true";

      其中?alwaysSessionAsync=true部分是QueryString格式的连接配置参数,alwaysSessionAsync表示是否采用异步会话,更多配置参考官网:http://activemq.apache.org/connection-configuration-uri

      如果是集群模式,brokerUri格式如下:  

        // activemq:failover:(tcp://host1:port1,tcp://host2:port2,tcp://host2:port2)?name1=value1&name2=value2
        string brokerUri = "activemq:failover:(tcp://192.168.209.133:61616,tcp://192.168.209.134:61616,tcp://192.168.209.135:61616)?randomize=false";

      启动?randomize=false也是QueryString格式的连接配置,randomize表示是否随机取一个uri去连接,更多配置参考官网:http://activemq.apache.org/failover-transport-reference.html

      2IConnectionFactory是工厂接口,而NMSConnectionFactory是NMS中其他工厂的代理工厂,从源码中可以看到目前代理的工厂有:  

        static NMSConnectionFactory()
        {
            schemaProviderFactoryMap = new Dictionary<string, ProviderFactoryInfo>();
            schemaProviderFactoryMap["activemq"] = new ProviderFactoryInfo("Apache.NMS.ActiveMQ", "Apache.NMS.ActiveMQ.ConnectionFactory");
            schemaProviderFactoryMap["activemqnettx"] = new ProviderFactoryInfo("Apache.NMS.ActiveMQ", "Apache.NMS.ActiveMQ.NetTxConnectionFactory");
            schemaProviderFactoryMap["tcp"] = new ProviderFactoryInfo("Apache.NMS.ActiveMQ", "Apache.NMS.ActiveMQ.ConnectionFactory");
            schemaProviderFactoryMap["ems"] = new ProviderFactoryInfo("Apache.NMS.EMS", "Apache.NMS.EMS.ConnectionFactory");
            schemaProviderFactoryMap["mqtt"] = new ProviderFactoryInfo("Apache.NMS.MQTT", "Apache.NMS.MQTT.ConnectionFactory");
            schemaProviderFactoryMap["msmq"] = new ProviderFactoryInfo("Apache.NMS.MSMQ", "Apache.NMS.MSMQ.ConnectionFactory");
            schemaProviderFactoryMap["stomp"] = new ProviderFactoryInfo("Apache.NMS.Stomp", "Apache.NMS.Stomp.ConnectionFactory");
            schemaProviderFactoryMap["xms"] = new ProviderFactoryInfo("Apache.NMS.XMS", "Apache.NMS.XMS.ConnectionFactory");
            schemaProviderFactoryMap["zmq"] = new ProviderFactoryInfo("Apache.NMS.ZMQ", "Apache.NMS.ZMQ.ConnectionFactory");
            schemaProviderFactoryMap["amqp"] = new ProviderFactoryInfo("Apache.NMS.AMQP", "Apache.NMS.AMQP.ConnectionFactory");
        }

       如果是ActiveMQ,下面两种方式是等价的,因为NMSConnectionFactory只是其它工厂的代理:  

        IConnectionFactory factory = new Apache.NMS.NMSConnectionFactory(brokerUri);
        IConnectionFactory factory = new Apache.NMS.ActiveMQ.ConnectionFactory(brokerUri);

      3、使用工厂的CreateConnection方法可以创建实现IConnection接口的连接,其实内部实现也是通过被代理的工厂去创建连接,CreateConnection有个重载,可以传入用户名和密码,也就是认证信息  

        IConnection CreateConnection();
        IConnection CreateConnection(string userName, string password);

      注意,这里的用户名和密码可不是管理后台的用户和密码,它是以插件的形式在acticvemq.xml文件中的broker节点内配置的,如:  

        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
      ....
    <plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="test" password="123456" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins> </broker>

       4、IConnection的Start方法用于开启连接

      5、创建会话使用IConnection的CreateSession方法,同样它也有一个重载,表示会话模式AcknowledgementMode,可以分为有事务和非事务两种

        ISession CreateSession();
        ISession CreateSession(AcknowledgementMode acknowledgementMode);
        # 非事务模式,主要用在签收方面,表示消费时是否有收到消息并处理完消息,然后需要从队列中移除
       AutoAcknowledge:消息自动签收,这也是默认的方式,只要消费者收到消息就将消息从队列移除,不管消费者有没有正确处理消息 DupsOkAcknowledge:可重复签收模式,表示Session不用确定消息的签收,消息可能会重复发给消费者(如果是重复发送的消息,那么消息的NMSRedelivered属性将被置为true),这样做可以降低某些开销,如果消费者能容忍重复消息就可以使用这种模式 ClientAcknowledge:客户端签收模式,表示需要客户端确认签收,确认签收是调用某个消息的Acknowledge方法,需要注意的是,在这种模式下,签收一个消息会导致自动签收Session下所有的已消费消息。
       IndividualAcknowledge:单一签收模式,可ClientAcknowledge模式一样需要调用消息的Acknowledge方法签收确认,但是不同的时它只会签收当前消息,避讳导致Session中其它消费被签收
      
       # 事务模式,也就是一起提交,不然就回滚,这种模式往往发生在生产者发送多条消息时 Transactional:事务机制,在事务中,要么调用Session的Commit方法一起提交,要么调用Rollback方法回滚,如果在事务中未Commit就调用Close关闭会话,将自动触发Rollback

       6、IConnection和ISession都有三个事件:  

        # IConnection事件
        ExceptionListener:在发生异常是触发事件,比如服务器挂了导致连接断开
        ConnectionInterruptedListener:连接断开是触发事件,主要是在集群模式中,master节点挂了导致
        ConnectionResumedListener:重新建立连接时触发事件,主要是在集群模式中,需要重新连接到master节点
        
        # ISession事件
        TransactionCommittedListener:事务提交后触发事件
        TransactionRolledBackListener:事务回滚后触发事件
        TransactionStartedListener:事务开始时触发事件

      7、Session的常用方法:  

        //创建消息
        CreateBytesMessage();
        CreateMapMessage();
        CreateMessage();
        CreateObjectMessage();
        CreateStreamMessage();
        CreateTextMessage();
        
        //消费者与生产者
        CreateProducer();//创建生产者
        CreateConsumer();//创建消费者
        CreateDurableConsumer();//创建持久化订阅者,只对Topic有效
        DeleteDurableConsumer();//删除持久化订阅者
    
        //Queue和Topic
        GetQueue();//获取指定名称的Queue,如果不存在,将自动创建
        GetTopic();//获取指定名称的Topic,如果不存在,将自动创建
        CreateTemporaryQueue();//创建临时队列,队列属于当前IConnection,只有当前IConnection下才能访问,如果IConnection关闭断开,队列将自动删除
        CreateTemporaryTopic();//创建临时的Topic,Topic属于当前IConnection,只有当前IConnection下才能访问,如果IConnection关闭断开,Topic将自动删除
        DeleteDestination();//删除队列或者Topic
        
        //其他
        Close()://关闭当前会话
        Commit();//提交会话,在事务模式中使用
        Rollback();//会话回滚,在事务模式中使用
        Recover();//恢复会话,这个方法会导致消息重新发送,慎用
        CreateBrowser();//创建一个队列的浏览机制,主要用于查看某个队列中的数据,它的存在意义主要在于,一般的情况下,要查看队列中的消息,需要弹出消息才行,既然弹出来了,那消息又怎么消费?而IQueueBrowser允许我们在不弹出消息的情况下获取队列中的消息信息        
    

      

      消息

      ActiveMQ中,消息也可以通过Session来创建,也可以自己实例化,而ActiveMQ的消息主要有以下几种:  

        IBytesMessage:接受字节数据未消息
        IMapMessage:接受字典(键值对)消息
        IObjectMessage:接受Object对象消息
        IStreamMessage:接受流数据消息,同IBytesMessage差不多
        ITextMessage:接受字符文本消息,这也是用的最多的一个

      这些消息其实都实现了IMessage接口,除了它们各自的几个私有属性和方法外(比如ITextMessage有Text属性,IObjectMessage有Body属性,IBytesMessage和IStreamMessage有很多的Read和Write方法),常用的公共属性还有这么几个:  

        Properties:消息属性,生产者往Properties中保存的数据将会原样的给消费者,不过消息属性的作用主要是在Selector消息选择上,后文再说
        NMSDestination:消息所在通道,对ActiveMQ来说是Queue或者Topic
        NMSTimeToLive:消息的有效时间
        NMSMessageId:消息的唯一ID
        NMSDeliveryMode:消息传递方式(是否持久化)
        NMSPriority:优先级
        NMSRedelivered:是否是重新发送的消息,可能是事务回滚或者使用Session的Recover方法重发,亦可能是DupsOkAcknowledge模式下的重发,总之,只要消息不是第一次发给消费者,那么这个属性就是true
        
        Acknowledge():确认签收方法

      生产消息

       先看例子

        ISession session = connection.CreateSession();
        var producer = session.CreateProducer();
        producer.DeliveryMode = MsgDeliveryMode.Persistent;//持久化消息
        var destination = session.GetQueue("queue");
       //var destination = session.GetTopic("topic");//发送到Topic
    var message = session.CreateTextMessage("hello activemq"); producer.Send(destination, message);

      1、通过会话ISession的CreateProducer方法创建一个生产者,CreateProducer方法有个重载,可接受一个默认的通道Destination。

        IMessageProducer CreateProducer();
        IMessageProducer CreateProducer(IDestination destination);

      2、对ActiveMQ来说,Destination是Queue或者Topic,而获取Destination一般有两种方法  

        // 方法一(推荐)、通过Session.GetQueue()或者Session.GetTopic()方法来获取
        var destination = session.GetQueue("queue");
        var destination = session.GetTopic("topic");
    
        // 方法二:直接实例化
        var destination = new ActiveMQQueue("queue");
        var destination = new ActiveMQTopic("topic");

       3、发送消息需要使用IMessageProducer的Send方法,它有几个重载,无通道Destination的Send方法需要在创建Producer时指定一个Destination,而Send方法接受的几个参数主要如下:  

        void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive);
        void Send(IDestination destination, IMessage message);
        void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive);//在CreateProducer方法创建IMessageProducer时需要指定Destination参数
        void Send(IMessage message);//在CreateProducer方法创建IMessageProducer时需要指定Destination参数
        destination:通道,queue或者topic
        message:消息,用的多的就是ITextMessage
        deliveryMode:消息传递方式:持久化(Persistent)和非持久化(NonPersistent),默认是Persistent,持久化消息保证了消息的可靠性,保证消息发送之后至少被消费一次,而非持久化不保证消息不丢失,但是它的开销更小
        priority:优先级,默认是Normal
        timeToLive:消息的有效期,如果有效期内还未被消费,消息会被发送到死信队列

      其中deliveryMode、priority、timeToLive可以在Producer中设置默认值,比如上面的例子,deliveryMode设置默认值Persistent。

      4、生产者IMessageProducer有一个委托:ProducerTransformer,这个委托主要用作一个转换,或者说是在调用Send方法后,真正发给ActiveMQ之前做的一层过滤机制,通过这个委托,我们可以做往消息属性添加一些数据,或者统一消息格式等等操作,如:  

        //每个发往ActiveMQ的消息都会加上一个名称为name的属性
        producer.ProducerTransformer = new ProducerTransformerDelegate((s, p, m) =>
        {
            m.Properties["name"] = "value";
            return m;
        });

      

      消费消息

      NMS对ActiveMQ的消息消费有两种:阻塞消费(同步消费),异步消费

      阻塞消费需要我们自己写循环来实现:  

        ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自动签收
        var destination = session.GetQueue("queue");
        var consumer = session.CreateConsumer(destination);
        while (true)
        {
            var message = consumer.Receive();//阻塞式消费
            Console.WriteLine("接收到消息:" + message?.ToString());
            //如果是ClientAcknowledge或者IndividualAcknowledge,需要调用Acknowledge方法进行签收确认
            //message?.Acknowledge();
        }

      异步消费:  

        ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自动签收
        var destination = session.GetQueue("queue");
        var consumer = session.CreateConsumer(destination);
        consumer.Listener += message =>
        {
            Console.WriteLine("接收到消息:" + message?.ToString());
            //如果是ClientAcknowledge或者IndividualAcknowledge,需要调用Acknowledge方法进行签收确认
            //message?.Acknowledge();
        };

      1、非持久化消费者使用Session的CreateConsumer方法创建,创建需要指明Destination,表明是从哪个Queue或者Topic去消费,它包含三个参数: 

        IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal);
        IMessageConsumer CreateConsumer(IDestination destination, string selector);
        IMessageConsumer CreateConsumer(IDestination destination);
        destination:通道,Queue或者Topic
        selector:消息选择器,默认值是null(不是空字符串),它是一个字符串格式,基于SQL92表达式语法,格式上,你可以认为是SQL语句的where部分(不包含子查询),具体使用后文再介绍
        noLocal:默认false,当为true时表示创建的消费者不消费当前同一连接创建的生产者发送的消息,只能消费其他连接上的生产者发送的消息(这个配置只对Topic生效,对Queue不生效)

      注意,CreateConsumer方法创建的消费者是非持久化的,也就是说,如果消费者断线期间发送的消息将会丢失。

      创建持久化订阅者需要使用CreateDurableConsumer方法,持久化订阅者只能是针对Topic,创建持久化订阅者需要指定一个ClientId,当断线后,再次连接就是通过这个ClientId来比较的,另外还可以指定一个name表示持久化订阅者的名称  

        IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal);

       看看个列子

        string brokerUri = "activemq:failover:(tcp://192.168.209.133:61616,tcp://192.168.209.134:61616,tcp://192.168.209.135:61616)?randomize=false";
        IConnectionFactory factory = new Apache.NMS.NMSConnectionFactory(brokerUri); 
        IConnection connection = factory.CreateConnection();
        connection.ClientId = "myClientId";
        connection.Start();
        ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自动签收
        var destination = session.GetTopic("topic");
        var consumer = session.CreateDurableConsumer(destination, "Subscription Name", null, false);
        consumer.Listener += message =>
        {
            Console.WriteLine("接收到消息:" + message?.ToString());
            //如果是ClientAcknowledge或者IndividualAcknowledge,需要调用Acknowledge方法进行签收确认
            //message?.Acknowledge();
        };

       上面的代码,哪怕连接断开期间有接收到新的消息,当再次连接是,也能消费得到,因为CreateDurableConsumer方法在ActiveMQ中注册了一个指定ClientId的持久化订阅者,ActiveMQ会给这个指定ClientId的持久化订阅者保存它断线期间接收到的消息,当下次这个ClientId的订阅者重新连接时,ActiveMQ会将断线期间接收到的消息发给订阅者。

      2、消息消费主要有下面四个方法:  

        # 同步消费
       Receive():阻塞当前线程,直到有接收到消息时才继续往下执行 Receive(TimeSpan timeout):阻塞当前线程,直到有接收到消息时才继续往下执行,或者如果指定时间内未收到消息,方法将会返回null ReceiveNoWait():不会阻塞当前线程,如果有消息则返回消息,没有消息则返回null
      
       # 异步消费
       Listener事件

      3、同样的,IMessageConsumer也有一个委托:ConsumerTransformer,它在消费者接受到消息后,在传给消费程序前执行,可以看做一层过滤机制。

      消息选择器(Selector)

      前面好几个地方提到消息选择器,现在说说它。

      消息选择器采用ActiveMQ提供了SQL92表达式语法,格式上我们可以认为就是SQL语句中不带子查询条件的where部分语句,或者说通俗点,可以认为消息选择器就是通过对消息属性的刷选过滤来实现对消息的刷选过滤

      在语法上,消息选择器常用的数据类型有:  

       数值类型:只能由数字开头,可以包含小数点表示浮点数,不要用引号包着,比如:1、2.34、5.6789,常用运算符有:>、<、=、>=、<=、BETWEEN、+、-、*、/、%
       字符类型:用单引号包着的字符串,比如:'a'、'hello',常用的运算符有:=、<>、IN,注意不等于是<>不是!=
       NULL:表示空,常用的运算:IS NULL 和 IS NOT NULL
       布尔类型:TRUE、FALSE,常用的逻辑运算:AND、OR、NOT

      比如:  

        ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自动签收
        var destination = session.GetTopic("topic");
        var consumer = session.CreateConsumer(destination, "name='Apple' or price>5000");
        consumer.Listener += message =>
        {
            if (message is ITextMessage textMessage)
            {
                Console.WriteLine($"从{destination.TopicName}接收到过滤后的消息:{textMessage.Text}");
            }
            //如果是ClientAcknowledge或者IndividualAcknowledge,需要调用Acknowledge方法进行签收确认
            //message?.Acknowledge();
        };

      发送消息:

        ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
        var producer = session.CreateProducer();
        producer.DeliveryMode = MsgDeliveryMode.Persistent;//持久化消息
        IDestination destination = session.GetTopic("topic");
    
        //message1可以被消费到
        var message1 = session.CreateTextMessage("message1");
        message1.Properties["name"] = "Apple";
        producer.Send(destination, message1);
    
        //message2可以被消费到
        var message2 = session.CreateTextMessage("message2");
        message2.Properties["price"] = 6000d;
        producer.Send(destination, message2);
    
        //message3不能被消费
        var message3 = session.CreateTextMessage("message3");
        producer.Send(destination, message3);

      

      日志输出

      在开发过程中,有时由于一个小错误,导致某些预想不到的结果,这时我们可能需要打印一下日志查看一下问题(你总不希望去看源码流程吧),比如我碰到的,因为发送的消息属性不合理(貌似属性不支持decimal,可以理解,毕竟它不是基础属性),导致ActiveMQ集群中master出问题,然后在Producer.Send()时导致线程阻塞卡住,但是它却没有抛出异常!这种问题是很难找,还好ActiveMQ提供了日志输出的功能,允许我们实现 Apache.NMS.ITrace 来实现日志输出,比如我们简单的实现:  

      
        public class MyActiveTrace : Apache.NMS.ITrace
        {
    
            public bool IsDebugEnabled => true;
            public bool IsInfoEnabled => true;
            public bool IsWarnEnabled => true;
            public bool IsErrorEnabled => true;
            public bool IsFatalEnabled => true;
    
            public void Debug(string message)
            {
                if (IsDebugEnabled)
                {
                    Console.WriteLine($"Debug:{message}");
                }
            }
    
            public void Error(string message)
            {
                if (IsErrorEnabled)
                {
                    Console.WriteLine($"Error:{message}");
                }
            }
    
            public void Fatal(string message)
            {
                if (IsFatalEnabled)
                {
                    Console.WriteLine($"Fatal:{message}");
                }
            }
    
            public void Info(string message)
            {
                if (IsInfoEnabled)
                {
                    Console.WriteLine($"Info:{message}");
                }
            }
    
            public void Warn(string message)
            {
                if (IsWarnEnabled)
                {
                    Console.WriteLine($"Warn:{message}");
                }
            }
        }
    MyActiveTrace

      在使用是,只需要给 Tracer.Trace 的静态属性赋值就可以了:  

        Tracer.Trace = new MyActiveTrace();

      ActiveMQ发送和接收消息的一个完成的Demo例子

      例子:  

        static void Main(string[] args)
        {
            Tracer.Trace = new MyActiveTrace();
    
            //string brokerUri = "activemq:tcp://192.168.209.133:61616?alwaysSessionAsync=true";
            string brokerUri = "activemq:failover:(tcp://192.168.209.133:61616,tcp://192.168.209.134:61616,tcp://192.168.209.135:61616)?randomize=false";
            IConnectionFactory factory = new Apache.NMS.NMSConnectionFactory(brokerUri);
            IConnection connection = factory.CreateConnection();
            connection.Start();
    
            //从队列消费
            {
                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自动签收
                var destination = session.GetQueue("queue");
                var consumer = session.CreateConsumer(destination);
                consumer.Listener += message =>
                {
                    if (message is ITextMessage textMessage)
                    {
                        Console.WriteLine($"从{destination.QueueName}接收到消息:{textMessage.Text}");
                    }
                    //如果是ClientAcknowledge或者IndividualAcknowledge,需要调用Acknowledge方法进行签收确认
                    //message?.Acknowledge();
                };
            }
            //从Topic消费
            {
                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自动签收
                var destination = session.GetTopic("topic");
                var consumer = session.CreateConsumer(destination);
                consumer.Listener += message =>
                {
                    if (message is ITextMessage textMessage)
                    {
                        Console.WriteLine($"从{destination.TopicName}接收到消息:{textMessage.Text}");
                    }
                    //如果是ClientAcknowledge或者IndividualAcknowledge,需要调用Acknowledge方法进行签收确认
                    //message?.Acknowledge();
                };
            }
            //使用Selector从Topic过滤
            {
                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自动签收
                var destination = session.GetTopic("topic");
                var consumer = session.CreateConsumer(destination, "name='Apple' or price>5000 ");
                consumer.Listener += message =>
                {
                    if (message is ITextMessage textMessage)
                    {
                        Console.WriteLine($"从{destination.TopicName}接收到过滤后的消息:{textMessage.Text}");
                    }
                    //如果是ClientAcknowledge或者IndividualAcknowledge,需要调用Acknowledge方法进行签收确认
                    //message?.Acknowledge();
                };
            }
            //发送消息
            {
                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
                var producer = session.CreateProducer();
                producer.DeliveryMode = MsgDeliveryMode.Persistent;//持久化消息
    
                while (true)
                {
                    Console.Write("请输入消息(输入空表示退出):");
                    var line = Console.ReadLine();
                    if (string.IsNullOrEmpty(line)) break;
                    var message = session.CreateTextMessage(line);
    
                    IDestination destination;
                    var value = new Random().Next();
                    if (value % 2 == 0)
                    {
                        destination = session.GetQueue("queue");
                        Console.WriteLine($"往队列queue中发送消息:{line}");
                    }
                    else
                    {
                        destination = session.GetTopic("topic");
                        message.Properties["name"] = new string[] { "小米", "华为", "Apple" }.ElementAt(new Random().Next(0, 3));
                        message.Properties["price"] = new double[] { 3000d, 4000d, 5000d, 6000d }.ElementAt(new Random().Next(0, 3));
                        Console.WriteLine($"往topic中发送消息:{line}");
                        Console.WriteLine($"消息的属性:name={message.Properties["name"]}     price={message.Properties["price"]}");
                    }
    
                    producer.Send(destination, message);
                }
            }

        
    //关闭连接前记得注意消费者是不是异步消费      connection.Close();
    Console.ReadKey(); }
    一个专注于.NetCore的技术小白
  • 相关阅读:
    Linux命令ll输出后各个字段的含义
    常用的Linux指令
    纪念逝去的2016
    Grails默认首页的修改
    js中构造字符串若放入Grails中gsp的<g:link>标签出错
    Grails的redirect无法跳转时的一个可能原因
    Grails连接外部数据库注意事项Could not determine Hibernate dialect for database name [Oracle]!
    ICPC2020济南A Matrix Equation
    最后的挣扎
    [省选联考 2020 A/B 卷] 信号传递
  • 原文地址:https://www.cnblogs.com/shanfeng1000/p/14331862.html
Copyright © 2011-2022 走看看