zoukankan      html  css  js  c++  java
  • 【转载】java实现rabbitmq消息的发送接受

    原文地址:http://blog.csdn.net/sdyy321/article/details/9241445

    本文不介绍amqp和rabbitmq相关知识,请自行网上查阅

    本文是基于spring-rabbit中间件来实现消息的发送接受功能

    see http://www.rabbitmq.com/tutorials/tutorial-one-Java.html

    see http://www.springsource.org/spring-amqp

    [html] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. <!-- for rabbitmq -->  
    2.     <dependency>  
    3.         <groupId>com.rabbitmq</groupId>  
    4.         <artifactId>amqp-client</artifactId>  
    5.         <version>2.8.2</version>  
    6.     </dependency>  
    7.     <dependency>  
    8.         <groupId>org.springframework.amqp</groupId>  
    9.         <artifactId>spring-amqp</artifactId>  
    10.         <version>1.1.1.RELEASE</version>  
    11.     </dependency>  
    12.     <dependency>  
    13.         <groupId>org.springframework.amqp</groupId>  
    14.         <artifactId>spring-rabbit</artifactId>  
    15.         <version>1.1.1.RELEASE</version>  
    16.     </dependency>  
    17.     <dependency>  
    18.         <groupId>com.caucho</groupId>  
    19.         <artifactId>hessian</artifactId>  
    20.         <version>4.0.7</version>  
    21.     </dependency>  
    22.   </dependencies>  

    首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public class EventMessage implements Serializable{  
    2.   
    3.     private String queueName;  
    4.       
    5.     private String exchangeName;  
    6.       
    7.     private byte[] eventData;  
    8.   
    9.     public EventMessage(String queueName, String exchangeName, byte[] eventData) {  
    10.         this.queueName = queueName;  
    11.         this.exchangeName = exchangeName;  
    12.         this.eventData = eventData;  
    13.     }  
    14.   
    15.     public EventMessage() {  
    16.     }     
    17.   
    18.     public String getQueueName() {  
    19.         return queueName;  
    20.     }  
    21.   
    22.     public String getExchangeName() {  
    23.         return exchangeName;  
    24.     }  
    25.   
    26.     public byte[] getEventData() {  
    27.         return eventData;  
    28.     }  
    29.   
    30.     @Override  
    31.     public String toString() {  
    32.         return "EopEventMessage [queueName=" + queueName + ", exchangeName="  
    33.                 + exchangeName + ", eventData=" + Arrays.toString(eventData)  
    34.                 + "]";  
    35.     }  
    36. }  

    为了可以发送和接受这个消息持有对象,我们还需要需要一个用来序列化和反序列化的工厂

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public interface CodecFactory {  
    2.   
    3.     byte[] serialize(Object obj) throws IOException;  
    4.       
    5.     Object deSerialize(byte[] in) throws IOException;  
    6.   
    7. }  

    下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public class HessionCodecFactory implements CodecFactory {  
    2.   
    3.     private final Logger logger = Logger.getLogger(HessionCodecFactory.class);  
    4.   
    5.     @Override  
    6.     public byte[] serialize(Object obj) throws IOException {  
    7.         ByteArrayOutputStream baos = null;  
    8.         HessianOutput output = null;  
    9.         try {  
    10.             baos = new ByteArrayOutputStream(1024);  
    11.             output = new HessianOutput(baos);  
    12.             output.startCall();  
    13.             output.writeObject(obj);  
    14.             output.completeCall();  
    15.         } catch (final IOException ex) {  
    16.             throw ex;  
    17.         } finally {  
    18.             if (output != null) {  
    19.                 try {  
    20.                     baos.close();  
    21.                 } catch (final IOException ex) {  
    22.                     this.logger.error("Failed to close stream.", ex);  
    23.                 }  
    24.             }  
    25.         }  
    26.         return baos != null ? baos.toByteArray() : null;  
    27.     }  
    28.   
    29.     @Override  
    30.     public Object deSerialize(byte[] in) throws IOException {  
    31.         Object obj = null;  
    32.         ByteArrayInputStream bais = null;  
    33.         HessianInput input = null;  
    34.         try {  
    35.             bais = new ByteArrayInputStream(in);  
    36.             input = new HessianInput(bais);  
    37.             input.startReply();  
    38.             obj = input.readObject();  
    39.             input.completeReply();  
    40.         } catch (final IOException ex) {  
    41.             throw ex;  
    42.         } catch (final Throwable e) {  
    43.             this.logger.error("Failed to decode object.", e);  
    44.         } finally {  
    45.             if (input != null) {  
    46.                 try {  
    47.                     bais.close();  
    48.                 } catch (final IOException ex) {  
    49.                     this.logger.error("Failed to close stream.", ex);  
    50.                 }  
    51.             }  
    52.         }  
    53.         return obj;  
    54.     }  
    55.   
    56. }  

    接下来就先实现发送功能,新增一个接口专门用来实现发送功能

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public interface EventTemplate {  
    2.   
    3.     void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;  
    4.           
    5.     void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;  
    6. }  

    SendRefuseException是自定义的发送失败异常类

    下面是它的实现类,主要的任务就是将数据转换为EventMessage

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public class DefaultEventTemplate implements EventTemplate {  
    2.   
    3.     private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);  
    4.   
    5.     private AmqpTemplate eventAmqpTemplate;  
    6.   
    7.     private CodecFactory defaultCodecFactory;  
    8.   
    9. //  private DefaultEventController eec;  
    10. //  
    11. //  public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,  
    12. //          CodecFactory defaultCodecFactory, DefaultEventController eec) {  
    13. //      this.eventAmqpTemplate = eopAmqpTemplate;  
    14. //      this.defaultCodecFactory = defaultCodecFactory;  
    15. //      this.eec = eec;  
    16. //  }  
    17.       
    18.     public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {  
    19.         this.eventAmqpTemplate = eopAmqpTemplate;  
    20.         this.defaultCodecFactory = defaultCodecFactory;  
    21.     }  
    22.   
    23.     @Override  
    24.     public void send(String queueName, String exchangeName, Object eventContent)  
    25.             throws SendRefuseException {  
    26.         this.send(queueName, exchangeName, eventContent, defaultCodecFactory);  
    27.     }    
    28.   
    29.     @Override  
    30.     public void send(String queueName, String exchangeName, Object eventContent,  
    31.             CodecFactory codecFactory) throws SendRefuseException {  
    32.         if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {  
    33.             throw new SendRefuseException("queueName exchangeName can not be empty.");  
    34.         }  
    35.           
    36. //      if (!eec.beBinded(exchangeName, queueName))  
    37. //          eec.declareBinding(exchangeName, queueName);  
    38.   
    39.         byte[] eventContentBytes = null;  
    40.         if (codecFactory == null) {  
    41.             if (eventContent == null) {  
    42.                 logger.warn("Find eventContent is null,are you sure...");  
    43.             } else {  
    44.                 throw new SendRefuseException(  
    45.                         "codecFactory must not be null ,unless eventContent is null");  
    46.             }  
    47.         } else {  
    48.             try {  
    49.                 eventContentBytes = codecFactory.serialize(eventContent);  
    50.             } catch (IOException e) {  
    51.                 throw new SendRefuseException(e);  
    52.             }  
    53.         }  
    54.   
    55.         // 构造成Message  
    56.         EventMessage msg = new EventMessage(queueName, exchangeName,  
    57.                 eventContentBytes);  
    58.         try {  
    59.             eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);  
    60.         } catch (AmqpException e) {  
    61.             logger.error("send event fail. Event Message : [" + eventContent + "]", e);  
    62.             throw new SendRefuseException("send event fail", e);  
    63.         }  
    64.     }  
    65. }  

    注释的地方稍后会用到,主要是防止数据数据发送的地方没有事先声明

    然后我们再实现接受消息

    首先我们需要一个消费接口,所有的消费程序都实现这个类

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public interface EventProcesser {  
    2.     public void process(Object e);  
    3. }  

    为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * MessageListenerAdapter的Pojo 
    3.  * <p>消息处理适配器,主要功能:</p> 
    4.  * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来</p> 
    5.  * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p> 
    6.  *  
    7.  */  
    8. public class MessageAdapterHandler {  
    9.   
    10.     private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);  
    11.   
    12.     private ConcurrentMap<String, EventProcessorWrap> epwMap;  
    13.   
    14.     public MessageAdapterHandler() {  
    15.         this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();  
    16.     }  
    17.   
    18.     public void handleMessage(EventMessage eem) {  
    19.         logger.debug("Receive an EventMessage: [" + eem + "]");  
    20.         // 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值  
    21.         if (eem == null) {  
    22.             logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");  
    23.             return;  
    24.         }  
    25.         if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {  
    26.             logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");  
    27.             return;  
    28.         }  
    29.         // 解码,并交给对应的EventHandle执行  
    30.         EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());  
    31.         if (eepw == null) {  
    32.             logger.warn("Receive an EopEventMessage, but no processor can do it.");  
    33.             return;  
    34.         }  
    35.         try {  
    36.             eepw.process(eem.getEventData());  
    37.         } catch (IOException e) {  
    38.             logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);  
    39.             return;  
    40.         }  
    41.     }  
    42.   
    43.     protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {  
    44.         if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {  
    45.             throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");  
    46.         }  
    47.         EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);  
    48.         EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);  
    49.         if (oldProcessorWrap != null) {  
    50.             logger.warn("The processor of this queue and exchange exists, and the new one can't be add");  
    51.         }  
    52.     }  
    53.   
    54.     protected Set<String> getAllBinding() {  
    55.         Set<String> keySet = epwMap.keySet();  
    56.         return keySet;  
    57.     }  
    58.   
    59.     protected static class EventProcessorWrap {  
    60.   
    61.         private CodecFactory codecFactory;  
    62.   
    63.         private EventProcesser eep;  
    64.   
    65.         protected EventProcessorWrap(CodecFactory codecFactory,  
    66.                 EventProcesser eep) {  
    67.             this.codecFactory = codecFactory;  
    68.             this.eep = eep;  
    69.         }  
    70.   
    71.         public void process(byte[] eventData) throws IOException{  
    72.             Object obj = codecFactory.deSerialize(eventData);  
    73.             eep.process(obj);  
    74.         }  
    75.     }  
    76. }  

    这是正常情况下的消息处理方式,如果rabbitmq消息接受发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public class MessageErrorHandler implements ErrorHandler{  
    2.   
    3.     private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);  
    4.       
    5.     @Override  
    6.     public void handleError(Throwable t) {  
    7.         logger.error("RabbitMQ happen a error:" + t.getMessage(), t);  
    8.     }  
    9.   
    10. }  

    接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public class EventControlConfig {  
    2.   
    3.     private final static int DEFAULT_PORT = 5672;  
    4.       
    5.     private final static String DEFAULT_USERNAME = "guest";  
    6.       
    7.     private final static String DEFAULT_PASSWORD = "guest";  
    8.       
    9.     private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;  
    10.       
    11.     private static final int PREFETCH_SIZE = 1;  
    12.       
    13.     private String serverHost ;  
    14.       
    15.     private int port = DEFAULT_PORT;  
    16.       
    17.     private String username = DEFAULT_USERNAME;  
    18.       
    19.     private String password = DEFAULT_PASSWORD;  
    20.       
    21.     private String virtualHost;  
    22.       
    23.     /** 
    24.      * 和rabbitmq建立连接的超时时间 
    25.      */  
    26.     private int connectionTimeout = 0;  
    27.       
    28.     /** 
    29.      * 事件消息处理线程数,默认是 CPU核数 * 2 
    30.      */  
    31.     private int eventMsgProcessNum;  
    32.       
    33.     /** 
    34.      * 每次消费消息的预取值 
    35.      */  
    36.     private int prefetchSize;  
    37.       
    38.     public EventControlConfig(String serverHost) {  
    39.         this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,new HessionCodecFactory());  
    40.     }  
    41.   
    42.     public EventControlConfig(String serverHost, int port, String username,  
    43.             String password, String virtualHost, int connectionTimeout,  
    44.             int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) {  
    45.         this.serverHost = serverHost;  
    46.         this.port = port>0?port:DEFAULT_PORT;  
    47.         this.username = username;  
    48.         this.password = password;  
    49.         this.virtualHost = virtualHost;  
    50.         this.connectionTimeout = connectionTimeout>0?connectionTimeout:0;  
    51.         this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;  
    52.         this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE;  
    53.     }  
    54.   
    55.     public String getServerHost() {  
    56.         return serverHost;  
    57.     }  
    58.   
    59.     public int getPort() {  
    60.         return port;  
    61.     }  
    62.   
    63.     public String getUsername() {  
    64.         return username;  
    65.     }  
    66.   
    67.     public String getPassword() {  
    68.         return password;  
    69.     }  
    70.   
    71.     public String getVirtualHost() {  
    72.         return virtualHost;  
    73.     }  
    74.   
    75.     public int getConnectionTimeout() {  
    76.         return connectionTimeout;  
    77.     }  
    78.   
    79.     public int getEventMsgProcessNum() {  
    80.         return eventMsgProcessNum;  
    81.     }  
    82.   
    83.     public int getPrefetchSize() {  
    84.         return prefetchSize;  
    85.     }  
    86.   
    87. }  


    具体的发送、接受程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public interface EventController {  
    2.       
    3.     /** 
    4.      * 控制器启动方法 
    5.      */  
    6.     void start();  
    7.       
    8.     /** 
    9.      * 获取发送模版 
    10.      */  
    11.     EventTemplate getEopEventTemplate();  
    12.       
    13.     /** 
    14.      * 绑定消费程序到对应的exchange和queue 
    15.      */  
    16.     EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);  
    17.       
    18.     /*in map, the key is queue name, but value is exchange name*/  
    19.     EventController add(Map<String,String> bindings, EventProcesser eventProcesser);  
    20.       
    21. }  

    它的实现类如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * 和rabbitmq通信的控制器,主要负责: 
    3.  * <p>1、和rabbitmq建立连接</p> 
    4.  * <p>2、声明exChange和queue以及它们的绑定关系</p> 
    5.  * <p>3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上</p> 
    6.  * <p>4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存</p> 
    7.  * @author yangyong 
    8.  * 
    9.  */  
    10. public class DefaultEventController implements EventController {  
    11.       
    12.     private CachingConnectionFactory rabbitConnectionFactory;  
    13.       
    14.     private EventControlConfig config;  
    15.       
    16.     private RabbitAdmin rabbitAdmin;  
    17.       
    18.     private CodecFactory defaultCodecFactory = new HessionCodecFactory();  
    19.       
    20.     private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container  
    21.       
    22.     private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();  
    23.       
    24.     private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // 直接指定  
    25.     //queue cache, key is exchangeName  
    26.     private Map<String, DirectExchange> exchanges = new HashMap<String,DirectExchange>();  
    27.     //queue cache, key is queueName  
    28.     private Map<String, Queue> queues = new HashMap<String, Queue>();  
    29.     //bind relation of queue to exchange cache, value is exchangeName | queueName  
    30.     private Set<String> binded = new HashSet<String>();  
    31.       
    32.     private EventTemplate eventTemplate; // 给App使用的Event发送客户端  
    33.       
    34.     private AtomicBoolean isStarted = new AtomicBoolean(false);  
    35.       
    36.     private static DefaultEventController defaultEventController;  
    37.       
    38.     public synchronized static DefaultEventController getInstance(EventControlConfig config){  
    39.         if(defaultEventController==null){  
    40.             defaultEventController = new DefaultEventController(config);  
    41.         }  
    42.         return defaultEventController;  
    43.     }  
    44.       
    45.     private DefaultEventController(EventControlConfig config){  
    46.         if (config == null) {  
    47.             throw new IllegalArgumentException("Config can not be null.");  
    48.         }  
    49.         this.config = config;  
    50.         initRabbitConnectionFactory();  
    51.         // 初始化AmqpAdmin  
    52.         rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);  
    53.         // 初始化RabbitTemplate  
    54.         RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);  
    55.         rabbitTemplate.setMessageConverter(serializerMessageConverter);  
    56.         eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this);  
    57.     }  
    58.       
    59.     /** 
    60.      * 初始化rabbitmq连接 
    61.      */  
    62.     private void initRabbitConnectionFactory() {  
    63.         rabbitConnectionFactory = new CachingConnectionFactory();  
    64.         rabbitConnectionFactory.setHost(config.getServerHost());  
    65.         rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());  
    66.         rabbitConnectionFactory.setPort(config.getPort());  
    67.         rabbitConnectionFactory.setUsername(config.getUsername());  
    68.         rabbitConnectionFactory.setPassword(config.getPassword());  
    69.         if (!StringUtils.isEmpty(config.getVirtualHost())) {  
    70.             rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());  
    71.         }  
    72.     }  
    73.       
    74.     /** 
    75.      * 注销程序 
    76.      */  
    77.     public synchronized void destroy() throws Exception {  
    78.         if (!isStarted.get()) {  
    79.             return;  
    80.         }  
    81.         msgListenerContainer.stop();  
    82.         eventTemplate = null;  
    83.         rabbitAdmin = null;  
    84.         rabbitConnectionFactory.destroy();  
    85.     }  
    86.       
    87.     @Override  
    88.     public void start() {  
    89.         if (isStarted.get()) {  
    90.             return;  
    91.         }  
    92.         Set<String> mapping = msgAdapterHandler.getAllBinding();  
    93.         for (String relation : mapping) {  
    94.             String[] relaArr = relation.split("\|");  
    95.             declareBinding(relaArr[1], relaArr[0]);  
    96.         }  
    97.         initMsgListenerAdapter();  
    98.         isStarted.set(true);  
    99.     }  
    100.       
    101.     /** 
    102.      * 初始化消息监听器容器 
    103.      */  
    104.     private void initMsgListenerAdapter(){  
    105.         MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter);  
    106.         msgListenerContainer = new SimpleMessageListenerContainer();  
    107.         msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);  
    108.         msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);  
    109.         msgListenerContainer.setMessageListener(listener);  
    110.         msgListenerContainer.setErrorHandler(new MessageErrorHandler());  
    111.         msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // 设置每个消费者消息的预取值  
    112.         msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());  
    113.         msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数  
    114.         msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()]));  
    115.         msgListenerContainer.start();  
    116.     }  
    117.   
    118.     @Override  
    119.     public EventTemplate getEopEventTemplate() {  
    120.         return eventTemplate;  
    121.     }  
    122.   
    123.     @Override  
    124.     public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) {  
    125.         return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);  
    126.     }  
    127.       
    128.     public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) {  
    129.         msgAdapterHandler.add(queueName, exchangeName, eventProcesser, defaultCodecFactory);  
    130.         if(isStarted.get()){  
    131.             initMsgListenerAdapter();  
    132.         }  
    133.         return this;  
    134.     }  
    135.   
    136.     @Override  
    137.     public EventController add(Map<String, String> bindings,  
    138.             EventProcesser eventProcesser) {  
    139.         return add(bindings, eventProcesser,defaultCodecFactory);  
    140.     }  
    141.   
    142.     public EventController add(Map<String, String> bindings,  
    143.             EventProcesser eventProcesser, CodecFactory codecFactory) {  
    144.         for(Map.Entry<String, String> item: bindings.entrySet())   
    145.             msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory);  
    146.         return this;  
    147.     }  
    148.       
    149.     /** 
    150.      * exchange和queue是否已经绑定 
    151.      */  
    152.     protected boolean beBinded(String exchangeName, String queueName) {  
    153.         return binded.contains(exchangeName+"|"+queueName);  
    154.     }  
    155.       
    156.     /** 
    157.      * 声明exchange和queue已经它们的绑定关系 
    158.      */  
    159.     protected synchronized void declareBinding(String exchangeName, String queueName) {  
    160.         String bindRelation = exchangeName+"|"+queueName;  
    161.         if (binded.contains(bindRelation)) return;  
    162.           
    163.         boolean needBinding = false;  
    164.         DirectExchange directExchange = exchanges.get(exchangeName);  
    165.         if(directExchange == null) {  
    166.             directExchange = new DirectExchange(exchangeName, true, false, null);  
    167.             exchanges.put(exchangeName, directExchange);  
    168.             rabbitAdmin.declareExchange(directExchange);//声明exchange  
    169.             needBinding = true;  
    170.         }  
    171.           
    172.         Queue queue = queues.get(queueName);  
    173.         if(queue == null) {  
    174.             queue = new Queue(queueName, true, false, false);  
    175.             queues.put(queueName, queue);  
    176.             rabbitAdmin.declareQueue(queue);    //声明queue  
    177.             needBinding = true;  
    178.         }  
    179.           
    180.         if(needBinding) {  
    181.             Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange  
    182.             rabbitAdmin.declareBinding(binding);//声明绑定关系  
    183.             binded.add(bindRelation);  
    184.         }  
    185.     }  
    186.   
    187. }  

    搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. @SuppressWarnings("serial")  
    2. public class People implements Serializable{  
    3.     private int id;  
    4.     private String name;  
    5.     private boolean male;  
    6.     private People spouse;  
    7.     private List<People> friends;  
    8.     public int getId() {  
    9.         return id;  
    10.     }  
    11.     public void setId(int id) {  
    12.         this.id = id;  
    13.     }  
    14.     public String getName() {  
    15.         return name;  
    16.     }  
    17.     public void setName(String name) {  
    18.         this.name = name;  
    19.     }  
    20.     public boolean isMale() {  
    21.         return male;  
    22.     }  
    23.     public void setMale(boolean male) {  
    24.         this.male = male;  
    25.     }  
    26.     public People getSpouse() {  
    27.         return spouse;  
    28.     }  
    29.     public void setSpouse(People spouse) {  
    30.         this.spouse = spouse;  
    31.     }  
    32.     public List<People> getFriends() {  
    33.         return friends;  
    34.     }  
    35.     public void setFriends(List<People> friends) {  
    36.         this.friends = friends;  
    37.     }  
    38.       
    39.     @Override  
    40.     public String toString() {  
    41.         // TODO Auto-generated method stub  
    42.         return "People[id="+id+",name="+name+",male="+male+"]";  
    43.     }  
    44. }  

    建立单元测试


     

     在CODE上查看代码片派生到我的代码片
    1. public class RabbitMqTest{  
    2.       
    3.     private String defaultHost = "127.0.0.1";  
    4.       
    5.     private String defaultExchange = "EXCHANGE_DIRECT_TEST";  
    6.       
    7.     private String defaultQueue = "QUEUE_TEST";  
    8.       
    9.     private DefaultEventController controller;  
    10.       
    11.     private EventTemplate eventTemplate;  
    12.       
    13.     @Before  
    14.     public void init() throws IOException{  
    15.         EventControlConfig config = new EventControlConfig(defaultHost);  
    16.         controller = DefaultEventController.getInstance(config);  
    17.         eventTemplate = controller.getEopEventTemplate();  
    18.         controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());  
    19.         controller.start();  
    20.     }  
    21.       
    22.     @Test  
    23.     public void sendString() throws SendRefuseException{  
    24.         eventTemplate.send(defaultQueue, defaultExchange, "hello world");  
    25.     }  
    26.       
    27.     @Test  
    28.     public void sendObject() throws SendRefuseException{  
    29.         eventTemplate.send(defaultQueue, defaultExchange, mockObj());  
    30.     }  
    31.       
    32.     @Test  
    33.     public void sendTemp() throws SendRefuseException, InterruptedException{  
    34.         String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange  
    35.         String tempQueue = "QUEUE_TEST_TEMP";//以前未声明的queue  
    36.         eventTemplate.send(tempQueue, tempExchange, mockObj());  
    37.         //发送成功后此时不会接受到消息,还需要绑定对应的消费程序  
    38.         controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());  
    39.     }  
    40.       
    41.     @After  
    42.     public void end() throws InterruptedException{  
    43.         Thread.sleep(2000);  
    44.     }  
    45.       
    46.     private People mockObj(){  
    47.         People jack = new People();  
    48.         jack.setId(1);  
    49.         jack.setName("JACK");  
    50.         jack.setMale(true);  
    51.           
    52.         List<People> friends = new ArrayList<>();  
    53.         friends.add(jack);  
    54.         People hanMeiMei = new People();  
    55.         hanMeiMei.setId(1);  
    56.         hanMeiMei.setName("韩梅梅");  
    57.         hanMeiMei.setMale(false);  
    58.         hanMeiMei.setFriends(friends);  
    59.           
    60.         People liLei = new People();  
    61.         liLei.setId(2);  
    62.         liLei.setName("李雷");  
    63.         liLei.setMale(true);  
    64.         liLei.setFriends(friends);  
    65.         liLei.setSpouse(hanMeiMei);  
    66.         hanMeiMei.setSpouse(liLei);  
    67.         return hanMeiMei;  
    68.     }  
    69.       
    70.     class ApiProcessEventProcessor implements EventProcesser{  
    71.         @Override  
    72.         public void process(Object e) {//消费程序这里只是打印信息  
    73.             Assert.assertNotNull(e);  
    74.             System.out.println(e);  
    75.             if(e instanceof People){  
    76.                 People people = (People)e;  
    77.                 System.out.println(people.getSpouse());  
    78.                 System.out.println(people.getFriends());  
    79.             }  
    80.         }  
    81.     }  
    82. }  


    源码地址请点击这里

  • 相关阅读:
    今天到了1000分了,庆祝一下
    中文vs2008安装 mvc 1
    火车采集器使用感受
    存储过程中的case用法
    作为开发者的反思
    什么是程序员的优秀品质?
    遇到了乱码的问题(转载)
    利用网址导航站点推广
    国内优秀网址导航站总结 (转载)
    Unable to read local eventlog错误解决(转载)
  • 原文地址:https://www.cnblogs.com/xujishou/p/6773283.html
Copyright © 2011-2022 走看看