zoukankan      html  css  js  c++  java
  • Rabbitmq C++客户端 Rabbitmq Client

    概述

    最近项目消息队列服务选用了rabbitmq,server端用的C++开发的,于是需要开发rabbitmq的c++客户端,国际惯例先百度了一圈,然后github搜了一圈,竟然发现排名靠前的需要付费才能使用,尼玛这都拿出来骗钱(愤青了),于是产生了写个客户端给大家使用的念头。

    我的应用场景是这样的生产者是用java写的(java别说了,一搜一大堆,改个服务器地址就能用),服务端作为消费者,老本行用的C++开发的,只需写个c++的rabbitmq客户端嵌入到我的server里就行,所有的消费者监听的一个队列,无需指定routing key和exchange,任何一个消费者消费到数据就行(我的服务端是个集群)。话不多说上代码。

    实现

    我的环境是Linux centos7.2

    amqpclient.cpp源码如下:

    #include <unistd.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <string.h>
    #include <stdint.h>
    
    #include "amqp_tcp_socket.h"
    #include "amqp.h"
    #include "amqp_framing.h"
    #include "utils.h"
    #include "platform_utils.h"
    #include "amqpclient.h"
    
    /*
    * 构造函数
    * hostname,port,user,pwd,vhost
    */
    
    AmqpClient::AmqpClient(const char* hostname, const int port, const char* user, 
    const char* pwd, const char* vhost, const char* queuename)
    {
        if((hostname != NULL) || (port != 0) || (user != NULL) 
        || (pwd != NULL) || (vhost != NULL) || (queuename != NULL))
        {
            strcpy(this->m_hostname, hostname);
            this->m_port = port;
            strcpy(this->m_user, user);
            strcpy(this->m_pwd, pwd);
            strcpy(this->m_vhost, vhost);
            strcpy(this->m_queuename, queuename);
        }
    }
    
    bool AmqpClient::ConnectRabbitmq()
    {
        amqp_socket_t *socket ;
        amqp_rpc_reply_t reply ;
        int status;
    
        conn = amqp_new_connection();
        socket = amqp_tcp_socket_new(conn);
        if(!socket)
        {
            printf("amqp new socket error
    ");
            return 0;
        }
        status = amqp_socket_open(socket, m_hostname, m_port);
        if(status)
        {
            printf("amqp open socket error
    ");
            return 0;
        }
        reply = amqp_login(conn, m_vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, m_user, m_pwd);
        if (reply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
        {
            printf("amqp login error
    ");
            return 0;
        }
        amqp_channel_open(conn, 1);
        reply = amqp_get_rpc_reply(conn);
        if(reply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
        {
            printf("ConnectRabbitmq::amqp get rpc_reply error
    ");
            return 0;
        }
    }
    
    bool AmqpClient::StartConsumer()
    {
        amqp_rpc_reply_t reply ;
        //自动回复ACK
        amqp_basic_consume(conn, 1, amqp_cstring_bytes(m_queuename), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
        reply = amqp_get_rpc_reply(conn);
        if(reply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
        {
            printf("StartConsumer::amqp get rpc_reply error
    ");
            return false;
        }
        {
            for (;;) 
            {
                amqp_rpc_reply_t res;
                amqp_envelope_t envelope;
    
                amqp_maybe_release_buffers(conn);
    
                res = amqp_consume_message(conn, &envelope, NULL, 0);
    
                if (AMQP_RESPONSE_NORMAL != res.reply_type) {
                    break;
                }
    
                printf("Delivery %u, exchange %.*s routingkey %.*s
    ",
                        (unsigned) envelope.delivery_tag,
                        (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
                        (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
    
                if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
                    printf("Content-type: %.*s
    ",
                        (int) envelope.message.properties.content_type.len,
                        (char *) envelope.message.properties.content_type.bytes);
                }
                printf("----
    ");
    
                amqp_dump(envelope.message.body.bytes, envelope.message.body.len);
    
                amqp_destroy_envelope(&envelope);
            }
        }
    }
    
    void AmqpClient::CloseConnect()
    {
        amqp_rpc_reply_t reply ;
        amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
        amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
        amqp_destroy_connection(conn);
    }

    amqpclient.h源码如下:

    #ifndef _AMQPCLIENT_H
    #define _AMQPCLIENT_H
    
    #include "amqp.h"
    
    class AmqpClient
    {
    public:
        AmqpClient(const char* hostname, const int port, const char* user, const char* pwd, 
        const char* vhost, const char* queuename);
        bool ConnectRabbitmq();
        bool StartConsumer();
        void CloseConnect();
    private:
        amqp_connection_state_t conn = NULL;
        char m_hostname[128];
        int  m_port;
        char m_user[128];
        char m_pwd[128];
        char m_vhost[128];
        char m_queuename[128];
    };
    
    #endif


    main.cpp源码如下:

    #include <stdio.h>
    #include "amqpclient.h"
    
    int main()
    {
        const char* hostname = "192.168.12.20";
        const int   port = 5672;
        const char* user = "woniu201";
        const char* pwd  = "woniu201";
        const char* vhost = "/";
        const char* queuename = "queuename001";
    
        AmqpClient amqpClient(hostname, port, user, pwd, vhost, queuename);
        amqpClient.ConnectRabbitmq();
        printf("connect rabbitmq succ, is consuming!
    ");
        amqpClient.StartConsumer();
    
        return 1;
    }


    Makefile文件:

    EXE=rabbitmq_consumer
    SUBDIR=src
    
    #CXXFLAGS:编译选项, LDFLAGS:链接选项
    CXXFLAGS += 
    LDFLAGS += -L/usr/local/lib -lrabbitmq
    
    CXX_SOURCES =$(foreach dir,$(SUBDIR), $(wildcard $(dir)/*.cpp))
    CXX_OBJECTS=$(patsubst  %.cpp, %.o, $(CXX_SOURCES))
    DEP_FILES  =$(patsubst  %.o,  %.d, $(CXX_OBJECTS))
    
    $(EXE): $(CXX_OBJECTS)
    	g++  $(CXX_OBJECTS) -o $(EXE) $(LDFLAGS)
    	
    %.o: %.cpp
    	g++  -c  $(CXXFLAGS) -MMD $<  -o  $@
    
    -include $(DEP_FILES)
    
    clean: 
    	rm  -rf  $(CXX_OBJECTS)  $(DEP_FILES)  $(EXE)
    
    test:
    	echo $(CXX_OBJECTS)
    
    initlib:
    	ln -s lib/librabbitmq.so.4.2.0 /usr/local/lib/librabbitmq.so
    	ln -s lib/librabbitmq.so.4.2.0 /usr/lib64/librabbitmq.so.4
    	


    这里用到了librabbitmq.so.4动态库,make前先执行make initlib指令,会把所用的librabbitmq.so库放到系统环境中。

    make完后会生产rabbitmq_consumer可执行文件,./rabbitmq_consumer执行。

    测试结果如下:

    如果到这里还有问题可以直接下载我的源码或者留言交流:http://download.csdn.net/detail/woniu211111/9911075

    没有下载积分的,可以关注下面公众号,回复"107"获取源码

    欢迎加群交流:C/C++开发交流

  • 相关阅读:
    tcpprep 对IPV6的支持
    the server quit without updating pid file (/var/lib/mysql/localhost.localdomain.pid)
    servlet service() for servlet jsp throws null pointer exception
    tomcat开机启动
    mysql 允许远程访问
    spring的helloworld
    java中的那些坑
    关于struts2中的相对路径与绝对路径
    Powercenter Source Filter
    oracle删除当前用户的表
  • 原文地址:https://www.cnblogs.com/woniu201/p/11694611.html
Copyright © 2011-2022 走看看