zoukankan      html  css  js  c++  java
  • rocketmq 记

    Rocketmq选型

    Rocket是一个专业的队列服务,性能优于Rabbitmq,优势是性能和并发,源于Kafka的扩展版,增强了数据的可靠性。

    Rocketmq的队列类型

    普通队列,广播队列、顺序队列,分区顺序

    2、同步机制

    Rocketmq使用主从同步模式,同步分为同步和异步模式,这和mysql类似。

    3、Rocketmq管理命令

    rocketmq也可以通过web管理,坑中有说 

    创建topic

    bin/mqadmin updateTopic -n '192.168.1.64:9876;192.168.1.107:9876' -c DefaultCluster -t tt -w 5 -r 5 -o=true   #-o=true是有序队列

    Rocketmq坑

    1、rocketmq 在本地搭建环境启动时,内存溢出报了内存错误:

    # There is insufficient memory for the Java Runtime Environment to continue.
    # Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.

    解决方法:修改bin/runbroker.sh,bin/runserver.sh

    -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m

    内存大的虚拟机没问题,像我这渣渣笔记本,得节省点啊!

    2、Rocketmq管理web

    https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

    下载安装Console项目

    Rocketmq的普通队列、顺序队列、分区顺序队列、广播消息实现、事务队列

    https://www.jianshu.com/p/3afd610a8f7d

    https://www.jianshu.com/p/790d6bc4a1c1

    https://www.jianshu.com/p/53324ea2df92

    我遇到的坑

    阿里云上面买的RocketMq和开源版本的Rocketmq的sdk不同,有着兼容问题。我们有一个自己机房和阿里的共用运行的服务,这时候发现只能在阿里机房上布,而在本地机房看来只能再写一套抽象层了,这非常不友好。使用了阿里的 Rocketmq也就意味着你的程序移植不到其它云或服务器上了。

    slave 修改:

    1、内存优化

    bin/runbroker.sh MaxDirectMemorySize =15g  改成1g

    2、host解析失败

    在rocketmq的主从是通过获取hostname然后解析host到ip上进行互相访问的,所以需要将各机器的hosts增加解析

    192.168.1.102       localhost.ceontsPHP

    192.168.1.101       localhost.bobby

    hostname可以通过各服务器的/etc/hostsname查看

    rocket会报错:rocketmq Failed to obtain the host name

    3、锁文件冲突原因

    因为rocketmq设计一个用户运行一个示例,一个用户锁定了一个用户store存储目录,所以在一起运行会冲突,提示:Lock failed,MQ already startedz

    解决方法是每个示例使用一个用户

    broker.conf 指定 storePathRootDir目录

    4、端口会冲突

    不同broker需使用不同端口,如果不是root用户,端口号应该是1024+

    一个broker不仅仅使用你指定的listenPort,上下最好空几个,比如指定1081端口,他还占用了1079 1081 1082

    5、配置文件坑

    我直接拿官方配置文件2m-2s-async下的配置,不行,没有提示,就是执行clusterList时,没有集群,这很坑,然后我自己建配置文件成功

    bin/mqbroker -c /root/rocketmq/release/conf/2m-2s-async/broker-a-s.properties > a.log 2>&1

    bin/mqadmin clusterList -n centos:9876

    bin/mqadmin updateTopic -n 'centos:9876' -c mycluster -t tt -w 5 -r 5 -o=true

    # broker a

    namesrvAddr=centos:9876

    brokerIP1=192.168.56.101

    brokerName=broker-a

    brokerClusterName=mycluster

    brokerId=0

    listenPort=1081

    autoCreateTopicEnable=true

    autoCreateSubscriptionGroup=true

    rejectTransactionMessage=false

    fetchNamesrvAddrByAddressServer=false

    storePathRootDir=/root/storeA

    storePathCommitLog=/root/store/commitlog

    flushIntervalCommitLog=500

    commitIntervalCommitLog=200

    flushCommitLogTimed=false

    deleteWhen=04

    fileReservedTime=48

    maxTransferBytesOnMessageInMemory=262144

    maxTransferCountOnMessageInMemory=32

    maxTransferBytesOnMessageInDisk=65536

    maxTransferCountOnMessageInDisk=8

    accessMessageInMemoryMaxRatio=40

    messageIndexEnable=true

    messageIndexSafe=false

    haMasterAddress=

    brokerRole=ASYNC_MASTER

    flushDiskType=ASYNC_FLUSH

    cleanFileForciblyEnable=true

    transientStorePoolEnable=false

    #broker a-s

    namesrvAddr=centos:9876

    brokerIP1=192.168.56.102

    brokerName=broker-a

    brokerClusterName=mycluster

    brokerId=1

    listenPort=1091

    autoCreateTopicEnable=true

    autoCreateSubscriptionGroup=true

    rejectTransactionMessage=false

    fetchNamesrvAddrByAddressServer=false

    storePathRootDir=/root/storeAs

    storePathCommitLog=/root/store/commitlog

    flushIntervalCommitLog=500

    commitIntervalCommitLog=200

    flushCommitLogTimed=false

    deleteWhen=04

    fileReservedTime=48

    maxTransferBytesOnMessageInMemory=262144

    maxTransferCountOnMessageInMemory=32

    maxTransferBytesOnMessageInDisk=65536

    maxTransferCountOnMessageInDisk=8

    accessMessageInMemoryMaxRatio=40

    messageIndexEnable=true

    messageIndexSafe=false

    haMasterAddress=

    brokerRole=SLAVE

    flushDiskType=ASYNC_FLUSH

    cleanFileForciblyEnable=true

    transientStorePoolEnable=false

    #broker b

    namesrvAddr=centos:9876

    brokerIP1=192.168.56.101

    brokerName=broker-b

    brokerClusterName=mycluster

    brokerId=0

    listenPort=1085

    autoCreateTopicEnable=true

    autoCreateSubscriptionGroup=true

    rejectTransactionMessage=false

    fetchNamesrvAddrByAddressServer=false

    storePathRootDir=/root/storeB

    storePathCommitLog=/root/store/commitlog

    flushIntervalCommitLog=500

    commitIntervalCommitLog=200

    flushCommitLogTimed=false

    deleteWhen=04

    fileReservedTime=48

    maxTransferBytesOnMessageInMemory=262144

    maxTransferCountOnMessageInMemory=32

    maxTransferBytesOnMessageInDisk=65536

    maxTransferCountOnMessageInDisk=8

    accessMessageInMemoryMaxRatio=40

    messageIndexEnable=true

    messageIndexSafe=false

    haMasterAddress=

    brokerRole=ASYNC_MASTER

    flushDiskType=ASYNC_FLUSH

    cleanFileForciblyEnable=true

    transientStorePoolEnable=false

    #broker b-s

    namesrvAddr=centos:9876

    brokerIP1=192.168.56.102

    brokerName=broker-b

    brokerClusterName=mycluster

    brokerId=1

    listenPort=1095

    autoCreateTopicEnable=true

    autoCreateSubscriptionGroup=true

    rejectTransactionMessage=false

    fetchNamesrvAddrByAddressServer=false

    storePathRootDir=/root/storeBs

    storePathCommitLog=/root/store/commitlog

    flushIntervalCommitLog=500

    commitIntervalCommitLog=200

    flushCommitLogTimed=false

    deleteWhen=04

    fileReservedTime=48

    maxTransferBytesOnMessageInMemory=262144

    maxTransferCountOnMessageInMemory=32

    maxTransferBytesOnMessageInDisk=65536

    maxTransferCountOnMessageInDisk=8

    accessMessageInMemoryMaxRatio=40

    messageIndexEnable=true

    messageIndexSafe=false

    haMasterAddress=

    brokerRole=SLAVE

    flushDiskType=ASYNC_FLUSH

    cleanFileForciblyEnable=true

    transientStorePoolEnable=false

    rocketmq sdk使用newFixThreadPool线程池用于获取server中的数据(线程数获取的cpu核心数),拿到消息后,submit 给->ThreadPoolExecutor线程池进行执行消费,执行消费可以指定最大线程数和最小线程数。

    rocketmq多消费端是需要通过tag指定的,如果你想和使用rabbitmq一样使用,可能会疯,百度搜出来的鸡八狗屁文章,根本狗屁不通。

    rocketmq 存储结构:
    https://blog.csdn.net/mr253727942/article/details/55805876

    IO调度方式:

    https://www.cnblogs.com/cobbliu/p/5389556.html

  • 相关阅读:
    JZ初中OJ 2266. 古代人的难题
    JZ初中OJ 1341. [南海2009初中] water
    JZ初中OJ 1340. [南海2009初中] jumpcow
    JZ初中OJ 2000. [2015.8.6普及组模拟赛] Leo搭积木
    JZ初中OJ 1999.[2015.8.6普及组模拟赛] Wexley接苹果
    Unity Android平台下插件/SDK开发通用流程
    UNITY接入支付宝(未测试可行)
    Unity接入支付宝(免写安卓代码,使用JAR方式)
    Unity之多态
    unity与android交互(1)与(2)网友的整理
  • 原文地址:https://www.cnblogs.com/a-xu/p/9433795.html
Copyright © 2011-2022 走看看