zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列(二):”Hello, World“

     本文将使用Python(pika 0.9.8)实现从Producer到Consumer传递数据”Hello, World“。

         首先复习一下上篇所学:RabbitMQ实现了AMQP定义的消息队列。它实现的功能”非常简单“:从Producer接收数据然后传递到Consumer。它能保证多并发,数据安全传递,可扩展。

         和任何的Hello world一样,它们都不复杂。我们将会设计两个程序,一个发送Hello world,另一个接收这个数据并且打印到屏幕。
          整体的设计如下图:

    在这里我们将使用pika. 可以通过 pip 包管理工具来安装:

    pip install pika==0.9.8  
    

      

    1 sending

    第一个program send.py:发送Hello world 到queue。正如我们在上篇文章提到的,你程序的第一句话就是建立连接,第二句话就是创建channel:

    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   'localhost'))
    channel = connection.channel()
    

    创建连接传入的参数就是RabbitMQ Server的ip或者name。

    关于谁创建queue,上篇文章也讨论过:Producer和Consumer都应该去创建。

    接下来我们创建名字为hello的queue:

    channel.queue_declare(queue='hello')
    

    现在我们已经准备好了发送了。
    从架构图可以看出,Producer只能发送到exchange,它是不能直接发送到queue的。现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。routing_key就是指定的queue名字。

    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
    print " [x] Sent 'Hello World!'"
    

      退出前别忘了关闭connection。

    connection.close()
    

      

    2. Receiving

    第二个program receive.py 将从queue中获取Message并且打印到屏幕。

    第一步还是创建connection。第二步创建channel。第三步创建queue,name = hello:

    channel.queue_declare(queue='hello')
    

      

    接下来要subscribe(订阅)了。在这之前,需要声明一个回调函数来处理接收到的数据。

    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
    

    subscribe:

    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
    

    最后,准备好无限循环监听吧:

    print ' [*] Waiting for messages. To exit press CTRL+C'
    channel.start_consuming()
    

      

    3 最终版本

    send.py

    root@ansible:~/workspace/rabbitmq/first_Hello# cat send.py 
    # coding:utf-8
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World.')
    
    print " [x] Send 'Hello World.'"
    
    connection.close()

     recv.py

    root@ansible:~/workspace/rabbitmq/first_Hello# cat recv.py 
    # coding:utf-8
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
    
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    """回调函数
    # 定义函数
    def a(arg):
        print arg
    
    # 定义回调函数
    def callback(arg,func):
        return func(arg)
    
    # 调用回调函数
    callback(3,a)  结果是3
    """
    # 定义回调函数
    def callback(ch,method,properties,body):
        print "[x] received %r" %(body)
    
    # 调用回调函数
    channel.basic_consume(callback,queue='hello',no_ack=True)
    
    # 循环监听
    print "[*] Waiting for message"
    channel.start_consuming()
    

      

     5 回调函数

    1.装饰器

    装饰器用来实现一种切面功能,即一些函数在调用前都必须实现的功能,比如用户是否登录,用户是否有权限这类需求,都很容易由装饰器来实现。

    import functools
    
    def log(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            print('call %s():' % func.__name__)
            return func(*args, **kw)
        return wrapper
    
    @log
    def now():
        print('2015-3-25')
    
    给函数now定义了一个装饰器log,实现功能:在调用函数之前,打印出函数的名字
    
    最终的输出是:
    >>> now()
    call now():
    2015-3-25
    

      

    2.回调函数

    回调函数就是一个通过函数指针调用的函数。如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指针被用来调用其所指向的函数时,我们就说这是回调函数。回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应。

    个人理解的回调函数类似于这样一种情况,产品经理需要实现某种功能,需要找到开发,开发说我可以帮你写个函数实现这个功能,但是功能有点复杂,在不同的情况下需要传入不同的参数,这个参数是需要你来给我的。

    一般应用于对应某一事件触发的函数。比方要实现爬虫,我可以帮你写个爬虫函数,但是你首先得知道要爬虫网站的URL,大概就是这样一个意思。

    回调函数丰富了函数的调用方法,给开发带来很多方便。

    """回调函数
    # 定义函数
    def a(arg):
        print arg
    
    # 定义回调函数
    def callback(arg,func):
        return func(arg)
    
    # 调用回调函数
    callback(3,a)  结果是3
    """
    

    我的理解就是通过callback函数来调用a函数。

    那些你现在不懂的问题,看着看着突然有一天你会发现,你懂了,并且就应该是这样的,到底是领悟了,还是真的被洗脑了。

  • 相关阅读:
    如何用命令将本地项目上传到github
    Mysql基本命令一
    Mysql基本命令二
    PDO操作数据库
    PHP分页
    JQuery中$.ajax()方法参数详解
    基于jquery的has()方法以及与find()方法以及filter()方法的区别详解
    IE浏览器兼容问题
    购物车的实现方式
    JS学习之路
  • 原文地址:https://www.cnblogs.com/wanstack/p/8857390.html
Copyright © 2011-2022 走看看