zoukankan      html  css  js  c++  java
  • RabbitMQ 使用

    安装步骤略过。

    启动

    启动很简单,找到安装后的 RabbitMQ 所在目录下的 sbin 目录,可以看到该目录下有6个以 rabbitmq 开头的可执行文件,直接执行 rabbitmq-server 即可,下面将 RabbitMQ 的安装位置以 . 代替,启动命令就是:

    ./sbin/rabbitmq-server

    如果配置完环境变量,就可以省去路径

    后台启动

    ./sbin/rabbitmq-server -detached

    查询服务器状态:sbin 目录下有个特别重要的文件叫 rabbitmqctl ,它提供了 RabbitMQ 管理需要的几乎一站式解决方案,绝大部分的运维命令它都可以提供。
    查询 RabbitMQ 服务器的状态信息可以用参数 status :

    ./sbin/rabbitmqctl status

    关闭 RabbitMQ 节点

    我们知道 RabbitMQ 是用 Erlang 语言写的,在Erlang 中有两个概念:节点和应用程序。节点就是 Erlang 虚拟机的每个实例,而多个 Erlang 应用程序可以运行在同一个节点之上。节点之间可以进行本地通信(不管他们是不是运行在同一台服务器之上)。比如一个运行在节点A上的应用程序可以调用节点B上应用程序的方法,就好像调用本地函数一样。如果应用程序由于某些原因奔溃,Erlang 节点会自动尝试重启应用程序。
    如果要关闭整个 RabbitMQ 节点可以用参数 stop :
    ./sbin/rabbitmqctl stop

    它会和本地节点通信并指示其干净的关闭,也可以指定关闭不同的节点,包括远程节点,只需要传入参数 -n :

    ./sbin/rabbitmqctl -n rabbit@server.example.com stop 

    -n node 默认 node 名称是 rabbit@server ,如果你的主机名是 server.example.com ,那么 node 名称就是 rabbit@server.example.com 。

    关闭 RabbitMQ 应用程序

    如果只想关闭应用程序,同时保持 Erlang 节点运行则可以用 stop_app:

    ./sbin/rabbitmqctl stop_app

    启动 RabbitMQ 应用程序

    ./sbin/rabbitmqctl start_app

    重置 RabbitMQ 节点

    ./sbin/rabbitmqctl reset

    查看已声明的队列

    ./sbin/rabbitmqctl list_queues

    查看交换器

    ./sbin/rabbitmqctl list_exchanges

    查看绑定

    ./sbin/rabbitmqctl list_bindings

    Java 客户端访问

    RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。

    1. maven工程的pom文件中添加依赖
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.1.0</version>
    </dependency>
    1. 消息生产者
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            //设置 RabbitMQ 地址
            factory.setHost("localhost");
            //建立到代理服务器到连接
            Connection conn = factory.newConnection();
            //获得信道
            Channel channel = conn.createChannel();
            //声明交换器
            String exchangeName = "hello-exchange";
            channel.exchangeDeclare(exchangeName, "direct", true);
    
            String routingKey = "hola";
            //发布消息
            byte[] messageBodyBytes = "quit".getBytes();
            channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
    
            channel.close();
            conn.close();
        }
    }
    1. 消息消费者
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setHost("localhost");
            //建立到代理服务器到连接
            Connection conn = factory.newConnection();
            //获得信道
            final Channel channel = conn.createChannel();
            //声明交换器
            String exchangeName = "hello-exchange";
            channel.exchangeDeclare(exchangeName, "direct", true);
            //声明队列
            String queueName = channel.queueDeclare().getQueue();
            String routingKey = "hola";
            //绑定队列,通过键 hola 将队列和交换器绑定起来
            channel.queueBind(queueName, exchangeName, routingKey);
    
            while(true) {
                //消费消息
                boolean autoAck = false;
                String consumerTag = "";
                channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        System.out.println("消费的路由键:" + routingKey);
                        System.out.println("消费的内容类型:" + contentType);
                        long deliveryTag = envelope.getDeliveryTag();
                        //确认消息
                        channel.basicAck(deliveryTag, false);
                        System.out.println("消费的消息体内容:");
                        String bodyStr = new String(body, "UTF-8");
                        System.out.println(bodyStr);
    
                    }
                });
            }
        }
    }
      1. 运行 Consumer
        先运行 Consumer ,这样当生产者发送消息的时候能在消费者后端看到消息记录。
      2. 运行 Producer
        接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息:

  • 相关阅读:
    网络基础之网络协议篇
    JVM-07-执行引擎
    JVM-06-对象实例化、内存布局、访问定位以及直接内存
    JVM-05-方法区
    JVM-04-堆
    JVM-03-本地方法接口和本地方法栈
    JVM-02-程序计数器 虚拟机栈
    JVM-01-类加载子系统
    JVM-00-引言
    swagger
  • 原文地址:https://www.cnblogs.com/chenglc/p/10174316.html
Copyright © 2011-2022 走看看