zoukankan      html  css  js  c++  java
  • ActiveMQ单机部署及简单应用

    系统版本:Centos 7

    前言 

      MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 

    特点: 
    1、支持多种语言编写客户端 
    2、对spring的支持,很容易和spring整合 
    3、支持多种传输协议:TCP,SSL,NIO,UDP等 
    4、支持AJAX 
    消息形式: 
    1、点对点(queue) 
    2、一对多(topic) 

    下载前准备

    配置java环境变量

    1、下载安装包

    两种方案

      1)ActiveMQ官网下载,然后上传至服务器

      2)通过命令下载 wget http://archive.apache.org/dist/activemq/5.15.10/apache-activemq-5.15.10-bin.tar.gz

    2、解压,修改文件夹名称,进入bin目录

    # tar -zxvf /usr/local/apache-activemq-5.15.10-bin.tar.gz -C /usr/local
    # mv /usr/local/apache-activemq-5.15.10 /usr/local/apache-activemq
    # cd /usr/local/apache-activemq/bin

    3、启动服务

    # activemq start

    若文件没有权限则需提升权限

    # chmod 755 activemq

    4、查看防火墙是否关闭,需关闭防火墙

    # firewall-cmd --state

    firewall-cmd --state #查看防火墙状态
    systemctl stop firewalld.service #停止firewall
    systemctl disable firewalld.service #禁止firewall开机启动

    5、访问控制台页面

    http://192.168.1.101:8161/admin

    6、登录控制台,默认用户名密码均为admin

     7、配置文件说明

     进入到/usr/local/activemq/conf目录,有以下几个重要文件

    (1) activemq.xml,在此文件中你可以配置activemq的很多东西,比如将消息持久化到数据库等。

    (2) credentials.properties,一些密码,多用于生产和消费的密码认证。

    (3) jetty.xml,activemq内置了jetty应用服务器。

    (4) jetty-realm.properties,activemq控制台登陆密码。

    8、C#控制台生产者和消费者

    VS中新建两个控制台,都引用NuGet包Apache.NMS.ActiveMQ

    先启动生产者再启动消费者

    Producer

    using Apache.NMS;
    using Apache.NMS.ActiveMQ;
    using System;
    
    namespace MQProducer
    {
        class Program
        {
            static void Main(string[] args)
            {
                string str = "192.168.1.101:61616";
                while (!Console.KeyAvailable)
                {
                    //初始化工厂
                    IConnectionFactory factory = new ConnectionFactory("activemq:tcp://" + str);
                    using (IConnection connection = factory.CreateConnection())
                    {
                        using (ISession session = connection.CreateSession())
                        {
                            IMessageProducer prod = session.CreateProducer(
                                new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));
                            //Send Messages  
                            int i = 0;
                            ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = i.ToString();
                            Console.WriteLine("Sending: " + i.ToString());
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
                            System.Threading.Thread.Sleep(500);
                            i++;
                        }
                    }
                }
            }
        }
    }
    View Code

    Consumer

    using Apache.NMS;
    using Apache.NMS.ActiveMQ;
    using System;
    
    namespace MQConsumer
    {
        class Program
        {
            static void Main(string[] args)
            {
                try
                {
                    string str = "192.168.1.101:61616";
    
                    //初始化工厂
                    IConnectionFactory factory = new ConnectionFactory("activemq:tcp://" + str);
    
                    //Create the connection  
                    using (IConnection connection = factory.CreateConnection())
                    {
                        connection.ClientId = "testing listener";
                        connection.Start();
    
                        //Create the Session  
                        using (ISession session = connection.CreateSession())
                        {
                            //Create the Consumer  
                            IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);
    
                            consumer.Listener += new MessageListener(consumer_Listener);
    
                            Console.ReadLine();
                        }
                        connection.Stop();
                        connection.Close();
                    }
                }
                catch (System.Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
    
            static void consumer_Listener(IMessage message)
            {
                try
                {
                    ITextMessage msg = (ITextMessage)message;
                    Console.WriteLine("Receive: " + msg.Text);
                }
                catch (System.Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
    }
    View Code

  • 相关阅读:
    ACE反应器(Reactor)模式(3)
    ACE反应器(Reactor)模式(2)
    ACE反应器(Reactor)模式(1)
    ACE主动对象模式(2)
    ACE主动对象模式(1)
    ACE中UDP通信
    ACE中TCP通信
    从golang-gin-realworld-example-app项目学写httpapi (六)
    从golang-gin-realworld-example-app项目学写httpapi (五)
    从golang-gin-realworld-example-app项目学写httpapi (四)
  • 原文地址:https://www.cnblogs.com/GoCircle/p/11685130.html
Copyright © 2011-2022 走看看