zoukankan      html  css  js  c++  java
  • docker 部署单节点rocketMQ

    消息队列的介绍

    消息(Message)是指在应用间传送的数据(比如字符串,json等),消息队列(Message Queue,简称MQ)是一个古老的计算机术语,UNIX进程间通信就用到了消息队列技术:一个进程把数据写入某个特定队列中,其它队列读取特定队列中的数据实现异步通信。而现在我们所说的MQ通常指的是独立的消息队列中间件,利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成

    消息队列解决的问题

    在当今互联网微服务架构大行其道的情况下,对于各个服务之间的松耦合、高通信要求越来越高,或者要求各个服务之间的事务一致性等问题。以及高并发请求的流量控制等问题。从而分布式消息队列可以解决大多数生产环境中遇到的问题。包括应用解耦、流量削峰、消息分发、最终一致性等问题。

    • 应用解耦
      比如一个商城系统,包括订单,库存。用户等系统,如果这些系统是高耦合调用,怎么保证其中一个系统出现问题不影响用户下单的行为。通过消息队列就可以把库存系统的数据缓存到消息队列中,而不影响订单系统和用户系统。从而用户可以正产下单,当库存系统恢复之后,再从消息队列中取出数据进行库存的操作。
    • 流量削峰
      比如秒杀互动,在秒杀期间会短时间内有大量请求,这时候如果通过增加部署大量的机器处理这些请求,当秒杀活动结束之后,请求大量降低,会造成很多资源的浪费。通过消息队列可以先保存部分请求,之后再匀速处理请求。从而保证了既不需要增加可多机器,又可以处理高并发的请求。
    • 消息分发
      如果一个服务产生的数据对于其他服务有用,在一个服务生产的数据时候通知其他服务,如果后期再有服务需要这些数据,就可能需要修改原服务才能扩展。通过消息队列可以原服务只要把数据发到消息队列。需要这些数据的服务订阅此消息即可。
    • 最终一致性
      库存系统有自己的数据库,用户系统有自己的数据库,如何保证库存系统扣完库存,在用户系统支付失败的情况下保证库存系统的数据正确性呢?在多个服务之间,每个服务都具有自己的本地事务,从而分布式事务就是我们要考虑的问题。通过消息队列(RocketMq)消息事务可以保证最终的一致性。

    消息队列的组成

    • Broker
      消息服务器,作为server提供消息核心服务
    • Producer
      消息生产者,业务的发起方,生产消息传输给broker
    • Consumer
      消息消费者,业务处理方,负责从broker获取消息并进行业务逻辑处理
    • Topic
      主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的广播
    • Queue
      队列,点对点模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收
    • Message
      消息体,根据不同通信协议定义的固定格式编码数据包,来封装业务数据,实现消息的传输。

    RocketMq简介

    Apache RocketMQ是一个分布式消息传递和流媒体平台,具有低延迟、高性能和可靠性、万亿级别的容量和灵活的可伸缩性。

    RocketMq特点

    • 灵活扩展高可用
      方便集群配置,其核心四组件(Name Server、Broker、Producer、Consumer)每一个都可以在没有单点故障的情况下进行水平扩展。

    • 海量消息堆积能力
      RocketMQ 采用零拷贝原理实现超大的消息的堆积能力,据说单机可以支持亿级消息堆积,而且在堆积了这么多消息后依然保持写入低延迟。

    • 支持顺序消息
      可以保证消息消费者按照消息发送的顺序对消息进行消费。顺序消息分为全局有序和局部有序,一般推荐使用局部有序,即生产者通过将某一类消息按顺序发送至同一个队列来实现。

    • 多种消息过滤方式
      消息过滤分为在服务器端过滤和在消费端过滤。服务器端过滤时可以按照消息消费者的要求做过滤,优点是减少不必要消息传输,缺点是增加了消息服务器的负担,实现相对复杂。消费端过滤则完全由具体应用自定义实现,这种方式更加灵活,缺点是很多无用的消息会传输给消息消费者。

    • 支持事务消息
      RocketMQ 除了支持普通消息,顺序消息之外还支持事务消息,这个特性对于分布式事务来说提供了又一种解决思路。

    • 延迟消费
      RocketMQ 可以针对消息设置延迟消费。在发送消息是也可以针对message设置setDelayTimeLevel

    RocketMq基本概念

    • 名称服务器
      名称服务器(NameServer)用来保存 Broker 相关元信息并给 Producer 和 Consumer 查找 Broker 信息。NameServer 被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。所以从功能上看应该是和 ZooKeeper 差不多,据说 RocketMQ 的早期版本确实是使用的 ZooKeeper ,后来改为了自己实现的 NameServer
    • 消息
      消息(Message)就是要传输的信息。一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址。一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 key 并在 Broker 上查找此消息以便在开发期间查找问题。
    • 主题
      主题(Topic)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0个、1个、多个消费者订阅
    • 消息服务器
      消息服务器(Broker)是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。它还存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等。从部署结构图中可以看出 Broker 有 Master 和 Slave 两种类型,Master 既可以写又可以读,Slave 不可以写只可以读。从物理结构上看 Broker 的集群部署方式有四种:单 Master 、多 Master 、多 Master 多 Slave(同步刷盘)、多 Master多 Slave(异步刷盘)。
    • 生产者组
      生产者组(Producer Group)是一类 Producer 的集合,这类 Producer 通常发送一类消息并且发送逻辑一致,所以将这些 Producer 分组在一起。从部署结构上看生产者通过 Producer Group 的名字来标记自己是一个集群
    • 消息队列
      消息队列(Message Queue),主题被划分为一个或多个子主题,即消息队列。一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。下图 Broker 内部消息情况:

    RocketMq集群部署结构

    在这里插入图片描述

      • Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
      • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server
      • Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
      • Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

    创建目录

    要把容器内的日志目录和数据目录映射到宿主机上,防止重启之后回到原始状态

    mkdir -p /data
    cd /data
    mkdir -p {conf,logs,store}

    创建broker.conf文件,把文件放在conf目录

    vi broker.conf

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    #  limitations under the License.
    
    
    # 所属集群名字
    brokerClusterName=DefaultCluster
    
    # broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
    # 在 broker-b.properties 使用: broker-b
    brokerName=broker-a
    
    # 0 表示 Master,> 0 表示 Slave
    brokerId=0
    
    # nameServer地址,分号分割
    # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    
    # 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
    # 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
    brokerIP1=192.168.1.179     #####修改为自己的IP 
    
    # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    
    # 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 falsefalsefalse
    autoCreateTopicEnable=true
    
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    
    # Broker 对外服务的监听端口
    listenPort=10911
    
    # 删除文件时间点,默认凌晨4点
    deleteWhen=04
    
    # 文件保留时间,默认48小时
    fileReservedTime=120
    
    # commitLog 每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    
    # ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    
    # destroyMapedFileIntervalForcibly=120000
    # redeleteHangedFileInterval=120000
    # 检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    # 存储路径
    # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
    # commitLog 存储路径
    # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
    # 消费队列存储
    # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
    # 消息索引存储路径
    # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
    # checkpoint 文件存储路径
    # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
    # abort 文件存储路径
    # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
    # 限制的消息大小
    maxMessageSize=65536
    
    # flushCommitLogLeastPages=4
    # flushConsumeQueueLeastPages=2
    # flushCommitLogThoroughInterval=10000
    # flushConsumeQueueThoroughInterval=60000
    
    # Broker 的角色
    # - ASYNC_MASTER 异步复制Master
    # - SYNC_MASTER 同步双写Master
    # - SLAVE
    brokerRole=ASYNC_MASTER
    
    # 刷盘方式
    # - ASYNC_FLUSH 异步刷盘
    # - SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    
    # 发消息线程池数量
    # sendMessageThreadPoolNums=128
    # 拉消息线程池数量
    # pullMessageThreadPoolNums=128

    创建docker-compose.yml 文件,把文件放在 /data/rocketmq/ 目录

    vi docker-compose.yml

    version: '3.5'
    services:
      rmqnamesrv:
        image: foxiswho/rocketmq:server
        container_name: rmqnamesrv
        ports:
          - 9876:9876
        volumes:
          - ./logs:/opt/logs
          - ./store:/opt/store
        networks:
            rmq:
              aliases:
                - rmqnamesrv
    
      rmqbroker:
        image: foxiswho/rocketmq:broker
        container_name: rmqbroker
        ports:
          - 10909:10909
          - 10911:10911
        volumes:
          - ./logs:/opt/logs
          - ./store:/opt/store
          - ./conf/broker.conf:/etc/rocketmq/broker.conf
        environment:
            NAMESRV_ADDR: "rmqnamesrv:9876"
            JAVA_OPTS: " -Duser.home=/opt"
            JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
        command: mqbroker -c /etc/rocketmq/broker.conf
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqbroker
    
      rmqconsole:
        image: styletang/rocketmq-console-ng
        container_name: rmqconsole
        ports:
          - 8080:8080
        environment:
            JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqconsole
    
    networks:
      rmq:
        name: rmq

    运行

    docker-compose -f /data/rocketmq/docker-compose.yml up -d

     

    参考:https://zhuanlan.zhihu.com/p/133792711

    https://blog.csdn.net/zevin_zheng/article/details/106096146

  • 相关阅读:
    简单分析实现运维利器---web远程ssh终端录像回放libl
    利用kite对视频流应用进行压力测试
    Springboot 启动扩展
    SpringBoot 自动配置原理
    idea springboot没有启动项,或启动时找不到或无法加载主类
    Elasticsearch、Kibana、elasticsearch-analysis-ik 版本下载地址
    Springboot 操作Elasticsearch 方式二 【rest-high-level-client】
    Elasticsearch 安装x-pack之后,无法连接head问题
    ES版本是向下兼容的,springboot连接ES,可以用低版本客户端
    ES安装elasticsearch-head-master插件
  • 原文地址:https://www.cnblogs.com/xiaoyou2018/p/14096211.html
Copyright © 2011-2022 走看看