zoukankan      html  css  js  c++  java
  • RabbitMQ连接池、生产者、消费者实例

    1、本文分享RabbitMQ的工具类,经过实际项目长期测试,在此分享给发家,各位大神有什么建议请指正 !!!

    2、下面是链接池主要代码:

     1 import java.util.HashMap;
     2 import java.util.Map;
     3 
     4 import org.apache.commons.lang3.StringUtils;
     5 import org.slf4j.Logger;
     6 import org.slf4j.LoggerFactory;
     7 
     8 import com.rabbitmq.client.Connection;
     9 import com.rabbitmq.client.ConnectionFactory;
    10 
    11 /**
    12  * 获取RabbitMq连接
    13  * @author skyfeng
    14  */
    15 public class RabbitMqConnectFactory {
    16     static Logger log = LoggerFactory.getLogger(RabbitMqConnectFactory.class);
    17     /**
    18      * 缓存连接工厂,将建立的链接放入map缓存
    19      */
    20     private static Map<String, ConnectionFactory> connectionFactoryMap = new HashMap<String, ConnectionFactory>();
    21     /**
    22      * 根据rabbitMqName获取一个连接,使用完记得要自己关闭连接 conn.close()
    23      */
    24     public static Connection getConnection(String rabbitMqName) {
    25         if(StringUtils.isEmpty(rabbitMqName)){
    26             log.error("rabbitMqName不能为空!");
    27             throw new java.lang.NullPointerException("rabbitMqName为空");
    28         }
    29         if(connectionFactoryMap.get(rabbitMqName)==null){
    30             initConnectionFactory(rabbitMqName);
    31         }
    32         ConnectionFactory connectionFactory = connectionFactoryMap.get(rabbitMqName);
    33         if(connectionFactory==null){
    34             log.info("没有找到对应的rabbitmq,name={}",rabbitMqName);
    35         }
    36         try {
    37             return connectionFactory.newConnection();
    38         }catch (Exception e) {
    39             log.error("创建rabbitmq连接异常!",e);
    40             return null;
    41         }
    42     }
    43     /**
    44      * 初始化一个连接工厂
    45      * @param rabbitMqName
    46      */
    47     private static void initConnectionFactory(String rabbitMqName){
    48         
    49         try {
    50             ConnectionFactory factory = new ConnectionFactory();
    51             //新增代码,如果连接断开会自动重连
    52             //factory.setAutomaticRecoveryEnabled(true);
    53             factory.setHost("127.0.0.1");
    54             factory.setPort(5672);
    55             //factory.setVirtualHost(vhost);
    56             factory.setUsername("test");
    57             factory.setPassword("test");
    58             connectionFactoryMap.put(rabbitMqName, factory);
    59         } catch (Exception e) {
    60             e.printStackTrace();
    61         }finally{
    62         }
    63     }
    64     
    65 }

     3、消费端的代码:

     1 import org.slf4j.Logger;
     2 import org.slf4j.LoggerFactory;
     3 
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.Consumer;
     7 
     8 /**
     9  * RabbitMQq客户端代码
    10  * @author skyfeng
    11  *
    12  */
    13 public class CustomerMqClient {
    14 
    15     final static Logger log = LoggerFactory.getLogger(CustomerMqClient.class);
    16     private final static String RABBITMQ_NAME = "mq_name";
    17     private final static String EXCHANGE_NAME = "Exchange_name";
    18     private final static String QUEUE_NAME = "queue_name";
    19     private static Channel channel = null;
    20     private static Connection connection = null;
    21     
    22     /**
    23      * 初始化客户端代码
    24      */
    25     public static void initClient() {
    26         //重新链接时判断之前的channel是否关闭,没有关闭先关闭
    27         if(null != channel  && channel.isOpen()){
    28             try {
    29                 channel.close();
    30             } catch (Exception e) {
    31                 log.error("mq name =[" +RABBITMQ_NAME+"] close old channel exception.e={}",e);
    32             }finally {
    33                 channel = null;
    34             }
    35         }
    36         //重新链接时判断之前的connection是否关闭,没有关闭先关闭
    37         if (null != connection && connection.isOpen()) {
    38             try {
    39                 connection.close();
    40             } catch (Exception e) {
    41                 log.error("mq name =[" +RABBITMQ_NAME+"] close old connection exception.e={}",e);
    42             }finally{
    43                 connection = null;
    44             }
    45         }
    46         //从链接池中获取链接
    47         connection = RabbitMqConnectFactory.getConnection(RABBITMQ_NAME);
    48         try {
    49             channel = connection.createChannel();
    50             channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
    51             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    52             channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#");//#号接收所有的数据
    53             Consumer consumer = new CustomerMqConsumer(channel);//具体的业务逻辑在CustomerMqConsumer中
    54             channel.basicConsume(QUEUE_NAME, false, consumer);
    55         } catch (Exception e) {
    56             log.error("CustomerMqClient mq client connection fail .....{}", e);
    57             //发生异常时,重连
    58             reConnect();
    59         }
    60     }
    61 
    62     // 异常时,重连的方法
    63     public static void reConnect() {
    64         log.error("等待5s后重连");
    65         try {
    66             Thread.sleep(5000);
    67         } catch (InterruptedException e) {
    68         }
    69         initClient();
    70     }
    71 
    72 }

    4、生产端代码:

     1 import org.apache.commons.lang3.StringUtils;
     2 import org.slf4j.Logger;
     3 import org.slf4j.LoggerFactory;
     4 
     5 import com.rabbitmq.client.AlreadyClosedException;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 
     9 /**
    10  * 把数据发送到rabbitmq的exchange,
    11  */
    12 public class SendToExchange {
    13     static Logger log = LoggerFactory.getLogger(SendToExchange.class);
    14     
    15     final static String TYPE = "topic";
    16     final static String CHARSET_UTF8 = "UTF-8";
    17     //MQ生产者exchange,把数据发给这个exchange
    18     final static String rabbitExchangeName = "ExchangeName";
    19     static boolean mqConnected = false;//mq当前处于连接状态
    20     
    21     static Channel channel=null;
    22     static{
    23         init();
    24     }
    25     public static void init(){
    26         log.info(" rabbit mq init begin...");
    27         try {
    28             //在mq连接中断后,发送程序判断已经断开,启动重连的时候会执行
    29             if(channel!=null){
    30                 try {
    31                     channel.close();
    32                 } catch (Exception e) {
    33                     log.error("关闭老channel 异常",e);
    34                 }finally{
    35                     channel = null;
    36                 }
    37             }
    38             Connection connection = RabbitMqConnectFactory.getConnection("connection");
    39             channel = connection.createChannel();
    40             /*
    41              *这里只定义exchange,因为每个业务模块都会从这里接入数据,所以不在这里定义队列
    42              *队列的定义在各个业务模块自己的消费端定义
    43              */
    44             channel.exchangeDeclare(rabbitExchangeName, TYPE, true, false, null);
    45             log.info(" rabbit mq init OK");
    46             mqConnected = true;
    47         } catch (Exception e) {
    48             log.error("rabbitmq初始化错误",e);
    49             mqConnected = false;
    50         }
    51     }
    52     /**
    53      * 往rabbitmq发数据
    54      * @param message
    55      */
    56     public static void sendToRabbitMq(String message,String routingKey){
    57         try {
    58             if(StringUtils.isEmpty(message)){
    59                 log.debug("message is empty");
    60                 return;
    61             }
    62             channel.basicPublish(rabbitExchangeName, routingKey, null, message.getBytes(CHARSET_UTF8));
    63         }catch(AlreadyClosedException ex){
    64             log.error("往rabbitmq发数据报错,可能连接已关闭,尝试重连,data:",message,ex);
    65             init();
    66         }catch (Exception e) {
    67             log.error("往rabbitmq发数据报错,data:",message,e);
    68         }
    69     }
    70 }
  • 相关阅读:
    aspnet mvc 中 跨域请求的处理方法
    Aspnet Mvc 前后端分离项目手记(三)关于restful 风格Url设计
    Aspnet Mvc 前后端分离项目手记(二)关于token认证
    Aspnet Mvc 前后端分离项目手记(一) 关于跨域问题(还有前言)
    31 | 误删数据后除了跑路,还能怎么办?
    30 | 答疑文章(二):用动态的观点看加锁
    29 | 如何判断一个数据库是不是出问题了?
    28 | 读写分离有哪些坑?
    27 | 主库出问题了,从库怎么办?
    26 | 备库为什么会延迟好几个小时?
  • 原文地址:https://www.cnblogs.com/skyfeng/p/rabbitmq.html
Copyright © 2011-2022 走看看