zoukankan      html  css  js  c++  java
  • RabbitMQ协议基础及C++和Java混合开发

    目前面对大多数的需要在异构系统间进行消息传递技术路线,大多会选择socket或webservice。这两种技术的共同特点是耦合紧,调试依赖双方同步,但是效率高。除此以外,使用消息队列(MQ)的应用场景也偶尔能遇到。本文就将要从AMQP协议说起,重点介绍利用RabbitMQ实现C++和Java跨系统开发的实践。

    一、AMQP是什么

    AMQP又称为高级消息队列协议,是一种进程间进行异步消息的网络协议。它的出现是为了让各类消息中间件提供统一服务,以降低系统集成的开销。目前,完全准寻AMQP协议的消息中间件只有RabbitMQ。虽然各大中间件产品也都针对不同的语言推出了客户端。但是,无论是从业务适应性还是集成通用性上来说,比较推荐的还是RabbitMQ。不同的消息中间件在性能上的差异网上资料很多,这里不再赘述。

    amqp协议和http协议一样都是建立在TCP/IP协议簇之上的应用层协议。不同于http协议的,它是一个二进制协议,具有多信道,异步,高效等特点。amqp协议规定了从消息发布者到消息接收者之间的消息传递方式,并且提出了交换机(Exchange)队列(Queue)以及他们之间的路由(Routing)。

    作为一套标准协议,使用者甚至可以完全根据amqp的协议规范定制化的开发出客户端和RabbitMQ通信,这一特点也让RabbitMQ在业务通用性上具备了得天独厚的优势。标准的amqp协议格式如下:

    amqp://<username>:<password>@<host>:<port>/<virtual>

    username: 用户名

    password: 登录密码

    host: 服务所在主机地址

    port: 服务端口号

    virtual: 虚拟路径

    AMQP协议最值得学习的地方在于,它定义了消息的发送和投递过程:

      交换机(Exchange)负责接收消息,并根据提前指定的规则(Routing)投送消息到特定队列(Queue)。消费者监听队列,并处理消息。如果多个消费者监听同一个队列,消息一般会轮流的发送给它们。以实现负载均衡。此外,通过虚拟路径约束还允许在不同的虚拟路径下建立同命队列。

    AMQP协议默认提供了四种类型的交换机:

    直接交换机(Direct Exchange):根据路由键的不同将消息直接发送到不同队列,未匹配路由键的消息会被丢弃。

    扇形交换机(Funout Exchange):扇形交换机是实现广播的基础,它能够同时将消息推送给多个队列。

    主题交换机(Topic Exchange):交换机会根据路由键进行模糊匹配,从而完成消息投送。

    头交换机(Header Exchange):它不依赖特定路由键,而是将投送目标写在消息头,支持字典类型,配置更加灵活。

    二、C++开发指南

    官网提供了其它常见语言的开发向导,对于C++个人推荐使用AMQP-CPP这套库。另外还需要一套网络库支持,个人也推荐libevent。编译方法可以参考github上的说明。发送方式区别于传统的socket,你不应该将一条消息分多个部分发送。因此推荐使用对象序列化模型直接转换为字节数组,同样受到tcp/ip传输的制约,你应该选择高效的序列化工具来进行。个人推荐使用protobuf,同样作为一种跨平台的支持。

    下面以一套RPC调用为例进行说明:

    #include <iostream>
    #include "event2/event.h"
    #include "amqpcpp.h"
    #include "amqpcpp/libevent.h"
    #include "amqp_msg.pb.h"
    #include <string>
    
    using namespace std;
    using namespace amqp;
    
    int main()
    {
        event_base *base = event_base_new(); // 通过libevent启动实践循环
        AMQP::LibEventHandler handler(base);
        AMQP::TcpConnection connection(&handler, AMQP::Address("localhost", 5672, AMQP::Login("guest", "guest"), "/"));
        AMQP::TcpChannel channel(&connection); // 创建一条通道
        channel.setQos(1);
        // 监听login.rpc队列
        channel.consume("login.rpc").onReceived([&](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
        {
            cout << "login.rpc" << endl;
            Login login;
            login.ParseFromArray(message.body(), message.bodySize());
            Response resp; // 创建应答对象
            resp.set_status(Response_RespType_OK);
            resp.set_session_id("acd");
            char data[1024] = {0};
            int data_size = resp.ByteSizeLong();
            resp.SerializeToArray(data, data_size);
            AMQP::Envelope env(data, data_size);
            env.setCorrelationID(message.correlationID()); // 获取应答ID
            channel.publish("", message.replyTo(), env); // 发送给应答队列
            channel.ack(deliveryTag); // 向MQ发送确认
        }).onSuccess([&](const std::string &consumertag)
        {
    
        }).onError([](const char *message)
        {
            event_base_loopbreak(base); // 发送错误中断事件循环
            cout << message << endl;
        });
        // 监听logout.rpc队列
        channel.consume("logout.rpc")
                .onReceived([&channel](const AMQP::Message &message, uint64_t deliveryTag, bool)
        {
            Logout logout;
            logout.ParseFromArray(message.body(), message.bodySize());
            Response resp;
            resp.set_status(Response_RespType_OK);
            char data[1024] = {0};
            int data_size = resp.ByteSizeLong();
            resp.SerializeToArray(data, data_size);
            AMQP::Envelope env(data, data_size);
            env.setCorrelationID(message.correlationID());
            channel.publish("", message.replyTo(), env);
            channel.ack(deliveryTag);
        }).onError([](const char *message)
        {
            event_base_loopbreak(base);
            cout << message << endl;
        });
        event_base_dispatch(base); // 事件循环
        event_base_free(base); // 释放
        return 0;
    }

    AMQP-CPP库直接主动连接,或者你也可以在继承相应的Handler自己完成网络连接。此外,Connection 和 Channel的创建也都支持回调函数。如:

    channel.onError([&base](const char* message)
    {
        std::cout << "Channel error: " << message << std::endl;
        event_base_loopbreak(base);
    });
    channel.declareQueue("queueName", AMQP::passive)
           .onSuccess([&](const string& name, uint32_t, uint32_t)
    {
        cout << "Queue Name:" << name << endl;
    });
    channel.declareExchange("logs", AMQP::fanout)
           .onSuccess([&]() {})

    三、Spring AMQP开发指南

    与Spring整合的技巧,官网有很详细的指导意见。这里只给出与上文C++配合的请求端如何发送以及等待应答的核心代码:

    @GetMapping("login")
    public String loginRpc() throws InvalidProtocolBufferException {
        AmqpMsg.Login login = AmqpMsg.Login.newBuilder()
                .addParams(AmqpMsg.PairParams.newBuilder().setKey("username").setValue("admin").build())
                .addParams(AmqpMsg.PairParams.newBuilder().setKey("password").setValue("admin").build())
                .build();
        byte[] resp = (byte[]) template.convertSendAndReceive(directExchange.getName(), "login.rpc", login.toByteArray());
    
        AmqpMsg.Response response = AmqpMsg.Response.parseFrom(resp);
        if (response.getStatus() == AmqpMsg.Response.RespType.OK) {
            String sessionID = response.getSessionId();
            System.out.println("登录成功 SessionID=" + sessionID);
            return "SUCCESS";
        }
        return "ERROR";
    }
    
    @GetMapping("logout")
    public String logoutRpc() throws InvalidProtocolBufferException {
        AmqpMsg.Logout logout = AmqpMsg.Logout.newBuilder()
                .setSessionId("123456").build();
        byte[] resp = (byte[]) template.convertSendAndReceive(directExchange.getName(), "logout.rpc", logout.toByteArray());
        AmqpMsg.Response response = AmqpMsg.Response.parseFrom(resp);
        if(response.getStatus() == AmqpMsg.Response.RespType.OK) {
            System.out.println("注销成功");
            return "SUCCESS";
        }
        return "ERROR";
    }
    @Configuration
    public class RPCRabbitConfig {
        @Bean("simple")
        public Queue simpleQueue() {
            return new Queue("simple");
        }
    
        @Bean("login.rpc")
        public Queue loginRpcQueue() {
            return new Queue("login.rpc");
        }
    
        @Bean("logout.rpc")
        public Queue logoutRpcQueue() {
            return new Queue("logout.rpc");
        }
    
        @Bean
        public DirectExchange defaultExchange() {
            return new DirectExchange("amq.direct");
        }
    
        @Bean
        public Binding loginRpcBinding(DirectExchange exchange, @Qualifier("login.rpc") Queue queue) {
            return BindingBuilder.bind(queue).to(exchange).with("login.rpc");
        }
    
        @Bean
        public Binding logoutRpcBind(DirectExchange exchange, @Qualifier("logout.rpc") Queue queue) {
            return BindingBuilder.bind(queue).to(exchange).with("logout.rpc");
        }
    }

    后记:可能是由于工作上与架构的关系比较密切,目前在博客中提供的大多数解决方案都以跨平台应用为主。如果您对文章中介绍的知识点有任何的疑问也可以与我联系。

  • 相关阅读:
    NHibernate初学者指南系列文章导航
    c# 类一般在哪里实例化,是在类内、方法内还是其他地方?
    日期和时间的正则表达式
    virtual和abstract区别
    VS2010和选中代码相同的代码的颜色设置,修改高亮颜色
    SqlServer表和EXCEL数据互相复制方法
    C#操作XML的方法
    1、Spring Boot 2.x 简介
    C语言学习系列(六)基本语法
    C语言学习系列(六)存储类
  • 原文地址:https://www.cnblogs.com/learnhow/p/12585748.html
Copyright © 2011-2022 走看看