zoukankan      html  css  js  c++  java
  • Windows Server环境下消息队列之ActiveMQ实战

    环境准备

    1.安装jdk1.7+

    2.下载新版ActiveMQ

       http://activemq.apache.org/

    3.启动activemq服务

    4.启动成功后的界面是

     5.启动成功后

    浏览器访问http://localhost:8161/admin,默认用户名和密码admin/admin,管理员界面如下:

     

    6.如果你想修改用户名和密码的话

    在conf/jetty-realm.properties中修改即可,重启服务才能生效。

    7.如果需要修改端口可以在jetty文件中修改,重启服务才能生效。

    发送消息的基本步骤

    (1)、创建连接使用的工厂类JMS ConnectionFactory

    (2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

    (3)、使用连接Connection 建立会话Session

    (4)、使用会话Session和管理对象Destination创建消息生产者MessageSender

    (5)、使用消息生产者MessageSender发送消息 

    消息接收者从JMS接受消息的步骤

    (1)、创建连接使用的工厂类JMS ConnectionFactory

    (2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

    (3)、使用连接Connection 建立会话Session

    (4)、使用会话Session和管理对象Destination创建消息接收者MessageReceiver

    (5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。

    C#实战代码

    1.ActiveMQ官方网站下载最新版的C#驱动

       Apache.NMS:http://activemq.apache.org/nms/nms-api-downloads.html

       Apache.NMS.ActiveMQ:http://activemq.apache.org/nms/activemq-downloads.html

    2.引用以下两个dll

    Apache.NMS.ActiveMQ.dll
    Apache.NMS.dll

       config中添加配置

     <add key="ActiveMQUrl" value="tcp://127.0.0.1:61616" />

    在ActiveMQ的conf目录的activemq.xml中的节点<broker></broker>里面添加账号密码,重启服务才能生效

    <!-- 添加访问ActiveMQ的账号密码 -->  
    <plugins>  
           <simpleAuthenticationPlugin>  
                 <users>  
                    <authenticationUser username="admin" password="manager" groups="users,admins"/>  
                  </users>  
            </simpleAuthenticationPlugin>  
    </plugins>  

    生产者代码

    public void ActiveMQProducer()
            {
                string ActiveMQUrl = System.Configuration.ConfigurationManager.AppSettings["ActiveMQUrl"];
                IConnectionFactory factory = new ConnectionFactory(ActiveMQUrl);
                //通过工厂建立连接
                using (IConnection connection = factory.CreateConnection("admin", "manager"))
                {
                    //通过连接创建Session会话
                    using (ISession session = connection.CreateSession())
                    {
                        //通过会话创建生产者,方法里面new出来的是MQ中的Queue
                        ActiveMQQueue quene = new ActiveMQQueue("audioQueue");//点到点模式  队列模式
                        //ActiveMQTopic quene = new ActiveMQTopic("testTopic");//广播形式  发布订阅模式  发送的消息如果没有消费者接收就会没了
    
                        IMessageProducer prod = session.CreateProducer(quene);
                        //创建一个发送的消息对象
                        ITextMessage message = prod.CreateTextMessage();
                        //给这个对象赋实际的消息
    
                        //任务实体模型
                        ActiveMQInfo activeMQInfo = new ActiveMQInfo();
                        activeMQInfo.TaskID = Guid.NewGuid().ToString();
                        activeMQInfo.TaskName = "订单处理";
                        activeMQInfo.FileName = Guid.NewGuid().ToString();
                        activeMQInfo.TaskDateTime = DateTime.Now;
    
                        message.Text = JsonConvert.SerializeObject(activeMQInfo);
                        //设置消息对象的属性,这个很重要哦,是Queue的过滤条件,也是P2P消息的唯一指定属性
                        message.Properties.SetString("filter", "getduration");
                        //生产者把消息发送出去,几个枚举参数MsgDeliveryMode是否长链,MsgPriority消息优先级别,发送最小单位,当然还有其他重载
                        prod.Send(message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
                    }
                }
            }

    消费者代码

            public void ActiveMQConsumer()
            {
                try
                {
                    string ActiveMQUrl = System.Configuration.ConfigurationManager.AppSettings["ActiveMQUrl"];
                    //创建连接工厂
                    IConnectionFactory factory = new ConnectionFactory(ActiveMQUrl);
                    //通过工厂构建连接
                    IConnection connection = factory.CreateConnection("admin", "manager");
                    //这个是连接的客户端名称标识
                    connection.ClientId = "GuardQueueListener";
                    //启动连接,监听的话要主动启动连接
                    connection.Start();
                    //logger.Error("消息队列启动");
                    //通过连接创建一个会话
                    ISession session = connection.CreateSession();
                    //通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置
                    ActiveMQQueue quene = new ActiveMQQueue("audioQueue");//队列模式
                    //ActiveMQTopic quene = new ActiveMQTopic("testTopic");//发布订阅模式  发送的消息如果没有消费者接收就会没了
                    IMessageConsumer consumerAudio = session.CreateConsumer(quene,"filter='getduration'");
                    //注册监听事件
                    consumerAudio.Listener += new MessageListener(consumerMediaAudio_Listener);
                }
                catch (Exception ex)
                {
                    //记录异常信息
                }
            }
    
            private void consumerMediaAudio_Listener(IMessage message)
            {
                ITextMessage iTextMessage = (ITextMessage)message;
                string msg = iTextMessage.Text;
                //逻辑处理代码
                //ActiveMQInfo activeMQInfo = JsonConvert.DeserializeObject<ActiveMQInfo>(msg);
    
            }
    ActiveMQInfo实体类
    public class ActiveMQInfo
        {
            public string TaskID { get; set; }
            public string TaskName { get; set; }
            public string FileName { get; set; }
            public DateTime TaskDateTime { get; set; }
        }

    两种模式

     

    ActiveMQ持久化消息

    1.持久化为文件

        涉及到的配置activemq.xml

    <persistenceAdapter>
           <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

        C#代码

    prod.Send(message, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue);

    2.持久化为MySql

         你首先需要把MySql的驱动放到ActiveMQ的Lib目录下,总共用到3个jar包:

           mysql-connector-java-5.1.6.jar:https://dev.mysql.com/downloads/connector/j/

           commons-dbcp-1.4.jar:http://commons.apache.org/proper/commons-dbcp/download_dbcp.cgi

           commons-pool-1.6.jar:http://commons.apache.org/proper/commons-pool/download_pool.cgi

         接下来修改配置文件activemq.xml里面的broker节点里面的persistenceAdapter节点,为了防止每次重启服务都去创建新表,只在第一次配置的时

          createTablesOnStartup="true",数据库创建表后我们将createTablesOnStartup="false"。

    <persistenceAdapter>
          <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> 
    </persistenceAdapter>

         在配置文件中的broker节点外增加

    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
            <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
            <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
            <property name="username" value="root"/>
            <property name="password" value="root"/>
            <property name="maxActive" value="200"/>
            <property name="poolPreparedStatements" value="true"/>
    </bean>

    从配置中可以看出数据库的名称是activemq,你需要手动在MySql中增加这个库。然后重新启动消息队列,你会发现多了3张表:

    3.其他持久化以后再写吧

     

  • 相关阅读:
    对图像组成不了解?这样学习Matplotlib必走弯路!
    Python数据可视化Matplotlib——Figure画布背景设置
    Matplotlib风羽自定义
    matplotlib删除地图投影上的等值线及风场
    利用Eric+Qt Designer编写倒计时时钟
    Python数据可视化利器Matplotlib,绘图入门篇,Pyplot介绍
    matplotlib极坐标方法详解
    Windows下Python读取GRIB数据
    简明Python教程自学笔记——命令行通讯录
    基于Python的Grib数据可视化
  • 原文地址:https://www.cnblogs.com/cnki/p/7091330.html
Copyright © 2011-2022 走看看