zoukankan      html  css  js  c++  java
  • RabbitMQ消息生产与消费

    对象 作用
    ConnectionFactory 获取连接工厂
    Connection 一个连接
    Channel 数据通信信道,可发送和接收消息
    Queue 具体的消息存储队列
    Producer & Consumer 生产者和消费者

    Maven 依赖

    1
    2
    3
    4
    5
    
    <dependency>
    			<groupId>com.rabbitmq</groupId>
    			<artifactId>amqp-client</artifactId>
    			<version>3.6.5</version>
    </dependency>
    

    Provider

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Procuder {
    	public static void main(String[] args) throws Exception {
    		//1 创建一个ConnectionFactory, 并进行配置
    		ConnectionFactory connectionFactory = new ConnectionFactory();
    		connectionFactory.setHost("192.168.11.76");
    		connectionFactory.setPort(5672);
    		connectionFactory.setVirtualHost("/");
    		
    		//2 通过连接工厂创建连接
    		Connection connection = connectionFactory.newConnection();
    		
    		//3 通过connection创建一个Channel
    		Channel channel = connection.createChannel();
    		
    		//4 通过Channel发送数据
    		for(int i=0; i < 5; i++){
    			String msg = "Hello RabbitMQ!";
    			//1 exchange   2 routingKey
    			channel.basicPublish("", "test001", null, msg.getBytes());
    		}
    
    		//5 记得要关闭相关的连接
    		channel.close();
    		connection.close();
    	}
    }
    

    channel.basicPublish()

    • queue:队列的名称,字符串即可
    • durable:是否持久化
    • exclusive:是否独占,就是说这个队列只有我这一个 channel 可以监听,可以保证顺序消费
    • autoDelete:队列与 Exchange 脱离,就会自动删除
    • arguments:扩展参数,null 就好

    Customer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    public class Consumer {
    
    	public static void main(String[] args) throws Exception {
    		
    		//1 创建一个ConnectionFactory, 并进行配置
    		ConnectionFactory connectionFactory = new ConnectionFactory();
    		connectionFactory.setHost("192.168.11.76");
    		connectionFactory.setPort(5672);
    		connectionFactory.setVirtualHost("/");
    		
    		//2 通过连接工厂创建连接
    		Connection connection = connectionFactory.newConnection();
    		
    		//3 通过connection创建一个Channel
    		Channel channel = connection.createChannel();
    		
    		//4 声明(创建)一个队列
    		String queueName = "test001";
    		channel.queueDeclare(queueName, true, false, false, null);
    		
    		//5 创建消费者
    		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    		
    		//6 设置Channel
    		channel.basicConsume(queueName, true, queueingConsumer);
    		
    		while(true){
    			//7 获取消息
    			Delivery delivery = queueingConsumer.nextDelivery();
    			String msg = new String(delivery.getBody());
    			System.err.println("消费端: " + msg);
    			//Envelope envelope = delivery.getEnvelope();
    		}
    		
    	}
    }
    

    channel.basicConsume()

    • queue,队列名称
    • autoAck,是否自动签收,返回 ack,保证高可用的时候
    • callback:具体的操作对象
  • 相关阅读:
    Linux下如何确认磁盘是否为SSD
    Nginx
    求两个Linux文本文件的交集、差集、并集
    DB磁盘满导致Zabbix Server Crash一例
    配置SSH Forward提升安全性
    更新ffmpeg
    linux系统日志__ratelimit: N callbacks suppressed
    servlet本质
    session与cookie的区别与联系
    Leetcode 19——Remove Nth Node From End of List
  • 原文地址:https://www.cnblogs.com/shuiyj/p/13185193.html
Copyright © 2011-2022 走看看