zoukankan      html  css  js  c++  java
  • rabbitmq 对多服务器p2p模式配置的一个测试

    一直对rabbitmq p2p 模式的多服务器下做相同配置的 各个服务器数据接受情况比较好奇

    今天有空测试了下 

    xml 文件

    [html] view plain copy
     
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <beans xmlns="http://www.springframework.org/schema/beans"  
    3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
    4.     xsi:schemaLocation="http://www.springframework.org/schema/beans  
    5.      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
    6.      http://www.springframework.org/schema/beans  
    7.      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
    8.      http://www.springframework.org/schema/rabbit  
    9.      http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">  
    10.        
    11.     <!--配置connection-factory,指定连接rabbit server参数 -->  
    12.     <rabbit:connection-factory id="connectionFactory"  
    13.          port="5672"  username="guest" password="guest" host="127.0.0.1"  
    14.         />  
    15.       
    16.       
    17.         <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->  
    18.     <rabbit:admin connection-factory="connectionFactory" />  
    19.       
    20.     <!--定义queue -->  
    21.     <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />  
    22.       
    23.         <!-- 定义direct exchange,绑定queueTest -->  
    24.     <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">  
    25.         <rabbit:bindings>  
    26.             <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>  
    27.         </rabbit:bindings>  
    28.     </rabbit:direct-exchange>  
    29.       
    30.     <bean id="jsonMessageConverter"    
    31.         class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"></bean>  
    32.       
    33.     <!--定义rabbit template用于数据的接收和发送 -->  
    34.     <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory"   
    35.         exchange="exchangeTest"   message-converter="jsonMessageConverter" />  
    36.           
    37.         
    38.     <!-- 消息接收者 -->  
    39.     <bean id="messageReceiver" class="com.bimatrix.revit.mq.MessageConsumer"></bean>  
    40.       
    41.     <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->  
    42.     <rabbit:listener-container connection-factory="connectionFactory">  
    43.              <rabbit:listener queues="queueTest" ref="messageReceiver"/>  
    44.     </rabbit:listener-container>  
    45.       
    46. </beans>  
    [html] view plain copy
     
    1. </pre><pre code_snippet_id="1702272" snippet_file_name="blog_20160531_4_8975174" name="code" class="html">  
    [html] view plain copy
     
    1. </pre><p></p><p>消费者:</p><p></p><pre code_snippet_id="1702272" snippet_file_name="blog_20160531_5_4133388" name="code" class="java">@Component  
    2. public class MessageConsumer implements MessageListener {  
    3.       
    4.     private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);  
    5.     int receiveNum =0 ;  
    6.   
    7.   
    8.     @Override  
    9.     public void onMessage(Message message) {  
    10.         receiveNum++ ;  
    11.         //logger.info("receive message:{}",message);  
    12. //      System.out.println("receive message:{}"+message);  
    13.         System.out.println("receive: "+receiveNum);  
    14.     }  
    15.   
    16.   
    17. }  


    测试代码:

    [java] view plain copy
     
    1. @RunWith(value = SpringJUnit4ClassRunner.class)  
    2. @ContextConfiguration(locations = { "classpath*:config/applicationContext.xml" })  
    3. public class TestQueue {  
    4.     @Autowired  
    5.     AmqpTemplate amqpTemplate;  
    6.   
    7.   
    8.     final String queue_key = "queueTestKey";  
    9.   
    10.   
    11.     @Test  
    12.     public void send() {  
    13.         try {  
    14.             //System.out.println("-----------------------------------------");  
    15.             for(int i=1;i<=1000;i++){  
    16.                 amqpTemplate.convertAndSend(queue_key, i);  
    17.                 //System.out.println(i);  
    18.             }  
    19.             //System.out.println("-----------------------------------------");  
    20.         } catch (Exception e) {  
    21. //          LOGGER.error(e);  
    22.         }  
    23.     }  



    连续发送1000条数据给消费者,同一台机器8080 9080端口各启动一个tomcat 代码完全一样,

    接受情况

    A 机器 receive: 481

    receive: 57  不知道为什么分两次打出来 ,在两个输出界面一个tomcat console 一个是junit的测试输出console 

    B 机器 receive: 462 

    三者合计就是1000  再次测试总和也是对的

    说明两台服务器在p2p的配置下 各接受了一部分生产者发送来的数据

    再多服务器下配置p2p 相当于阻塞队列 多个线程同时处理,各吃掉一部分队列的数据,起到了分流的效果

    当然每个消费者那里还可以在起动线程池来处理各自接收的数据。 

    高并发情况下 这种配置也是可以减轻各个服务器压力

  • 相关阅读:
    maven工程的目录结构
    集合的区别
    名词解析
    1.(字符串)-判断字符串是否是子集字符串
    1.(字符串)-判断两字符串是否相等
    python max函数技巧
    1.(字符串)-子字符串位置查找
    numpy线性代数np.linalg
    Python图像库PIL 使用
    pyhthon-chr
  • 原文地址:https://www.cnblogs.com/zxtceq/p/8559028.html
Copyright © 2011-2022 走看看