zoukankan      html  css  js  c++  java
  • RabbitMQ 简单队列模式


    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.kf</groupId>
        <artifactId>rabbitMQ.demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>

        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.6.5</version>
            </dependency>
        </dependencies>

    </project>
     1 package com.kf.utils;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.rabbitmq.client.Connection;
     7 import com.rabbitmq.client.ConnectionFactory;
     8 
     9 /**
    10  * 获取rabbit
    11  * @author kf
    12  *
    13  */
    14 public class RabbitConnectionUtils {
    15     
    16     
    17     public static Connection getConnection() throws IOException, TimeoutException{
    18         ConnectionFactory factory = new ConnectionFactory();
    19         factory.setHost("127.0.0.1");
    20         factory.setUsername("admin");
    21         factory.setPassword("admin");
    22         //AMQP协议端口号
    23         factory.setPort(5672);
    24         factory.setVirtualHost("/kf");
    25         Connection newConnection = factory.newConnection();
    26         return newConnection;
    27     }
    28 
    29 }
     1 package com.kf.queueDemo.simpleQueue;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.kf.utils.RabbitConnectionUtils;
     7 import com.rabbitmq.client.Channel;
     8 import com.rabbitmq.client.Connection;
     9 
    10 /**
    11  * 简单队列
    12  * @author kf
    13  *
    14  */
    15 public class SimpleQueueProducer {
    16     
    17     //队列名称
    18     private static String QUEUENAME = "SIMPLEQUEUE";
    19     
    20     public static void main(String[] args) throws IOException, TimeoutException{
    21         Connection connection = RabbitConnectionUtils.getConnection();
    22         
    23         //创建通道
    24         Channel channel = connection.createChannel();
    25         
    26         //通道里放入队列
    27         /**
    28          * 第一个参数是  队列名称
    29          * 第二个参数指 要不要持久化
    30          */
    31         channel.queueDeclare(QUEUENAME, false, false, false, null);
    32         
    33 /*        //消息体
    34         String mes = "demo_message汉字";
    35         
    36         //发送消息
    37         *//**
    38          * 参数为  exchange, routingKey, props, body
    39          * exchange   交换机
    40          * routingKey 路由键
    41          * 
    42          * body 消息体
    43          *//*
    44         channel.basicPublish("", QUEUENAME, null, mes.getBytes());*/
    45         
    46         /**
    47          * 集群环境下,多个消费者情况下。消费者默认采用均摊
    48          */
    49         for(int i=1; i<11; i++){
    50             String mes = "demo_message汉字"+i;
    51             System.out.println("发送消息"+mes);
    52             channel.basicPublish("", QUEUENAME, null, mes.getBytes());
    53         }
    54         
    55         
    56 //        System.out.println("发送消息"+mes);
    57         
    58         channel.close();
    59         connection.close();
    60     }
    61 
    62 }
     1 package com.kf.queueDemo.simpleQueue;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.kf.utils.RabbitConnectionUtils;
     7 import com.rabbitmq.client.Channel;
     8 import com.rabbitmq.client.Connection;
     9 import com.rabbitmq.client.DefaultConsumer;
    10 import com.rabbitmq.client.Envelope;
    11 import com.rabbitmq.client.AMQP.BasicProperties;
    12 
    13 /**
    14  * 简单队列消费者
    15  * @author kf
    16  *
    17  */
    18 public class SimpleConsumer {
    19     //队列名称
    20         private static String QUEUENAME = "SIMPLEQUEUE";
    21         
    22         public static void main(String[] args) throws IOException, TimeoutException{
    23             System.out.println("01开始接收消息");
    24             Connection connection = RabbitConnectionUtils.getConnection();
    25             
    26             //创建通道
    27             final Channel channel = connection.createChannel();
    28             
    29             //通道里放入队列
    30             /**
    31              * 第一个参数是  队列名称
    32              * 第二个参数指 要不要持久化
    33              */
    34             channel.queueDeclare(QUEUENAME, false, false, false, null);
    35             
    36             DefaultConsumer consumer = new DefaultConsumer(channel){
    37                 //监听队列
    38                     @Override
    39                     public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
    40                             byte[] body) throws IOException {
    41                         System.out.println("------------进入监听---------");
    42                         String s = new String(body, "utf-8");
    43                         System.out.println("获取到的消息是:"+s);
    44                         //手动应答。
    45                         /**
    46                          * 当  channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时  是手动应答模式
    47                          */
    48                     //    channel.basicAck(envelope.getDeliveryTag(), false);
    49                     }
    50             };
    51             
    52             //设置应答模式
    53             /**
    54              * 参数:  对列名,是否自动签收,监听的类
    55              */
    56             System.out.println("获取消息的方法之前");
    57             channel.basicConsume(QUEUENAME, true, consumer);
    58             System.out.println("获取消息的方法之后");
    59             
    60         }
    61 
    62 
    63 }
  • 相关阅读:
    apache+tomcat分布式搭建
    maven 加入本地jar包
    一步一步搭建Jenkins环境
    缓存详解
    nginx配置文件注释
    mybatis like的用法
    Golang聊天室
    windows 10安装gensim、nltk
    理解矩阵乘法
    超赞的 Go 语言 INI 文件操作
  • 原文地址:https://www.cnblogs.com/fuguang/p/10660546.html
Copyright © 2011-2022 走看看