zoukankan      html  css  js  c++  java
  • Spring + RocketMQ使用

    本文所介绍环境为win7环境下运行, 从官方github中(https://github.com/alibaba/RocketMQ)下载RocketMQ-master.zip,版本为v3.5.8,解压并进入根目录,运行命令install.bat, 安装完成后进入目录 argetalibaba-rocketmq-brokeralibaba-rocketmqin,打开两个命令行窗口,分别使用以下命令启动rocketmq

    启动nameserver
    mqnamesrv.exe
    启动broker
    mqbroker -n 127.0.0.1:9876
    

    1、编写pom.xml,

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.hode</groupId>
    	<artifactId>rocketmq</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	
    	<properties>
    		<spring.version>4.3.2.RELEASE</spring.version>
    		<junit.version>4.12</junit.version>
    		<log4j.version>1.2.17</log4j.version>
    		<rocketmq.version>3.2.6</rocketmq.version>
    		<slf4j.version>1.7.12</slf4j.version>
    	</properties>
    	
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-core</artifactId>
    			<version>${spring.version}</version>
    		</dependency>
    		
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-context</artifactId>
    			<version>${spring.version}</version>
    		</dependency>
    		
    		<dependency>
    			<groupId>log4j</groupId>
    			<artifactId>log4j</artifactId>
    			<version>${log4j.version}</version>
    		</dependency>
    		
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-log4j12</artifactId>
    			<version>${slf4j.version}</version>
    		</dependency>
    		
    		<dependency>
                <groupId>com.alibaba.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>${rocketmq.version}</version>
            </dependency>
    		
    		<dependency>
    			<groupId>junit</groupId>
    			<artifactId>junit</artifactId>
    			<version>${junit.version}</version>
    			<scope>test</scope>
    		</dependency>
    		
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-test</artifactId>
    			<version>${spring.version}</version>
    			<scope>test</scope>
    		</dependency>
    		
    	</dependencies>
    	
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>3.1</version>
    				<configuration>
    					<source>1.8</source>
    					<target>1.8</target>
    					<encoding>UTF-8</encoding>
    				</configuration>
    			</plugin>
    			
    		</plugins>
    	</build>
    	
    </project>
    

    2、编写spring配置文件applicationContext-consumer.xml,applicationContext-producer.xml以及log4j.properties,内容如下

    applicationContext-consumer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">
    
    	<bean id="producer" class="com.hode.rocketmq.Consumer" init-method="init" destroy-method="destroy">
    		<constructor-arg name="consumerGroup" value="rocketmq-test" />
    		<constructor-arg name="namesrvAddr" value="127.0.0.1:9876" />
    		<constructor-arg name="instanceName" value="test" />
    		<constructor-arg name="topic" value="testTopic" />
    		<constructor-arg name="messageListener" ref="messageListener" />
    	</bean>
    	
    	<bean id="messageListener" class="com.hode.rocketmq.StringMessageListener" />
    	
    </beans>
    

    applicationContext-producer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">
    
    	<bean id="producer" class="com.hode.rocketmq.Producer" init-method="init" destroy-method="destroy">
    		<constructor-arg name="producerGroup" value="rocketmq-test" />
    		<constructor-arg name="namesrvAddr" value="127.0.0.1:9876" />
    		<constructor-arg name="instanceName" value="test" />
    	</bean>
    	
    </beans>
    

    log4j.properties

    log4j.rootLogger=INFO,Console
    log4j.appender.Console=org.apache.log4j.ConsoleAppender
    log4j.appender.Console.layout=org.apache.log4j.PatternLayout
    log4j.appender.Console.layout.ConversionPattern=%-4r %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] %-5p %c %x - %m%n
    
    log4j.logger.com.hode=DEBUG
    

    3、编写类StringMessageListener.java,Producer.java,Consumer.java

    StringMessageListener.java

    import java.util.List;
    
    import org.apache.log4j.Logger;
    
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    public class StringMessageListener implements MessageListenerConcurrently{
    
    	private Logger log = Logger.getLogger(getClass());
    	
    	@Override
    	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
    		for (MessageExt msg : msgs) {
                log.info("msg : " + new String(msg.getBody()));
            }
    		return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    	}
    
    }
    

    Producer.java

    import org.apache.log4j.Logger;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    
    /**
     * 生产
     */
    public class Producer {
    	
    	protected Logger log = Logger.getLogger(getClass());
    	
    	private String producerGroup;
    	
    	private String namesrvAddr;
    	
    	private String instanceName;
    	
    	private DefaultMQProducer producer;
    	
    	public DefaultMQProducer getProducer() {
    		return producer;
    	}
    
    	public Producer(String producerGroup,String namesrvAddr,String instanceName){
    		this.producerGroup = producerGroup;
    		this.namesrvAddr = namesrvAddr;
    		this.instanceName = instanceName;
    	}
    	
    	public void init() throws MQClientException{
    		log.info("start init DefaultMQProducer...");
    		producer = new DefaultMQProducer(producerGroup);
    		producer.setNamesrvAddr(namesrvAddr);
    		producer.setInstanceName(instanceName);
    		producer.start();
    		log.info("DefaultMQProducer init success.");
    	}
    	
    	public void destroy(){
    		log.info("start destroy DefaultMQProducer...");
    		producer.shutdown();
    		log.info("DefaultMQProducer destroy success.");
    	}
    
    }
    

    Consumer.java

    import org.apache.log4j.Logger;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    
    public class Consumer {
    
    	private Logger log = Logger.getLogger(getClass());
    	
    	private DefaultMQPushConsumer consumer;
    	
    	private String consumerGroup;
    	
    	private String namesrvAddr;
    	
    	private String instanceName;
    	
    	private String topic;
    	
    	private MessageListenerConcurrently messageListener;
    	
    	public Consumer(String consumerGroup,String namesrvAddr,String instanceName,String topic,MessageListenerConcurrently messageListener){
    		this.consumerGroup = consumerGroup;
    		this.namesrvAddr = namesrvAddr;
    		this.instanceName = instanceName;
    		this.topic = topic;
    		this.messageListener = messageListener;
    	}
    	
    	public void init() throws Exception{
    		log.info("start init DefaultMQPushConsumer...");
    		consumer = new DefaultMQPushConsumer(consumerGroup);
    		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //从队列头部开始消费
    		consumer.setNamesrvAddr(namesrvAddr);
    		consumer.setInstanceName(instanceName);
    		consumer.subscribe(topic, "*");
    		consumer.registerMessageListener(messageListener);
    		consumer.start();
    		log.info("DefaultMQPushConsumer init ok.");
    	}
    
    	public void destroy(){
    		log.info("start destroy DefaultMQPushConsumer...");
    		consumer.shutdown();
    		log.info("DefaultMQPushConsumer destroy success.");
    	}
    	
    	public DefaultMQPushConsumer getConsumer() {
    		return consumer;
    	}
    
    }
    

    4、编写测试类

    ProducerTest.java

    import java.util.Date;
    
    import org.apache.log4j.Logger;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendCallback;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    public class ProducerTest {
    
    	private static Logger log = Logger.getLogger(ProducerTest.class);
    	
    	private static ApplicationContext context;
    	
    	public static void main(String[] args) throws Exception{
    		context = new ClassPathXmlApplicationContext("classpath:applicationContext-producer.xml");
    		Producer producer = context.getBean(Producer.class);
    		DefaultMQProducer p = producer.getProducer();
    		
    		String message = "test messgae"+new Date();
    		Message msg = new Message("testTopic",message.getBytes());
    		log.info(message);
    		p.send(msg, new SendCallback(){
    
    			@Override
    			public void onSuccess(SendResult sendResult) {
    				log.info(sendResult.getSendStatus().name());
    				log.info("onSuccess");
    				
    				producer.destroy();
    			}
    
    			@Override
    			public void onException(Throwable e) {
    				log.error("onException");
    			}
    		});
    		
    	}
    	
    }
    

    ConsumerTest.java

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class ConsumerTest {
    
    	private static ApplicationContext context;
    		
    	public static void main(String[] args) throws Exception{
    		context = new ClassPathXmlApplicationContext("classpath:applicationContext-consumer.xml");
    		Consumer consumer = context.getBean(Consumer.class);
    		
    		Thread.sleep(20*1000);
    		
    		System.out.println("end");
    		consumer.destroy();
    	}
    	
    }
    

    分别运行生产端及消费端完成测试,结束。

  • 相关阅读:
    springmvc最简单的搭建,初学者必看
    搭建服务器需要的那些
    jaxb使用
    Memcached Java Client API详解
    memcached client --ref
    使用Dom4j解析XML
    架构整洁之道
    架构的整理
    VMware虚拟机的三种联网方法及原理
    软件开发进度管理
  • 原文地址:https://www.cnblogs.com/alterem/p/11249001.html
Copyright © 2011-2022 走看看