zoukankan      html  css  js  c++  java
  • RabbitMQ

    使用RabbitMQ时,连接rabbit-server一直连接失败,代码没有任何错误提示。但是通过rabbitmqctl始终查询不到连接以及创建的queue等信息。

    官方的文件demo里面也没有TcpConnection相关例子,只在github上有些简单说明。

    然而网上几乎所有人都依然还是在使用Connection,几乎没有使用TcpConnection的例子。最后还是放弃了网络求助,老老实实看源码定位了。

    使用tcpdump确认,代码这边的TcpConnection确实是已经向rabbit-server发出了连接请求。

    开始观察发现三次握手是已经建立了连接的,但是几秒后,rabbit-server主动发送返回了一个RST包。这非常诧异,查看rabbit-server日志看到,产生了一次handshake_timeout错误。

    现在可以确认,不是鉴权产生的问题,而是在连接时就已经失败了,在完成连接到RST包收到刚好过了10s时间。在官方文档查阅到,rabbit-server的心跳也刚好是10s。

    后来还是确定问题点是在代码上,但是代码只有短短几行从github上copy下来的,怎么会出错呢。

    最后在日志打印上发现monitor函数执行了两次,这个小小的信息感觉看到了问题的原因,查看TcpConnection源码monitor被调用的地方。

     1 public:
     2     /**
     3      *  Constructor
     4      *  @param  connection  Parent TCP connection object
     5      *  @param  socket      The socket filedescriptor
     6      *  @param  buffer      The buffer that was already built
     7      *  @param  handler     User-supplied handler object
     8      */
     9     TcpConnected(TcpConnection *connection, int socket, TcpOutBuffer &&buffer, TcpHandler *handler) : 
    10         TcpState(connection, handler),
    11         _socket(socket),
    12         _out(std::move(buffer)),
    13         _in(4096)
    14     {
    15         // if there is already an output buffer, we have to send out that first
    16         if (_out) _out.sendto(_socket);
    17         
    18         // tell the handler to monitor the socket, if there is an out
    19         _handler->monitor(_connection, _socket, _out ? readable | writable : readable);
    20     }
    21     
    22     /**
    23      *  Destructor
    24      */
    25     virtual ~TcpConnected() noexcept
    26     {
    27         // we no longer have to monitor the socket
    28         _handler->monitor(_connection, _socket, 0);
    29         
    30         // close the socket
    31         close(_socket);
    32     }

    在构造和析构中各调用了一次,而且内部使用connection可能是为了提高效率进行了线程操作,也就是说实际的connection是在多线程中完成的。

    最后尝试修改代码,使用指针进行操作,因为代码并不是github上的单个函数文件,而是多处引用,最后问题解决。成功使用TcpConnection连接上了rabbit-server。

    附上简单代码:

     1 int Broker::init(std::string host,int port, std::string username, std::string userpasswd, int svrid)
     2 {
     3     // create an instance of your own tcp handler
     4     _handle = new DSBrokerMessageHandle();
     5 
     6     // address of the server
     7     AMQP::Address address(host, port,AMQP::Login(username,userpasswd),"/");
     8 
     9     // create a AMQP connection object
    10     _connection = new AMQP::TcpConnection(_handle, address);
    11 
    12 // and create a channel 13 _channel = new AMQP::TcpChannel(&connection); 14 15 auto receiveMessageCallback = [=](const AMQP::Message &message, 16 uint64_t deliveryTag, 17 bool redelivered) 18 { 19 //_channel->ack(deliveryTag); 20 }; 21 22 AMQP::QueueCallback callback = 23 [=](const std::string &name, int msgcount, int consumercount) 24 { 25 _channel->bindQueue("service", name, name); 26 _channel->bindQueue("service", name, "monitor"); 27 _channel->bindQueue("service", name, "heartbeat"); 28 29 _channel->consume(name, AMQP::noack).onReceived(receiveMessageCallback); 30 }; 31 32 AMQP::SuccessCallback success = [svrid, this, callback]() 33 { 34 char que[4] = { '' }; 35 ACE_OS::itoa(svrid, que, 10); 36 std::string quename(que); 37 _channel->declareQueue(quename, AMQP::durable).onSuccess(callback); 38 }; 39 40 // use the channel object to call the AMQP method you like 41 _channel->declareExchange("service", AMQP::fanout).onSuccess(success); 42 43 return 0; 44 }
  • 相关阅读:
    javascript编程解决黑化的牛牛问题
    Eclipse中给jar包导入JavaDoc注释文档的方法
    Android网络连接处理学习笔记
    Android Animation学习笔记
    Android中动画学习
    Android 动画效果 --Animation 动画专题研究 一
    Android动画学习笔记-Android Animation
    Android 动画之RotateAnimation应用详解
    Android 动画之ScaleAnimation应用详解
    Android 动画之AlphaAnimation应用详解
  • 原文地址:https://www.cnblogs.com/binchen-china/p/5708381.html
Copyright © 2011-2022 走看看