zoukankan      html  css  js  c++  java
  • (转载)RabbitMQ消息队列应用

    转载自:http://www.cnblogs.com/Leo_wl/p/5402125.html

    RabbitMQ消息队列应用

     消息通信组件Net分布式系统的核心中间件之一,应用与系统高并发,各个组件之间解耦的依赖的场景。本框架采用消息队列中间件主要应用于两方面:一是解决部分高并发的业务处理;二是通过消息队列传输系统日志。目前业界使用较多的消息队列组件有RabbitMQ、ActiveMQ、MSMQ、kafka、zeroMQ等,本文对系统架构之MQ Component诠释,并采用RabbitMQ作为消息队列中间件。

     

    图1- 消息队列组件示意图


    一、RabbitMQ介绍

      RabbitMQ是一款基于AMQP(消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、Erlang等。RabbitMQ与其他消息队列组件性能比较,在此不作介绍,网上有大把的资料。


    二、RabbitMQ原理简介

     

    图2- RabbitMQ结构示意图

      RabbitMQ中间件分为服务端(RabbitMQ Server)和客户端(RabbitMQ Client),服务端可以理解为是一个消息的代理消费者,客户端又分为消息生产者(Producer)和消息消费者(Consumer)。

      1、消息生产者(Producer):主要生产消息并将消息基于TCP协议,通过建立Connection和Channel,将消息传输给RabbitMQ Server,对于Producer而言基本就完成了工作。

      2、服务端(RabbitMQ Server):主要负责处理消息路由、分发、入队列、缓存和出列。主要由三部分组成:Exchange、RoutingKey、Queue。

        (1)Exchange:用于接收消息生产者发送的消息,有三种类型的exchange:direct, fanout,topic,不同类型实现了不同的路由算法;

        A. direct exchange:将与routing key 比配的消息,直接推入相对应的队列,创建队列时,默认就创建同名的routing key。

        B. fanout exchange:是一种广播模式,忽略routingkey的规则。

        C. topic exchange:应用主题,根据key进行模式匹配路由,例如:若为abc*则推入到所有abc*相对应的queue;若为abc.#则推入到abc.xx.one ,abc.yy.two对应的queue。

        (2)RoutingKey:是RabbitMQ实现路由分发到各个队列的规则,并结合Binging提供于Exchange使用将消息推送入队列;

        (3)Queue:是消息队列,可以根据需要定义多个队列,设置队列的属性,比如:消息移除、消息缓存、回调机制等设置,实现与Consumer通信;

      3、消息消费者(Consumer):主要负责消费Queue的消息,同样基于TCP协议,通过建立Connection和Channel与Queue传输消息,一个消息可以给多个Consumer消费;

      4、关键名词说明:Connection、Channel、Binging等;

        (1)Connection:是建立客户端与服务端的连接。

        (2)Channel:是基于Connection之上建立通信通道,因为每次Connection建立TCP协议通信开销及性能消耗较大,所以一次建立Connection后,使用多个Channel通道通信减少开销和提高性能。

        (3)Binging:是一个捆绑定义,将exchange和queue捆绑,定义routingkey相关策略。


    三、RabbitMQ安装部署

       以上对RabbitMQ简介,接下来我们通过实际搭建消息队列服务实践。RabbitMQ服务端能运行于Window、Linux和Mac平台,客户端也支持多种技术的实现。本次我们将在Linux之CentOS7平台搭建。

      1、安装Erlang运行环境

        由于RabbitMQ使用Erlang技术开发,所以需要先安装Erlang运行环境后,才能安装消息队列服务。

        (1)配置系统能正常访问公网,设置默认网关

    1
    route add default gw 192.168.1.1

        (2)安装erlang

        (3)检查erlang是否安装成功

    1
    erl

        (4)安装成功

        

      2、安装RabbitMQ服务端

        (1)下载安装包

    1
    wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.0/rabbitmq-server-3.6.0-1.noarch.rpm

        (2)安装和配置RabbitMQ服务端,3.6.0版本:

    1
    2
    rpm --import https://www.rabbitmq.com/rabbitmq-signing-key-public.asc
    yum install rabbitmq-server-3.6.0-1.noarch.rpm

        (3)启用web管理插件

    1
    rabbitmq-plugins enable rabbitmq_management

        (4)启动RabbitMQ

    1
    2
    chkconfig rabbitmq-server on
    /sbin/service rabbitmq-server start

        (5)防火墙开通端口

    1
    2
    3
    # firewall-cmd --permanent --zone=public --add-port=5672/tcp
    # firewall-cmd --permanent --zone=public --add-port=15672/tcp
    # firewall-cmd --reload

        (6)rabbitmq默认会创建guest账号,只能用于localhost登录页面管理员,本机访问地址:http://localhost:15672/

    1
    2
    rabbitmqctl add_user test test
    rabbitmqctl set_user_tags test administrator<br>rabbitmqctl set_permissions -p / test ".*" ".*" ".*"

          RabbitMQ 管理员页面。


    四、RabbitMQ应用

       本章节描述,web应用生产的日志,通过rabbitmq传输,然后日志服务接收消息队列的消息。

        

    图3- 功能结构示意图

      本系统采用官方的Client,通过nuget引用。

      

       1、Web应用生产业务日志

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    [HttpPost]
            public ActionResult Create()
            {
                this.HttpContext.Session["mysession"] = DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss");
                var txt = Request.Form["txtSite"].ToString();
                RabbitMQHelper helper = new RabbitMQHelper();
                helper.SendMsg(txt + ",操作日志,时间:" + DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));
     
                return RedirectToAction("Index");
            }

      页面效果图。

      2、日志服务接收日志消息

         基于window form开发一个日志处理服务,并将接收的消息打印出来。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    private void btnReceive_Click(object sender, EventArgs e)
            {
                isConnected = true;
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("MyLog"falsefalsefalsenull);
     
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume("MyLog"true, consumer);
     
                    while (isConnected)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
     
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        txtMsg.Text += message + " ";
     
                    }
                }
            }

      

       3、RabbitMQ页面监控情况

       RabbitMQ自带页面监控工具,通过此工具可以监控MQ的情况:

             

    五、总结

      此文大概介绍RabbitMQ搭建和应用于本系统,RabbitMQ还是较为简单,还提供界面监控工具方便运维人员监控。

  • 相关阅读:
    Semaphore wait has lasted > 600 seconds
    mysql二进制日志
    HashMap(JDK1.9)详解
    企业中如何批量更改mysql中表的存储引擎?
    mysql监控
    String源码详解
    字符编码详情
    mysql事务详解
    数据库水平分表(一个大数据量的表)
    bat
  • 原文地址:https://www.cnblogs.com/wu-fm/p/7600134.html
Copyright © 2011-2022 走看看