zoukankan      html  css  js  c++  java
  • Python操作Rabbit MQ的5种模式

    python版本:   2.7.14

    一 消息生产者代码:

     1 # -*- coding: utf-8 -*-
     2 
     3 import json
     4 import pika
     5 import urllib
     6 import urllib2
     7 import chardet
     8 import sys
     9 import json
    10 from common import CommonMethod
    11 import pika
    12 import time
    13 
    14 HOST_NAME = "172.21.204.14"
    15 USER_NAME = "xxx"
    16 PASSWORD = "xxx"
    17 
    18 # 1."Hello World!"
    19 def hello_world():
    20     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    21     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
    22     channel = connection.channel()
    23     
    24     channel.queue_declare(queue='hello')
    25     channel.basic_publish(exchange='',
    26                             routing_key='hello',      # specify queue  name
    27                             body='Hello World!')
    28     print(" [x] Sent 'Hello World!'")
    29     connection.close()
    30 
    31 # 2."Work queues"
    32 def new_task():
    33     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    34     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
    35     channel = connection.channel()
    36 
    37     channel.queue_declare(queue='task_queue', durable=True)   # 设置队列持久化
    38     message = ' '.join(sys.argv[1:]) or "Hello World!"
    39     channel.basic_publish(exchange='',
    40                         routing_key='task_queue',
    41                         body=message,
    42                         properties=pika.BasicProperties(
    43                             delivery_mode = 2,                # 设置消息持久化
    44                         ))
    45     print(" [x] Sent %r" % message) 
    46     connection.close()
    47 
    48 # 3."Publish/Subscribe"
    49 def emit_log(message):
    50     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    51     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
    52     channel = connection.channel()
    53 
    54     channel.exchange_declare(exchange='logs',        # 申明logs交换机
    55                          exchange_type='fanout')     # 交换机类型: 发布/订阅
    56 
    57     channel.basic_publish(exchange='logs',
    58                         routing_key='',
    59                         body=message)
    60     print(" [x] Sent %r" % message)
    61     connection.close()
    62 
    63 # 4."Routing"
    64 def emit_log_direct(log_level,message):
    65     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    66     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
    67     channel = connection.channel()
    68 
    69     channel.exchange_declare(exchange='direct_logs', # 申明logs交换机
    70                          exchange_type='direct')     # 交换机类型: 路由(Routing)
    71 
    72     channel.basic_publish(exchange='direct_logs',
    73                         routing_key=log_level,
    74                         body=message)
    75     print(" [x] Sent %r:%r" % (log_level, message))
    76     connection.close()
    77 
    78 emit_log_direct("info", "info log message:...")
    79 emit_log_direct("error", "error log message:...")
    80 
    81 # 5."Topic"
    82 # 与Routing模式类似,比Routing模式多了routing_key可以使用通配符"*","#"等,使用更加灵活
    View Code

    二 消息消费者代码:

     1 # -*- coding: utf-8 -*-
     2 
     3 import json
     4 import pika
     5 import urllib
     6 import urllib2
     7 import chardet
     8 import sys
     9 import json
    10 from common import CommonMethod
    11 import pika
    12 import time
    13 
    14 HOST_NAME = "172.21.204.14"
    15 USER_NAME = "xxx"
    16 PASSWORD = "xxx"
    17 
    18 # 1."Hello World!"
    19 def hello_world():
    20     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    21     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
    22     channel = connection.channel()
    23     
    24     channel.queue_declare(queue='hello')
    25     channel.basic_publish(exchange='',
    26                             routing_key='hello',      # specify queue  name
    27                             body='Hello World!')
    28     print(" [x] Sent 'Hello World!'")
    29     connection.close()
    30 
    31 # 2."Work queues"
    32 def new_task():
    33     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    34     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
    35     channel = connection.channel()
    36 
    37     channel.queue_declare(queue='task_queue', durable=True)   # 设置队列持久化
    38     message = ' '.join(sys.argv[1:]) or "Hello World!"
    39     channel.basic_publish(exchange='',
    40                         routing_key='task_queue',
    41                         body=message,
    42                         properties=pika.BasicProperties(
    43                             delivery_mode = 2,                # 设置消息持久化
    44                         ))
    45     print(" [x] Sent %r" % message) 
    46     connection.close()
    47 
    48 # 3."Publish/Subscribe"
    49 def emit_log(message):
    50     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    51     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
    52     channel = connection.channel()
    53 
    54     channel.exchange_declare(exchange='logs',        # 申明logs交换机
    55                          exchange_type='fanout')     # 交换机类型: 发布/订阅
    56 
    57     channel.basic_publish(exchange='logs',
    58                         routing_key='',
    59                         body=message)
    60     print(" [x] Sent %r" % message)
    61     connection.close()
    62 
    63 # 4."Routing"
    64 def emit_log_direct(log_level,message):
    65     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
    66     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
    67     channel = connection.channel()
    68 
    69     channel.exchange_declare(exchange='direct_logs', # 申明logs交换机
    70                          exchange_type='direct')     # 交换机类型: 路由(Routing)
    71 
    72     channel.basic_publish(exchange='direct_logs',
    73                         routing_key=log_level,
    74                         body=message)
    75     print(" [x] Sent %r:%r" % (log_level, message))
    76     connection.close()
    77 
    78 emit_log_direct("info", "info log message:...")
    79 emit_log_direct("error", "error log message:...")
    80 
    81 # 5."Topic"
    82 # 与Routing模式类似,比Routing模式多了routing_key可以使用通配符"*","#"等,使用更加灵活
    View Code

    三 图片

    官网参考文档: http://www.rabbitmq.com/getstarted.html

  • 相关阅读:
    如何成为一名专家级的开发人员
    ZapThink探讨未来十年中企业IT的若干趋势
    Adobe CTO:Android将超预期获50%份额
    我的美国之行
    用上Vista了!
    用pylint来检查python程序的潜在错误
    delegate in c++ (new version)
    The GNU Text Utilities
    python程序转为exe文件
    c++头文件,cpp文件,makefile,unit test自动生成器
  • 原文地址:https://www.cnblogs.com/miaosha5s/p/9544424.html
Copyright © 2011-2022 走看看