zoukankan      html  css  js  c++  java
  • 如何在项目中引入MetaQ消息收发机制

    当需要异步发送和接收大量消息时,需要在Crystal项目中引入MetaQ消息收发机制。

    关于MetaQ使用的官方例子可参考:https://github.com/killme2008/Metamorphosis/wiki/%E7%AE%80%E5%8D%95%E4%BE%8B%E5%AD%90

    Crystal框架将MetaQ进行封装,简化MetaQ的使用,具体如下:

    消息生产端

    1. 引入crystal-metaq-producer项目最为依赖:

      <dependency>
          <groupId>com.gsoft.crystal</groupId>
          <artifactId>crystal-metaq-producer</artifactId>
      </dependency>
    2. 调用消息发送对象,发送指定消息:

      @Resource
      private MessageProducer mp;
      @Resource
      private MessageConsumer mc;
       
      @Test
      public void testProducer() {
          String topic = "test";
          final String msg = "test message !";
           
          mp.publish(topic);
          Message message = new Message(topic, msg.getBytes());
          try {
              mp.sendMessage(message);
          } catch (MetaClientException e) {
              e.printStackTrace();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }

      其中,topic必须是MetaQ服务器中定义的主题之一。

      在发送消息前,必须先mp.publish(topic),与指定主题关联。

    消息消费端

    1. 引入crystal-metaq-consumer项目最为依赖:

      <dependency>
          <groupId>com.gsoft.crystal</groupId>
          <artifactId>crystal-metaq-consumer</artifactId>
      </dependency>
    2. 调用消息消费对象,注册监听器:

      @Resource
      private MessageConsumer mc;
       
      @Test
      public void testProducer() {
          String topic = "test";
           
          try {
              mc.subscribe(topic, 1024*1024, new MessageListener() {
                   
                  @Override
                  public void recieveMessages(Message message) throws InterruptedException {
                      String str = new String(message.getData());
                      System.out.println("Recived Message: " + str);
                      Assert.assertEquals(msg, str);
                  }
                   
                  @Override
                  public Executor getExecutor() {
                      return null;
                  }
              });
              mc.completeSubscribe();
          } catch (MetaClientException e1) {
              e1.printStackTrace();
          }

      其中,mc.subscribe()方法可执行多次,最后需执行mc.completeSubscribe()方法。

      另,上述方法中的1024*1024参数为接收的消息内容最大字节数,可自行调整以优化性能(不了解具体如何优化情况下,建议不要调整)。

      监听器中的recieveMessages方法即为消息消费方法,getExecutor方法返回线程池的执行器,如返回null,则不采用线程池。

  • 相关阅读:
    Tomcat全攻略
    JAVA必备——13个核心规范
    利用Node.js实现模拟Session验证的登陆
    Android中关于JNI 的学习(六)JNI中注冊方法的实现
    pomelo源代码分析(一)
    怎样解决栈溢出
    String,StringBuffer与StringBuilder的差别??
    ERWin 7.1 和7.2 的官方FTP下载地址
    C/C++中各种类型int、long、double、char表示范围(最大最小值)
    下拉刷新,上拉装载许多其他ListView
  • 原文地址:https://www.cnblogs.com/jytx/p/5482023.html
Copyright © 2011-2022 走看看