zoukankan      html  css  js  c++  java
  • Rocket MQ 问题排查命令

    修改rocketmq官方代码测试:

    package com.alibaba.middleware.race.rocketmq;
    import java.util.Scanner;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendCallback;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    /**
     * Producer,模拟rocket mq使用中可能出现的问题,学习如何排查q问题
     */
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
            DefaultMQProducer producer = new DefaultMQProducer("procedure_group_name");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            final String topics = "TOPIC-IT-WORKER-TEST";
            for (int i = 0; i < 1000; i++) {
                @SuppressWarnings("resource")
                Scanner reader=new Scanner(System.in);
                int key = reader.nextInt();
                final String message = " order-message - " + i + " key: " + key;
                byte[] body = message.getBytes();
                Message msgToBroker = new Message(topics, "tag-push", String.valueOf(key), body);
                producer.send(msgToBroker, new SendCallback() {
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(message);
                    }
                    public void onException(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
            }
        }
    }
    package com.alibaba.middleware.race.rocketmq;
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import java.util.List;
    import java.util.Scanner;
    
    public class Consumer {
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.subscribe("TOPIC-IT-WORKER-TEST", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        Scanner reader=new Scanner(System.in);
                        reader.hasNext();
                        byte[] body = msg.getBody();
                        if (body.length == 2 && body[0] == 0 && body[1] == 0) {
                            System.out.println("Got the end signal");
                            continue;
                        }
                        String paymentMessage = new String(body);
                        System.out.println(paymentMessage + " key: " + msg.getKeys() + " tag: " + msg.getTags());
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.alibaba.race</groupId>
        <artifactId>preliminary.demo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.7</source>
                        <target>1.7</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>2.5.3</version>
                    <configuration>
                        <appendAssemblyId>false</appendAssemblyId>
                        <descriptors>
                            <descriptor>src/main/resources/assembly.xml</descriptor>
                        </descriptors>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>install</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    <!--    com.alibaba.middleware.race.jstorm-2.1.1版本默认的日志框架是logback,为了避免日志冲突,排除掉log4j-->
        <dependencies>
            <dependency>
                <groupId>com.alibaba.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>3.2.6</version>
            </dependency>
        </dependencies>
    </project>
    增加selector选择器,根据key选择进入的Broker队列
    producer.send(msgToBroker, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msgToBroker, Object arg) {
                        // 根据key来选择队列
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        for (MessageQueue mq : mqs) {
                            System.out.println("current queue: " + mq.getQueueId());
                        }
                        System.out.println("select id: " + index);
                        return mqs.get(index);
                    }
                }, key, new SendCallback() {
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(message);
                    }
    
                    public void onException(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
    编译后启动服务端和客户端
    进入target目录
    启动生产者生产数据:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Producer
    启动消费者消费数据:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Consumer

    启动&参数修改

    mqnamesrv   启动NameServer  jps - NamesrvStartup
    mqbroker -n localhost:9876 启动broker jps - BrokerStartup 默认端口10911
    mqadmin updateBrokerConfig -c DefaultCluster -n 127.0.0.1:9876 -k listenPort -v 10911 更新broker参数配置

    查看当前系统状态

    mqadmin clusterList -n 127.0.0.1:9876

    查看当前所有topicList/创建

    mqadmin topicList -n 127.0.0.1:9876
    mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t topic名称

    查看broker状态

    mqadmin brokerStatus -n 127.0.0.1:9876 -b 127.0.0.1:10911

     查看某个topic的状态

    mqadmin topicStatus -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST
    当前可见,producer只发送了一条消息,Max offset为1,最后收到消息的时间是last updated,由于配置四个broker都是本机,只有第一个收到了当前第一条消息
    第二张图为发了四条消息之后的状态,看起来可能就是轮询的,因为当我增加4条key为1的msg之后,仍然是四个节点每个两条

    查看连接的procedure/consumer

    mqadmin producerConnection -n 127.0.0.1:9876 -g procedure_group_name -t TOPIC-IT-WORKER-TEST
    mqadmin consumerConnection -n 127.0.0.1:9876 -g consumer_group_name

    查看某个key对应的msg

    mqadmin queryMsgByKey -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST-1 -k 1
    因为之前发送了5条key为1的数据,所以这里可以看到是5条,每条都有一个MESSAGE ID

    根据ID查看对应的MSG

    mqadmin queryMsgById  -g consumer_group_name -i AC1F78B700002A9F00000000000A3208  -n 127.0.0.1:9876

    根据位置偏移查询上面的那条数据

     mqadmin queryMsgByOffset -n  127.0.0.1:9876 -o 1 -t TOPIC-IT-WORKER-TEST-1 -i 1 -b izm5e210z0uiwyavdbmpxaz

    查看消费详情

    mqadmin consumerProgress -n 127.0.0.1:9876 -g consumer_group_name
    这里消费了一条,一共八条,差7条没有消费

    重置消费端offset

    mqadmin resetOffsetByTime  -n 127.0.0.1:9876 -g consumer_group_name -t TOPIC-IT-WORKER-TEST-1 -f true -s 1536820000

    打印broker中某个队列里的消息

    mqadmin printMsgByQueue -a izm5e210z0uiwyavdbmpxaz -t T1 -n 127.0.0.1:9876 -i 1 -p true -d true
    这里可以看出,storeSize最后多了一条原因就是最后的body里12是两位,废话。。。

    直接打印消息

    mqadmin printMsg -t TOPIC-IT-WORKER-TEST-1 -n 127.0.0.1:9876

    The most commonly used mqadmin commands are:
       updateTopic          Update or create topic
       deleteTopic          Delete topic from broker and NameServer.
       updateSubGroup       Update or create subscription group
       deleteSubGroup       Delete subscription group from broker.
       updateBrokerConfig   Update broker's config
       updateTopicPerm      Update topic perm
       topicRoute           Examine topic route info
       topicStatus          Examine topic Status info
       topicClusterList     get cluster info for topic
       brokerStatus         Fetch broker runtime status data
       queryMsgById         Query Message by Id
       queryMsgByKey        Query Message by Key
       queryMsgByUniqueKey  Query Message by Unique key
       queryMsgByOffset     Query Message by offset
       queryMsgByUniqueKey  Query Message by Unique key
       printMsg             Print Message Detail
       printMsgByQueue      Print Message Detail
       sendMsgStatus        send msg to broker.
       brokerConsumeStats   Fetch broker consume stats data
       producerConnection   Query producer's socket connection and client version
       consumerConnection   Query consumer's socket connection, client version and subscription
       consumerProgress     Query consumers's progress, speed
       consumerStatus       Query consumer's internal data structure
       cloneGroupOffset     clone offset from other group.
       clusterList          List all of clusters
       topicList            Fetch all topic list from name server
       updateKvConfig       Create or update KV config.
       deleteKvConfig       Delete KV config.
       wipeWritePerm        Wipe write perm of broker in all name server
       resetOffsetByTime    Reset consumer offset by timestamp(without client restart).
       updateOrderConf      Create or update or delete order conf
       cleanExpiredCQ       Clean expired ConsumeQueue on broker.
       cleanUnusedTopic     Clean unused topic on broker.
       startMonitoring      Start Monitoring
       statsAll             Topic and Consumer tps stats
       allocateMQ           Allocate MQ
       checkMsgSendRT       check message send response time
       clusterRT            List All clusters Message Send RT
       getNamesrvConfig     Get configs of name server.
       updateNamesrvConfig  Update configs of name server.
       getBrokerConfig      Get broker config by cluster or special broker!
       queryCq              Query cq command.
  • 相关阅读:
    java实现调用打印机代码
    java合并PDF文件
    关于如何把项目做得更好的一次思考
    web语义化之SEO和ARIA
    快速理解web语义化
    使用HTML5地理位置定位到城市的方法及注意事项
    Plupload上传插件简单整理
    两列布局——左侧宽度固定,右侧宽度自适应的两种方法
    Java并发编程之线程基础
    Spring Boot学习之YAML文件配置
  • 原文地址:https://www.cnblogs.com/it-worker365/p/9641791.html
Copyright © 2011-2022 走看看