zoukankan      html  css  js  c++  java
  • rabbitmq-c初探

      RabbitMQ着实是个好东西,当然了也有对C语言client开发的支持。例子和文档少的可怜,只能去项目里去查看example来理解,简单整理了一些,以免走些弯路。主要是在版本对应上,这点就没Maven好了,只能对好类库和例子。接下来我们简单看看需要的东东。

    环境:Ubuntu 13.04

    rabbitmq-server 默认的3.0.2-1

    librabbitmq-dev 默认的0.0.1.hg216-1

    项目构造用的qmake(这样简单不少)

    1 consumer

    1.1 consumer.pro的内容

    SOURCES=utils.cpp amqp_consumer.cpp platform_utils.cpp

    HEADERS=utils.h

    VPATH+=/usr/include

    CONFIG+=release

    TARGET=consumer

    LIBS += -lrabbitmq

    1.2 amqp_consumer.cpp代码

      这里的代码来自于rabbitmq-c-v0.3.0 具体查看 https://github.com/alanxz/rabbitmq-c/blob/rabbitmq-c-v0.3.0/examples/amqp_consumer.c。(对于几个特殊的宏引用作了调整)

    #include <stdlib.h>

    #include <stdio.h>

    #include <string.h>

    #include <stdint.h>

    #include <amqp.h>

    #include <amqp_framing.h>

    #include <assert.h>

    #include "utils.h"

    #define SUMMARY_EVERY_US 1000000


    static void run(amqp_connection_state_t conn)

    {

      uint64_t start_time = now_microseconds();

      int received = 0;

      int previous_received = 0;

      uint64_t previous_report_time = start_time;

      uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

      amqp_frame_t frame;

      int result;

      size_t body_received;

      size_t body_target;

      uint64_t now;


      while (1) {

        now = now_microseconds();

        if (now > next_summary_time) {

          int countOverInterval = received - previous_received;

          double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);

          printf("%d ms: Received %d - %d since last report (%d Hz) ",

        (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);

          previous_received = received;

          previous_report_time = now;

          next_summary_time += SUMMARY_EVERY_US;

        }

        amqp_maybe_release_buffers(conn);

        result = amqp_simple_wait_frame(conn, &frame);

        if (result < 0)

          return;

        if (frame.frame_type != AMQP_FRAME_METHOD)

          continue;

        if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)

          continue;


        result = amqp_simple_wait_frame(conn, &frame);

        if (result < 0)

          return;

        if (frame.frame_type != AMQP_FRAME_HEADER) {

          fprintf(stderr, "Expected header!");

          abort();

        }

        body_target = frame.payload.properties.body_size;

        body_received = 0;


        while (body_received < body_target) {

          result = amqp_simple_wait_frame(conn, &frame);

          if (result < 0)

            return;

         if (frame.frame_type != AMQP_FRAME_BODY) {

            fprintf(stderr, "Expected body!");

            abort();

          }

          body_received += frame.payload.body_fragment.len;

          assert(body_received <= body_target);

          amqp_dump(frame.payload.body_fragment.bytes,frame.payload.body_fragment.len);

        }

        received++;

      }

    }


    int main(int argc, char const * const *argv) {

      char const *hostname;

      int port;

      char const *exchange;

      char const *bindingkey;

      int sockfd;

      amqp_connection_state_t conn;

      amqp_bytes_t queuename;


      if (argc < 3) {

        fprintf(stderr, "Usage: amqp_consumer host port ");

        return 1;

      }

      hostname = argv[1];

      port = atoi(argv[2]);

      exchange = "amq.direct"; /* argv[3]; */

      bindingkey = "test queue"; /* argv[4]; */

      conn = amqp_new_connection();

      die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");

      amqp_set_sockfd(conn, sockfd);

      die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),

       "Logging in");

      amqp_channel_open(conn, 1);

      die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

      {

        amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES/*amqp_empty_bytes*/, 0, 0, 0, 1,

       AMQP_EMPTY_TABLE/*amqp_empty_table*/);

        die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

        queuename = amqp_bytes_malloc_dup(r->queue);

        if (queuename.bytes == NULL) {

          fprintf(stderr, "Out of memory while copying queue name");

          return 1;

        }

      }

      amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),

     AMQP_EMPTY_TABLE/*amqp_empty_table*/);

      die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

      amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES/*amqp_empty_bytes*/, 0, 1, 0, AMQP_EMPTY_TABLE/*amqp_empty_table*/);

      die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

      run(conn);

      die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");

      die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");

      die_on_error(amqp_destroy_connection(conn), "Ending connection");

      return 0;

    }

    2 producer

    2.1 producer.pro的内容

    SOURCES=utils.cpp amqp_producer.cpp platform_utils.cpp
    HEADERS=utils.h
    VPATH+=/usr/include 
    CONFIG+=release
    TARGET=producer
    LIBS += -lrabbitmq

    2.2 amqp_producer.cpp代码

      这里的代码来自于rabbitmq-c-v0.3.0 具体查看https://github.com/alanxz/rabbitmq-c/blob/rabbitmq-c-v0.3.0/examples/amqp_producer.c。(对于几个特殊的宏引用作了调整)

    #include <stdlib.h>

    #include <stdio.h>

    #include <string.h>

    #include <stdint.h>

    #include <amqp.h>

    #include <amqp_framing.h>

    #include "utils.h"

    #define SUMMARY_EVERY_US 1000000


    static void send_batch(amqp_connection_state_t conn,

          char const *queue_name,

          int rate_limit,

          int message_count)

    {

      uint64_t start_time = now_microseconds();

      int i;

      int sent = 0;

      int previous_sent = 0;

      uint64_t previous_report_time = start_time;

      uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

      char message[256];

      amqp_bytes_t message_bytes;


      for (i = 0; i < (int)sizeof(message); i++) {

        message[i] = i & 0xff;

      }

      message_bytes.len = sizeof(message);

      message_bytes.bytes = message;

      for (i = 0; i < message_count; i++) {

        uint64_t now = now_microseconds();

        die_on_error(amqp_basic_publish(conn,1,amqp_cstring_bytes("amq.direct"),amqp_cstring_bytes(queue_name),

          0,0,NULL,message_bytes),"Publishing");

        sent++;

        if (now > next_summary_time) {

          int countOverInterval = sent - previous_sent;

          double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);

          printf("%d ms: Sent %d - %d since last report (%d Hz) ",(int)(now - start_time) / 1000, sent,

             countOverInterval, (int) intervalRate);

          previous_sent = sent;

          previous_report_time = now;

          next_summary_time += SUMMARY_EVERY_US;

        }

        while (((i * 1000000.0) / (now - start_time)) > rate_limit) {

          microsleep(2000);

          now = now_microseconds();

        }

      }

      {

        uint64_t stop_time = now_microseconds();

        int total_delta = stop_time - start_time;

        printf("PRODUCER - Message count: %d ", message_count);

        printf("Total time, milliseconds: %d ", total_delta / 1000);

        printf("Overall messages-per-second: %g ", (message_count / (total_delta / 1000000.0)));

      }

    }


    int main(int argc, char const * const *argv) {

      char const *hostname;

      int port;

      int rate_limit;

      int message_count;

     int sockfd;

      amqp_connection_state_t conn;

      if (argc < 5) {

        fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count ");

        return 1;

      }

      hostname = argv[1];

      port = atoi(argv[2]);

      rate_limit = atoi(argv[3]);

      message_count = atoi(argv[4]);

      conn = amqp_new_connection();

      die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");

      amqp_set_sockfd(conn, sockfd);

      die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),

       "Logging in");

      amqp_channel_open(conn, 1);

      die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

      send_batch(conn, "test queue", rate_limit, message_count);

      die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");

      die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");

      die_on_error(amqp_destroy_connection(conn), "Ending connection");

      return 0;

    }

  • 相关阅读:
    CSS系列:长度单位&字体大小的关系em rem px
    CSS兼容性
    html5+css3
    将url的查询参数解析成字典对象
    SQL阻止保存要求重新创建表的更改 在哪里设置
    Jquery&JS简单选项卡
    块级&行内(内联)元素
    时间
    PHP 二维数组根据某个字段排序
    php 操作数组 (合并,拆分,追加,查找,删除等)
  • 原文地址:https://www.cnblogs.com/snake-hand/p/3167727.html
Copyright © 2011-2022 走看看