zoukankan      html  css  js  c++  java
  • Pika 连接 rabbitmq 集群

    使用 Pika 连接 rabbitmq 集群
    使用 python 编程经常会用到 pika 来向 rabbitmq 发送消息,单个 rabbitmq 节点连接比较简单,本文介绍使用 rabbitmq 集群情况下的连接方式。

    vip 连接方式
    在 client 与 rabbitmq server 之间通过 haproxy 等负载均衡来提供 vip,我使用的环境就是采用这种方式,但是遇到某一节点挂掉时再访问 vip 连接 rabbitmq 集群会连接失败,常见 log 如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    <URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>
    DEBUG:pika.adapters.select_connection:Using EPollPoller
    DEBUG:pika.callback:Added: {'callback': <bound method SelectConnection._on_connection_start of <SelectConnection CLOSED socket=None params=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, 'only': None, 'one_shot': True, 'arguments': None, 'calls': 1}
    DEBUG:pika.callback:Added: {'callback': <bound method SelectConnection._on_connection_close of <SelectConnection CLOSED socket=None params=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, 'only': None, 'one_shot': True, 'arguments': None, 'calls': 1}
    DEBUG:pika.callback:Added: {'callback': <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f869db987a0>>, 'only': None, 'one_shot': False, 'arguments': None}
    DEBUG:pika.callback:Added: {'callback': <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f869db98758>>, 'only': None, 'one_shot': False, 'arguments': None}
    DEBUG:pika.callback:Added: {'callback': <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f869db987e8>>, 'only': None, 'one_shot': False, 'arguments': None}
    DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7f869dc0b6d0> with deadline=1538140088.706256 and callback=<bound method SelectConnection._on_connect_timer of <SelectConnection INIT socket=None params=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>; now=1538140088.71; delay=0
    INFO:pika.adapters.base_connection:Pika version 0.12.0 connecting to 10.10.11.1:5672
    ERROR:pika.adapters.base_connection:Read empty data, calling disconnect
    INFO:pika.connection:Disconnected from RabbitMQ at 10.10.11.1:5672 (-1): EOF
    ERROR:pika.connection:Incompatible Protocol Versions
    ERROR:pika.connection:Connection setup failed due to The protocol returned by the server is not supported: (-1, 'EOF')
    DEBUG:pika.callback:Processing 0:_on_connection_error
    DEBUG:pika.callback:Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f869db987a0>> for "0:_on_connection_error"
    DEBUG:pika.callback:Processing 0:_on_connection_closed
    DEBUG:pika.callback:Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f869db987e8>> for "0:_on_connection_closed"
    DEBUG:pika.callback:Incremented callback reference counter: {'callback': <bound method SelectConnection._on_connection_start of <SelectConnection CLOSED socket=None params=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, 'only': None, 'one_shot': True, 'arguments': None, 'calls': 2}
    DEBUG:pika.callback:Incremented callback reference counter: {'callback': <bound method SelectConnection._on_connection_close of <SelectConnection CLOSED socket=None params=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, 'only': None, 'one_shot': True, 'arguments': None, 'calls': 2}
    ERROR:pika.adapters.blocking_connection:Connection open failed - The protocol returned by the server is not supported: (-1

    这个报错通常是由于网络问题导致,尝试过 Telnet 连接 vip 和端口,都正常返回,目前还未找到 pika 访问 vip 连接 rabbitmq 失败的原因,所以采用类似于 openstack 中连接 rabbitmq 的方式,配置多主机列表,建立连接池。

    配置 multiple hosts
    openstack 配置transport_url 采用 rabbitmq 集群 host 列表方式,然后在 oslo.message 中建立连接池,通过 kombu 来使用 rabbitmq。参考这种方式,用 pika 实现。
    pika 的官方文档中有示例参考 blocking_consume_recover_multiple_hosts

    实际实现的时候会抛异常,原因是传递给 pika 需要是个实例而不是列表,官网上提供的方式把 host url 参数化后直接放到列表里传给 pika 进行连接:

    1
    2
    3
    4
    5
    node1 = pika.URLParameters('amqp://node1')
    node2 = pika.URLParameters('amqp://node2')
    node3 = pika.URLParameters('amqp://node3')
    all_endpoints = [node1, node2, node3]
    connection = pika.BlockingConnection(all_endpoints)

    实际执行后报错如下:

    1
    Expected instance of Parameters, not [.........]

    github 上提交的 issue: parameters error

    目前不支持直接传入多 host url 来池化 rabbitmq 集群的连接,所以要在应用程序中单独实现。

    用一个简单 for 循环来做:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    random.shuffle(all_endpoints)
    for url in all_endpoints:
        try:
            logging.basicConfig(level=logging.DEBUG)
            connection = pika.BlockingConnection(url)
        except Exception as ex:
            print str(ex)
        else:
            break

    用 shuffle 来改变列表中的 host 顺序,可以起到负载均衡的作用。

  • 相关阅读:
    leetcode 576. Out of Boundary Paths 、688. Knight Probability in Chessboard
    leetcode 129. Sum Root to Leaf Numbers
    leetcode 542. 01 Matrix 、663. Walls and Gates(lintcode) 、773. Sliding Puzzle 、803. Shortest Distance from All Buildings
    leetcode 402. Remove K Digits 、321. Create Maximum Number
    leetcode 139. Word Break 、140. Word Break II
    leetcode 329. Longest Increasing Path in a Matrix
    leetcode 334. Increasing Triplet Subsequence
    leetcode 403. Frog Jump
    android中webView加载H5,JS不能调用问题的解决
    通过nginx中转获取不到IP的问题解决
  • 原文地址:https://www.cnblogs.com/ExMan/p/12190041.html
Copyright © 2011-2022 走看看