zoukankan      html  css  js  c++  java
  • Java使用RabbitMQ之公平分发

    发送消息:

     1 package org.study.workfair;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import com.rabbitmq.client.Connection;
     5 import org.junit.Test;
     6 import org.study.utils.ConnectionUtils;
     7 
     8 import java.io.IOException;
     9 import java.util.concurrent.TimeoutException;
    10 
    11 public class Sender {
    12     public static final String QUEUE_NAME = "test_simple_queue";
    13 
    14     @Test
    15     public void send() throws IOException, TimeoutException, InterruptedException {
    16         // 获取连接
    17         Connection conn = ConnectionUtils.getConnection();
    18         // 获取通道
    19         Channel channel = conn.createChannel();
    20         //创建队列
    21         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    22         //每个消费者发送确认消息前,只发送一条消息
    23         channel.basicQos(1);
    24         String msg = "hello rabbitmq!";
    25 
    26         for (int i = 0; i < 50; i++) {
    27             String tempStr = i + " " + msg;
    28             //发送消息
    29             channel.basicPublish("", QUEUE_NAME, null, tempStr.getBytes());
    30             System.out.println("[send] msg " + i + ": " + msg);
    31             Thread.sleep(100);
    32         }
    33 
    34         channel.close();
    35         conn.close();
    36     }
    37 }

    接受消息:

     1 package org.study.workfair;
     2 
     3 import com.rabbitmq.client.*;
     4 import org.junit.Test;
     5 import org.study.utils.ConnectionUtils;
     6 
     7 import java.io.IOException;
     8 import java.util.concurrent.TimeoutException;
     9 
    10 public class Recv {
    11     public static final String QUEUE_NAME = "test_simple_queue";
    12 
    13     @Test
    14     public void recv() throws IOException, TimeoutException, InterruptedException {
    15         Connection conn = ConnectionUtils.getConnection();
    16         Channel channel = conn.createChannel();
    17         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    18         channel.basicQos(1);
    19 
    20         //定义消费者
    21         DefaultConsumer consumer = new DefaultConsumer(channel) {
    22             //重写获取到达消息
    23             @Override
    24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    25 //                super.handleDelivery(consumerTag, envelope, properties, body);
    26                 String msg = new String(body, "utf-8");
    27                 System.out.println("[1] recv: " + msg);
    28 
    29                 try {
    30                     Thread.sleep(1000);
    31                 } catch (InterruptedException e) {
    32                     e.printStackTrace();
    33                 }finally {
    34                     System.out.println("[1] done!");
    35                     channel.basicAck(envelope.getDeliveryTag(),false);
    36                 }
    37             }
    38         };
    39 
    40         while (true) {
    41             //监听队列
    42             channel.basicConsume(QUEUE_NAME, false, consumer);
    43             Thread.sleep(100);
    44         }
    45 
    46 
    47     }
    48 }
  • 相关阅读:
    LeetCode--Divide Two Integers
    mysql多实例安装与ssl认证
    ajax请求
    mysql5.6升级及mysql无密码登录
    mysql5.7密码设置
    BusyBox 添加 自定义命令小程序 (applet)
    分享9个常用的国外英文论文文献数据库
    arm linux 移植 gdb/gdbserver
    使用 mtd-utils 烧写Arm Linux 系统各个部分
    YUV图解 (YUV444, YUV422, YUV420, YV12, NV12, NV21)
  • 原文地址:https://www.cnblogs.com/gongxr/p/9639528.html
Copyright © 2011-2022 走看看