zoukankan      html  css  js  c++  java
  • RabbitMq学习笔记——RabbitMQ C的使用

    1、需要用到的参数:

      主机名:hostname、端口号:port、交换器:exchange、路由键:routingkey、绑定键:bindingkey、用户名:user、密码:psw;默认的用户名和密码均为guest。

    2、步骤:

    1)、创建连接对象和TCP套接字(此时尚未真正连接到服务器):

      // Allocate the AMQP connection state object used by every later call.
      conn = amqp_new_connection();

      // Create a TCP socket object associated with that connection; the socket
      // is opened separately with amqp_socket_open().
      socket = amqp_tcp_socket_new(conn);

    2)、使用socket、主机名和端口号打开TCP连接:

      // Open the socket towards hostname:port. The full programs in this
      // article treat a non-zero `status` as failure.
      status = amqp_socket_open(socket, hostname, port);

    3)、登录

      // Log in to the broker. Arguments after `conn`, per the rabbitmq-c
      // amqp_login() signature: vhost "/", channel_max 0, frame_max 131072,
      // heartbeat 0, SASL PLAIN mechanism, then user name and password.
      // The returned amqp_rpc_reply_t is discarded in this fragment.
      amqp_login(conn,"/",0,131072,0,AMQP_SASL_METHOD_PLAIN,"guest","guest");

    4)、打开信道:

      // Open channel number 1 on the connection.
      amqp_channel_open(conn, 1);

      // Retrieve the reply to the preceding RPC-style call (the channel open).
      // The result is discarded in this fragment.
      amqp_get_rpc_reply(conn);

    5)、开始发送或者监听消息

    6)、关闭信道、关闭连接、销毁连接:

      // Close channel 1, passing AMQP_REPLY_SUCCESS as the reply code.
      amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);

      // Close the connection itself, again with AMQP_REPLY_SUCCESS.
      amqp_connection_close(conn, AMQP_REPLY_SUCCESS);

      // Destroy the connection object; `conn` must not be used afterwards.
      amqp_destroy_connection(conn);

    3、发送端

      pro文件

      # rabbitmq-c headers shipped in the project's include/ directory.
      # Each continued line must end with a backslash.
      HEADERS += \
          include/amqp.h \
          include/amqp_framing.h \
          include/amqp_tcp_socket.h

      # qmake's variable for source files is SOURCES (not SOURCE).
      SOURCES += \
          main.cpp

      # Import library for librabbitmq, located in the project's lib/ directory.
      LIBS += $$PWD/lib/librabbitmq.4.dll.a

    main.cpp文件

      #include <QtCore/QCoreApplication>

      #include <QDebug>

      #include "amqp.h"

      #include "amqp_framing.h"

      #include "amqp_tcp_socket.h"

      int main(int argc, char *argv[])
      {
        QCoreApplication a(argc, argv);

        const char* hostname = "localhost";

        int port = 5672;

        const char* exchange = "amq.direct";

        const char* routingkey = "test";

        const char* messagebody = "Hello World!";发送"Hello World!”

        amqp_connection_state_t  conn = amqp_new_connection();

        amqp_socket_t* socket = amqp_tcp_socket_new(conn);

        if (!socket)

        {
          qDebug() << "creating TCP socket";
        }

        int status = amqp_socket_open(socket, hostname, port);
        if (status)

        {
          qDebug() << "opening TCP socket";
        }

        amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
        amqp_channel_open(conn, 1);
        amqp_get_rpc_reply(conn);

        {
          amqp_basic_properties_t props;
          props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
          props.content_type = amqp_cstring_bytes("text/plain");
          props.delivery_mode = 2; /* persistent delivery mode */
          amqp_basic_publish(conn,1,amqp_cstring_bytes(exchange),amqp_cstring_bytes(routingkey),0,0,&props,amqp_cstring_bytes(messagebody));
        }

        amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
        amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
        amqp_destroy_connection(conn);

        return a.exec();
      }

    4、接收端

      pro文件

      # rabbitmq-c headers shipped in the project's include/ directory.
      # Each continued line must end with a backslash.
      HEADERS += \
          include/amqp.h \
          include/amqp_framing.h \
          include/amqp_tcp_socket.h

      # qmake's variable for source files is SOURCES (not SOURCE).
      SOURCES += \
          main.cpp

      # Import library for librabbitmq, located in the project's lib/ directory.
      LIBS += $$PWD/lib/librabbitmq.4.dll.a

    main.cpp文件

      int main(int argc, char *argv[])

      {

        QApplication a(argc, argv);
    int port = 5672;
    const char* hostname = "192.168.250.223";
    const char* exchange = "amq.direct";
    const char* bindingkey = "test";
    
    
        amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = amqp_tcp_socket_new(conn);
        if(!socket)
    {
    qDebug()<<"creating TCP socket";
    }
    
    
        int status = amqp_socket_open(socket,hostname,port);
    if(status)
    {
    qDebug() << "opening TCP socket";
    }
        amqp_login(conn,"/",0,131072,0,AMQP_SASL_METHOD_PLAIN,"guest","guest");
    amqp_channel_open(conn,1);
    amqp_get_rpc_reply(conn);
        amqp_queue_declare_ok_t *r = amqp_queue_declare(conn,1,amqp_empty_bytes,0,0,0,1,amqp_empty_table);
    amqp_get_rpc_reply(conn);
        amqp_bytes_t queuename = amqp_bytes_malloc_dup(r->queue);
    if(queuename.bytes == nullptr)
    {
    fprintf(stderr,"Out of memory while copying queue name");
    return 1;
    }
        amqp_queue_bind(conn,1,queuename,amqp_cstring_bytes(exchange),amqp_cstring_bytes(bindingkey),amqp_empty_table);
    amqp_get_rpc_reply(conn);
    amqp_basic_consume(conn,1,queuename,amqp_empty_bytes,0,1,0,amqp_empty_table);
    amqp_get_rpc_reply(conn);

    while(1)
    {
        amqp_maybe_release_buffers(conn);
        amqp_envelope_t envelope;
        amqp_rpc_reply_t res = amqp_consume_message(conn,&envelope,nullptr,0);
        if(AMQP_RESPONSE_NORMAL != res.reply_type)
        {
          break;
        }
        printf("Delivery %u, exchange %.*s routingkey %.*s
    ",
              static_cast<uint>(envelope.delivery_tag),
              static_cast<int>(envelope.exchange.len),
              static_cast<char*>(envelope.exchange.bytes),
              static_cast<int>(envelope.routing_key.len),
              static_cast<char*>(envelope.routing_key.bytes));
        if(envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG)
        {
          printf("Content-type: %.*s ",
                static_cast<int>(envelope.message.properties.content_type.len),
    static_cast<char*>(envelope.message.properties.content_type.bytes));
        }
        amqp_dump(envelope.message.body.bytes,envelope.message.body.len);
        amqp_destroy_envelope(&envelope);
    }
        amqp_channel_close(conn,1,AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn,AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
    Widget w;
    w.show();
    return a.exec();
    }
  • 相关阅读:
    vmware linux虚拟机连接ip设置
    java图片转byte转string
    javaScript传递参数,参数变化问题
    path和classpath的区别
    本地jar在打包时打入到项目中去
    使用集合来排序
    Unity 学习笔记2
    Unity 学习笔记
    unity3d 基础知识点
    Unity3D中的多线程及使用多线程
  • 原文地址:https://www.cnblogs.com/zhangnianyong/p/11324932.html
Copyright © 2011-2022 走看看