zoukankan      html  css  js  c++  java
  • (一)RabbitMQ:RabbitMQ初体验

     RabbitMQ是目前非常热门的一款消息中间件,凭借其高可靠,易扩展,高可用及丰富的功能特性受到越来越多企业的青睐。
     RabbitMQ时采用Erlang语言实现AMQP的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。

     RabbitMQ具体特点如下:

    • 可靠性:RabbitMQ使用一些机制来保证可靠性,如持久化,传输确认及发布确认等。
    • 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ已经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
    • 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
    • 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
    • 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP,MQTT等多种消息中间件协议。
    • 多语言客户端:RabbitMQ几乎支持所有常用语言,比如Java,Python,Ruby,PHP,C#,JavaScript等。
    • 管理界面:RabbitMQ提供了一易用的用户界面,使得用户可以监控和管理消息,集群中的节点等。
    • 插件机制:RabbitMQ提供了许多插件,以实现从多方面扩展,当然也可以编写自己的插件。

    1.消息中间件

    • 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如:只包含文本字符串,JSON等,也可以很复杂,如内嵌对象。
    • 消息队列中间件(Message Queue Middleware,简称为MQ)是指利用高效可靠消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
    • 消息队列中间件,也可以称为消息队列或者消息中间件。一般有两种模式:点对点(P2P)模式,发布/订阅(Pub/Sub)模式。点对点模式:基于队列,生产者发送消息到队列,消费者从队列中接受消息,队列的存在使得消息的异步传输称为可能。发布/订阅模式:定义了如何向一个内容节点(主题topic,可认为是消息传递中介)发布和订阅消息,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。
    • 目前开源的消息中间件,比较主流的有:RabbitMQ,Kafka,ActiveMQ,RocketMQ等。面向消息的中间件(简称为MOM)提供了松散耦合的灵活方式集成应用程序的一种机制。它们提供了基于存储和转发的应用程序中间的异步数据发送,即应用程序彼此不直接通信,而是与作为中介的消息中间件通信。消息中间件提供了有保证的消息发送。应用程序开发人员无须了解远程过程调用(RPC)和网络通信协议的细节。

    2.消息中间件的作用

     在不同的应用场景下可以展现不同的作用。

    • 解耦:在项目启动之初来预测会碰到什么需求是极其困难的。消息中间件在处理过程中间插入了一个隐含的,基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的皆苦约束即可。
    • 冗余(存储):有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在把一个消息从消息中间件中删除之前,需要你的处理系统明确地指出改消息已经被处理完成,从而确保你的数据被安全地保存直到你使用完毕。
    • 扩展性:因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。
    • 削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。
    • 可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
    • 顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。
    • 缓冲:在任何重要的系统中,都会存在需要不同处理时间元素。消息中间件通过一个缓冲层来帮助任务高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。
    • 异步通信:在很多时候应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。

    3.RabbitMQ的安装及使用

    3.1在安装RabbitMQ之前需要安装Erlang,建议使用较新版本Erlang,这样可以获得较多更新和改进。官网下载地址:https://www.erlang.org/downloads

     [root@instance-5x tar.gz]# tar -zxvf otp_src_20.0.tar.gz -C /usr/local/src
     [root@instance-5x tar.gz]# cd /usr/local/src/otp_src_20.0/
     [root@instance-5x otp_src_20.0]# ./configure --prefix=/usr/local/erlang
     [root@instance-5x otp_src_20.0]# make
     [root@instance-5x otp_src_20.0]# make install
    

    若:出现报错信息:No curses library functions found。需要安装ncurses。

     [root@instance-5x otp_src_20.0]# yum  install  ncurses-devel
    

    配置环境变量

     [root@instance-5x otp_src_20.0]# vim /etc/profile
     export ERLANG_HOME=/usr/local/erlang
     PATH=$PATH:$JAVA_HOME/bin:$ERLANG_HOME/bin
     [root@instance-5x otp_src_20.0]# . /etc/profile
    

    输入erl命令来验证Erlang是否安装成功

    [root@instance-5x otp_src_20.0]# erl
    Erlang/OTP 20 [erts-9.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe] [kernel-poll:false]
    
    Eshell V9.0  (abort with ^G)
    1>
    

    3.2RabbitMQ的安装

     官网下载地址:https://www.rabbitmq.com/releases/rabbitmq-server/ 安装包解压到相应的目下即可。

    [root@instance-5x tar.gz]# tar -xvf rabbitmq-server-generic-unix-3.6.10.tar /usr/local/
    

    配置环境变量

    export RABBITMQ_HOME=/usr/local/rabbitmq_server-3.6.10
    PATH=$PATH:$JAVA_HOME/bin:$ERLANG_HOME/bin:$RABBITMQ_HOME/sbin
    [root@instance-5x otp_src_20.0]# . /etc/profile
    

    运行RabbitMQ服务

    [root@instance-5x otp_src_20.0]# rabbitmq-server  -detached
    

    rabbitmq-server 命令后面添加一个“-detached” 参数是为了能够让RabbitMQ服务以守护进程的方式在后台运行,这样就不会因为当前Shell窗口的关闭而影响服务。

    rabbitmqctl status 命令查看RabbitMQ是否正常启动:

    [root@instance-5x tar.gz]# rabbitmqctl  status
    Status of node 'rabbit@instance-och69p5x'
    [{pid,25344},
    {running_applications,
        [{rabbit,"RabbitMQ","3.6.10"},
         {mnesia,"MNESIA  CXC 138 12","4.15"},
         {rabbit_common,
             "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
             "3.6.10"},
         {ranch,"Socket acceptor pool for TCP protocols.","1.3.0"},
         {ssl,"Erlang/OTP SSL application","8.2"},
         {public_key,"Public key infrastructure","1.4.1"},
         {asn1,"The Erlang ASN1 compiler version 5.0","5.0"},
         {xmerl,"XML parser","1.3.15"},
         {os_mon,"CPO  CXC 138 46","2.4.2"},
         {syntax_tools,"Syntax tools","2.1.2"},
         {crypto,"CRYPTO","4.0"},
         {compiler,"ERTS  CXC 138 10","7.1"},
         {sasl,"SASL  CXC 138 11","3.0.4"},
         {stdlib,"ERTS  CXC 138 10","3.4"},
         {kernel,"ERTS  CXC 138 10","5.3"}]},
    {os,{unix,linux}},
    {erlang_version,
        "Erlang/OTP 20 [erts-9.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:64] [hipe] [kernel-poll:true]
    "},
    {memory,
        [{total,51716936},
         {connection_readers,0},
         {connection_writers,0},
         {connection_channels,0},
         {connection_other,0},
         {queue_procs,30584},
         {queue_slave_procs,0},
         {plugins,0},
         {other_proc,18867000},
         {mnesia,66104},
         {metrics,184432},
         {mgmt_db,0},
         {msg_index,48984},
         {other_ets,1810296},
         {binary,448456},
         {code,21385216},
         {atom,891849},
         {other_system,8165535}]},
    {alarms,[]},
    {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
    {vm_memory_high_watermark,0.4},
    {vm_memory_limit,415639142},
    {disk_free_limit,50000000},
    {disk_free,32308707328},
    {file_descriptors,
        [{total_limit,65435},
         {total_used,3},
         {sockets_limit,58889},
         {sockets_used,0}]},
    {processes,[{limit,1048576},{used,156}]},
    {run_queue,0},
    {uptime,24578},
    {kernel,{net_ticktime,60}}]
    

    4.生产和消费消息

    Java代码展示
    maven依赖:

    <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>4.2.1</version>
    </dependency>
    

    默认情况下,访问RabbitMQ服务的用户名和密码都是“guest”,这个账户有限制,默认只能通过本地网络(如 localhost)访问,远程网络访问受限,所以在实现生产和消费消息之前,需要另外添加一个用户,并设置相应的访问权限。
    添加新用户,用户名为“root”,密码为“root123”

    [root@instance-5x tar.gz]# rabbitmqctl add_user root root123
    [root@instance-5x tar.gz]# rabbitmqctl setpermissions -p / root ".*" ".*" ".*"
    [root@instance-5x tar.gz]# rabbitmqctl set_user_tags root administrator
    

    提供者客户端代码:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.MessageProperties;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RabbitMqProducer {
    
        private static String EXCHANGE_NAME = "exchage_demo";
    
        private static String QUEUE_NAME = "queue_demo";
    
        private static String ROUTING_KEY = "routingkey_demo";
    
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //创建一个type="direct",持久化,非自动删除得交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            //创建一个持久化,非排他的,非自动删除队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //将交换机与队列通过路由键(其实是绑定键,只不过direct类型下绑定键(bindingkey)和路由键(routingkey)一致才可以到达)绑定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            //发送一条持久化的消息
            String message = "hello world d!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            //关闭资源
            channel.close();
            connection.close();
    
            System.out.println("finished...");
    
        }
    }
    

    消费者客户端代码:

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class RabbitMqConsumer {
    
        private static String QUEUE_NAME = "queue_demo";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
            //创建连接
            Connection connection = ConnectionUtil.getConnection();
    
            //创建信道
            final Channel channel = connection.createChannel();
    
            //设置客户端最多接受未被ack的消息个数
            channel.basicQos(64);
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
    
                    System.out.println("recv message:" + new String(body));
    
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
    
            channel.basicConsume(QUEUE_NAME, consumer);
            //等待回调资源执行完毕后关掉资源
            TimeUnit.SECONDS.sleep(5);
    
            //关闭资源
            channel.close();
            connection.close();
    
            System.out.println("finished...");
        }
    }
    

    连接工具类代码:

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConnectionUtil {
    
        private static ConnectionFactory factory;
    
        static {
            factory = new ConnectionFactory();
            factory.setHost("a.x.y.z");
            factory.setPort(5672);
            factory.setUsername("username");
            factory.setPassword("password");
    
        }
    
        public static Connection getConnection() throws IOException, TimeoutException {
            return factory.newConnection();
        }
    }
    
  • 相关阅读:
    Spring Boot中只能有一个WebMvcConfigurationSupport配置类
    【原创】(六)Linux进程调度-实时调度器
    【原创】(二)Linux进程调度器-CPU负载
    【原创】(十二)Linux内存管理之vmap与vmalloc
    【原创】(十一)Linux内存管理slub分配器
    【原创】(七)Linux内存管理
    【原创】(五)Linux内存管理zone_sizes_init
    【原创】(三)Linux paging_init解析
    【原创】(二)Linux物理内存初始化
    【原创】(一)ARMv8 MMU及Linux页表映射
  • 原文地址:https://www.cnblogs.com/everyingo/p/12881577.html
Copyright © 2011-2022 走看看