zoukankan      html  css  js  c++  java
  • 【rocketMQ】1、搭建MQ服务器,生产一个订单与消费一个订单

    1、 先解压

      

    2、 maven编译安装、(注意虚拟机采用nat网络模式,需要联网)

    mvn -Prelease-all -DskipTests clean install -U

     

     

    启动nameser节点

     

    启动broker

     

    nohup sh bin/mqbroker -n localhost:9876 & tail -f namesrv.log

    出错,

     

    修改内存配置

     

     

    修改为

     

    修改broken

     

     

     这里我吃了大亏,主机对虚拟机中的端口访问不通!!!

    注意一定要关闭防火墙,或者开启9876等需要使用的端口,不然无法远程调用!

    再次启动

    nohup bin/mqnamesrv > namesrv.log 2>&1 & tail -f namesrv.log

     

    nohup bin/mqbroker -n 127.0.0.1:9876 > broker.log 2>&1 & tail -f broker.log

     

    、、测试案例

     这个是官网的,其实这个无所谓,等会使用代码远程发送订单

    > export NAMESRV_ADDR=localhost:9876

     > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

    停止服务,这个也无所谓,实在不行直接 kill -9 pid  吧进程杀死也是可以的

     

    来,开始发送第一单!!!

    package tttt.mq;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.junit.Test;
    
    public class MqProductTest {
    
    	@Test
    	public void test1() {
    		DefaultMQProducer producer = new DefaultMQProducer("xiaof_test");
    		producer.setNamesrvAddr("192.168.0.128:9876");
    		try {
    			producer.start();
    			for (int i = 0; i < 2; i++)
    				try {
    					{
    						Message msg = new Message("Topic1", "TagA", "OrderID188",
    								"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    //						SendResult sendResult = producer.send(msg);
    //						System.out.printf("%s%n", sendResult);
    						
    						producer.sendOneway(msg);
    						
    					}
    				} catch (Exception e) {
    					e.printStackTrace();
    				}
    
    		} catch (MQClientException e) {
    			e.printStackTrace();
    		} finally {
    			producer.shutdown();
    		}
    	}
    
    }
    

      

    解压来,我们消费掉这个

    package tttt.mq;
    
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.junit.Test;
    
    public class MqConsumeTest {
    	
    	@Test
    	public void test1() {
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiaof_test");
    		consumer.setNamesrvAddr("192.168.0.128:9876");
    		
    		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    		
    		try {
    			consumer.subscribe("Topic1", "TagA");
    			
    			consumer.registerMessageListener(new MessageListenerOrderly() {
    				
    				AtomicLong consumeTimes = new AtomicLong(0);
    				
    				@Override
    				public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
    					
    					//这个要是false,服务器就会不断重复发送消息
    					context.setAutoCommit(true);
    					MessageExt msg = msgs.get(0);
    					String data = new String(msg.getBody());
    
    					System.out.printf("%s 消费信息线程与数据: %s %n", Thread.currentThread().getName(), data);
    					
    					this.consumeTimes.incrementAndGet();
    					if ((this.consumeTimes.get() % 2) == 0) {
    						return ConsumeOrderlyStatus.SUCCESS;
    					} else if ((this.consumeTimes.get() % 3) == 0) {
    						return ConsumeOrderlyStatus.ROLLBACK;
    					} else if ((this.consumeTimes.get() % 4) == 0) {
    						return ConsumeOrderlyStatus.COMMIT;
    					} else if ((this.consumeTimes.get() % 5) == 0) {
    						context.setSuspendCurrentQueueTimeMillis(3000);
    						return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    					}
    					return ConsumeOrderlyStatus.SUCCESS;
    
    				}
    			});
    			
    			consumer.start();
    
    	        System.out.printf("Consumer Started.%n");
    			
    		} catch (MQClientException e) {
    			e.printStackTrace();
    		}
    	}
    }
    

      

    来一发效果:

     

     

     

     

     这个是消费msg中的全部信息:

  • 相关阅读:
    8天学通MongoDB(实际操作版)——第九天 构建学习型部署环境
    构建Ubuntu Server试验环境
    8天学通MongoDB(实际操作版)——第一天 基础入门
    《JAVA与模式》之适配器模式
    wsdl
    Axis2 Fault: Transport out has not been set
    诺基亚E63常见设置指南
    java面试题总结
    R400 安装XP后 ati2dvag蓝屏解决办法
    深入理解Java多态性
  • 原文地址:https://www.cnblogs.com/cutter-point/p/8325959.html
Copyright © 2011-2022 走看看