1.maven 安装 wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo yum -y install apache-maven 2.install yum -y install ant 3.git 安装 yum install git #查看版本 git --version #显示 git version +版本号 表示成功 #配置 git 名称以及邮箱 git config --global user.name "Your Name" git config --global user.email "user@youremail"
3.rocketmq安装(注意这里版本是4.2.0)
cd /usr/local/rocketmq(没有则创建目录)
git clone -b develop https://github.com/apache/incubator-rocketmq.git
cd incubator-rocketmq
mvn -Prelease-all -DskipTests clean install -U
------------------
一段长时间的maven 依赖下载
-------------------
cd distribution/target/apache-rocketmq
2)配置文件
vim /etc/profile 添加

#apache rocket-mq
export ROCKETMQ_HOME=/usr/local/rocketmq/incubator-rocketmq/distribution/target/apache-rocketmq
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
export NAMESRV_ADDR=自己服务器ip:9876
使profile 生效
source /etc/profile
进入到 /usr/local/rocketmq/incubator-rocketmq/distribution/target/apache-rocketmq/bin 目录下:添加权限
chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv
启动:
nohup mqnamesrv &
//查看启动日志 默认在bin 目录的nohup.log下
tail -f nohup.out显示如下信息 表示启动成功

3)由于自己的服务器使用的是阿里云的 2g 内存,启动那个 mqnamesrv后,启动mqbroker时候需要设置下内存大小,否则会报错
vim runserver.sh(因为mqbroker脚本里面调用了runserver.sh
sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@) 修改下图:

之后 启动 mqbroker并将启动日志写入到指定位置.进入到target/bin目录
nohup mqbroker & >/var/log/mq.log
启动成功后,使用ps aux|grep rocketmq如下图
4)写测试用例
1.pom.xml引入rocket包(引入的是4.1的包,4.2的引入后无法使用)
<!--4.2无法使用 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0</version>
<type>pom</type>
</dependency>
<!--4.1的引用包-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>
生产者main方法:
package cn.rocketmq; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; /** * Create by fan on 2018/4/16 */ public class TestProductRocketMq { public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { final DefaultMQProducer defaultMQProducer = new DefaultMQProducer("producerGroupName"); // defaultMQProducer.setVipChannelEnabled(false); defaultMQProducer.setNamesrvAddr("47.98.111.19:9876"); try { defaultMQProducer.start(); } catch (MQClientException e) { e.printStackTrace(); } Message message = new Message("testTopic","tagA","keyA","Hello RocketMq".getBytes()); for (int i = 0 ;i<100;i++){ if (i%2 == 0){ SendResult sendResult = defaultMQProducer.send(message); Thread.sleep(100); System.out.println("tags send result:" + sendResult); }else { message = new Message("testTopic","tagB","keyB","Hello RocketMq.I'm your user".getBytes()); SendResult sendResult = defaultMQProducer.send(message); Thread.sleep(100); System.out.println("tags send result:" + sendResult); } } Runtime.getRuntime().addShutdownHook(new Thread(() -> defaultMQProducer.shutdown())); System.exit(0); } }
消费者方法:
package cn.rocketmq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * Create by fan on 2018/4/16 */ public class TestConsumeRocketMq { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("testProducerGroupName"); defaultMQPushConsumer.setNamesrvAddr("47.98.111.19:9876"); defaultMQPushConsumer.subscribe("testTopic","tagA || tagB"); defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.println(Thread.currentThread().getName() + "Receive new message:" + list); MessageExt messageExt = list.get(0); System.out.println("messageExt:" + messageExt); if(messageExt!=null && "testTopic".equals(messageExt.getTopic())){ if("tagA".equals(messageExt.getTags())){ String mess = new String(messageExt.getBody()); System.out.println("mess tagA consume:" + mess); }else if("tagB".equals(messageExt.getTags())){ String mess = new String(messageExt.getBody()); System.out.println("mess tagB consume:" + mess); } } //回执确认消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); defaultMQPushConsumer.start(); System.out.println("Consume start."); } }
之后运行main方法报错:
RocketMq Exception "connect to <:10909> failed"
解决办法是:centos关闭了10909以及9876防火墙端口
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent