zoukankan      html  css  js  c++  java
  • 消息中间件与rabbitmq(一)

    本文将从三步讲述消息中间件

    • 从生产消费者模型到消息中间件
      • 生产消费者模型的作用以及适用场景
      • 手动实现消费生产者模型的缺陷
    • 消息队列
    • 消息中间件
      • 消息中间件的定义与常用类型
      • 消息中间价的操作
      • 消息中间件的选型
      • 消息中间的优缺点

    消息队列

    定义

    消息队列,一般我们会简称它为MQ(Message Queue),直白的说就是储存消息与释放消息的先进先出结构。

    那么把数据放到消息队列叫做生产者,从消息队列里边取数据则被叫做消费者。

    三大属性

    作为消息队列一般具备如下3大属性:

    • 消息顺序

        分区有序的队列通过分布式处理,支持更高的并发,但由于队列的分布式特性,DMS无法保证能够以接收消息的精确顺序进行消费。如果用户要求保持顺序,建议在每条消息中放置排序信息,以便在收到消息时对消息重新排序。

    全局有序的队列对消息消费遵循先入先出规则(FIFO),适用于对消费顺序要求较高的场景。

    • 至少一次传递

        在极少数情况下,当用户接收或删除消息时,存储消息副本的服务器之一可能不可用。如果出现这种情况,则该不可用服务器上的消息副本将不会被删除,并且在接收消息时可能会再次获得该消息副本。

    这被称为“至少一次传递”,因此,用户的应用程序应该设计为幂等的应用程序(即,如果应用程序多次处理同一条消息,则不得受到不利影响)。

    • 消息较少时单次消费不能获取指定数量的消息

        从消息队列中消费消息时,DMS每次从部分消息存储分区中读取消息返回消息给消费者,如果队列中的消息数比较少,则单次消费可能会少于指定条数,但多次消费最终可获取全部消息。

    功能与优点

    • 解耦
    • 弹性伸缩
    • 冗余存储
    • 流量削峰
    • 异步通信
    • 数据同步

    1,解耦

    我们作为A系统开发,当A系统处于1时刻的时候他只需需要向系统B与系统C提供相应的数据,但是因为某些原因B系统需要被D系统替代,那么我们要在A代码中去掉B系统的换成C系统。比如后来下游数据系统又出现问题,那么我们是不是又要更改代码。

    但是我们可以直接发数据发送给相应的消息队列,然后让BCD。。。Z系统自己去改相应的代码,作为A系统开发的我们无需修改代码。这不仅是解耦。

    同时也是一种数据使用规则,当上游系统已经提供相应的数据时,上游系统只提供相应的内聚接口,而下游对数据是否使用如何使用使用什么是由下游代码决定,而上游代码尽量无需修改。

    总结:解耦是消息队列要解决的最本质问题。一个系统只需要关心核心的流程,而需要依赖其他系统但不那么重要的事情,有通知即可,无需等待结果。换句话说,基于消息的模型,关心的是“通知”,而非“处理”。

    2.异步

    假设我们系统A还是直接向系统B、C、D传输相应的数据并没有使用消息队列且A系统只能使用单进程单线程,

    当我们产生数据而且还要传输数据到B,可能系统B此时没有相应的资源去消费数据,那么我们的这个传输过程将会处于阻塞一直等待B系统有资源去消费数据,而且消费过程也是会占用我们的时间(同步)

    ,然后我们才能继续往CD等系统传输数据。

    但是我们采用消息队列的方式,我们A系统只用负责往队列中填放数据即可,同时数据会被队列存贮(持久性),然后继续往队列中生产数据。与此同时,BCD系统可以自己从队列中直接取数据。

    整个过程我们无须等待,所以是异步的。

    3.流量削峰

    假设我们作为系统A最高可以到每秒3000个请求,我们下游现在两个系统BC处理只能每次处理1000个请求,当我们因为业务原因导致下游要接受>1000的请求,就会导致下游系统的崩溃。

    但是我们采用消息队列的方式,我们会把所有的请求都放在队列中,队列作为缓冲,下游只需要维持相应的不超过自身阈值的处理的速度即可,无需承担相应的压力。

     介绍完核心功能我们介绍以下几个非特性功能

    4.冗余存储

    5.最终一致性

    最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。

    6.广播

    消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。

    有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。

    比如本文开始提到的产品中心发布产品变更的消息,以及景点库很多去重更新的消息,可能“关心”方有很多个,但产品中心和景点库只需要发布变更消息即可,谁关心谁接入。

    消息中间件

    定义

    官方的解释:

    “中间件”

    Middleware is computer software that provides services to software applications beyond those available from the operating system. 
    It can be described as "software glue".
    Middleware makes it easier for software developers to implement communication and input/output, so they can focus on the specific purpose of their application.

     “消息中间件”

     Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messages between distributed systems.

    消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,

    它可以在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等等功能,其作为分布式系统架构中的一个重要组件,有着举足轻重的地位。

    消息队列的两种模式

    1.点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

    消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
    Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

    2.发布/订阅模式(一对多,消费者消费数据之后不会清除消息)

    消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

    各类消息队列

    ActiveMQ是Apache出品的、采用Java语言编写的完全基于JMS1.1规范的面向消息的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。不过由于历史原因包袱太重,目前市场份额没有后面三种消息中间件多,其最新架构被命名为Apollo,号称下一代ActiveMQ,有兴趣的同学可行了解。

    RabbitMQ是采用Erlang语言实现的AMQP协议的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ发展到今天,被越来越多的人认可,这和它在可靠性、可用性、扩展性、功能丰富等方面的卓越表现是分不开的。

    Kafka起初是由LinkedIn公司采用Scala语言开发的一个分布式、多分区、多副本且基于zookeeper协调的分布式消息系统,现已捐献给Apache基金会。它是一种高吞吐量的分布式发布订阅消息系统,以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark、Flink等都支持与Kafka集成。

    RocketMQ是阿里开源的消息中间件,目前已经捐献个Apache基金会,它是由Java语言开发的,具备高吞吐量、高可用性、适合大规模分布式系统应用等特点,经历过双11的洗礼,实力不容小觑。

    ZeroMQ号称史上最快的消息队列,基于C语言开发。ZeroMQ是一个消息处理队列库,可在多线程、多内核和主机之间弹性伸缩,虽然大多数时候我们习惯将其归入消息队列家族之中,但是其和前面的几款有着本质的区别,ZeroMQ本身就不是一个消息队列服务器,更像是一组底层网络通讯库,对原有的Socket API上加上一层封装而已。

    目前市面上的消息中间件还有很多,比如腾讯系的PhxQueue、CMQ、CKafka,又比如基于Go语言的NSQ,有时人们也把类似Redis的产品也看做消息中间件的一种,当然它们都很优秀,但是本文篇幅限制无法穷极所有,下面会针对性的挑选RabbitMQ和Kafka两款典型的消息中间件来做分析,力求站在一个公平公正的立场来阐述消息中间件选型中的各个要点。

    RabbitMQ使用指南

    下载

    https://www.rabbitmq.com/install-windows.html

    RabbitMQ的基本组成

    • Broker:消息队列服务器实体
    • connection:应用程序与broker的网络连接。其实就是生产者producer。
    • channel:几乎所有的操作都在channel中进行,channel是进行消息读写的通道。客户端可建立多个channel,每个channel代表一个会话任务。
    • exchange:接收消息,根据路由键转发消息到绑定的队列
    • queue:消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
      • message:每个消息都有一个路由键(routing key)的属性。就是一个简单的字符串。
      • binding:一个绑定就是基于路由键将交换机和队列连接起来的路由规则,所以交换机不过就是一个由绑定构成的路由表。
        • 举例:一个具有路由键“key1”的消息要发送到两个队列,queueA和queueB。要做到这点就要建立两个绑定,每个绑定连接一个交换机和一个队列。                                                                                                                                                         两者都是由路由键“key1”触发,这种情况,交换机会复制一份消息并把它们分别发送到两个队列中。

    交换机类型

    交换机用来接收消息,转发消息到绑定的队列,是rabbitMq中的核心。

    交换机共有4种类型:direct,topic,headers和fanout。

    为什么不创建一种交换机来处理所有类型的路由规则?因为每种规则匹配时的CPU开销是不同的,所以根据不同需求选择合适交换机。

    1、Direct交换机:转发消息到routingKey指定队列(完全匹配,单播)。

    routingKey与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发routingkey标记为dog的消息,不会转发dog.puppy,也不会转发dog.guard等。

    2、Topic交换机:按规则转发消息(最灵活,组播)

     Topic类型交换机通过模式匹配分配消息的routing-key属性。将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。

    它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配不多不少一个单词。

    例如,binding key:*.stock.#匹配routing key: usd.stock和eur.stock.db,但是不匹配stock.nana。

    例如,“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。

    3、Fanout交换机:转发消息到所有绑定队列(最快,广播)

    fanout交换机不处理路由键,简单的将队列绑定到交换机上,每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。

    很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

    3、header交换机:

    发送到该交换器的消息,会通过消息的 header 信息匹配,匹配成功就会路由到指定队列

    交换机与队列属性

    • 交换机的属性:
      • 持久性:如果启用,交换机将会在server重启前都有效。
      • 自动删除:如果启用,那么交换机将会在其绑定的队列都被删掉之后删除自身。
      • 惰性:如果没有声明交换机,那么在执行到使用的时候会导致异常,并不会主动声明。
    • 队列的属性:
      • 持久性:如果启用,队列将在Server服务重启前都有效。
      • 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除自身。
      • 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。
      • 排他性:如果启用,队列只能被声明它的消费者使用。  

    操作rabbitmq

    1、启动
    rabbitmq-server &
    
    启动rabbitmq:rabbitmq-service start
    关闭rabbitmq:rabbitmq-service stop
    
    2、队列重置(清空队列、用户等)
    
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl stop
    3、关闭
    rabbitmqctl stop
    4、列举出所有用户
    rabbitmqctl list_users
    5、列举出所有队列
    rabbitmqctl list_queues
    6、添加用户
    rabbitmqctl add_user user_name user_passwd
    7、设置用户角色为管理员
    rabbitmqctl set_user_tags user administrator
    8、权限设置
    rabbitmqctl set_permissions -p / user ".*" ".*" ".*"
    9、查看状态
    rabbitmqctl status
    10、虚拟主机
    新增虚拟主机:rabbitmqctl add_vhost  vhost_name
    将新虚拟主机授权给新用户:rabbitmqctl set_permissions -p vhost_name username '.*' '.*' '.*'
    10、安装RabbitMQWeb管理插件
    启动监控管理器:rabbitmq-plugins enable rabbitmq_management
    关闭监控管理器:rabbitmq-plugins disable rabbitmq_management 

    当安装完 rabbitmq_management 插件后,必须重启rabbitmq。或者直接使用services.msc,在服务中找到RabbitMQ服务,直接【重新启动】即可。

    重启成功后,直接输入相应的web地址与端口,然后输入用户名密码

    #web
    http://localhost:15672
    #并使用下面的账号密码登入:
    用户名:guest 
    密码:guest

    1

  • 相关阅读:
    ArrayList和LinkedList比较
    高度最小的BST
    Linux查看网络即时网速
    UISegmentedControl判断点击第几项
    ant关于发邮件报错535 Error:authentication failed解决方法
    Selenium WebDriver下载地址
    GitLab服务器IP地址修改
    jenkins+gitlab自动化构建
    GitLab默认密码
    jenkins全局安全配置-授权策略,误操作将设置为遗留模式,导致无全部管理员权限,修改config.xml的<authorizationStrategy 为以下
  • 原文地址:https://www.cnblogs.com/wqbin/p/12609646.html
Copyright © 2011-2022 走看看