zoukankan      html  css  js  c++  java
  • 生产者&消费者.py

    1、最简单的 --生产者消费者

    send.py
    # !/usr/bin/env python3.5
    # -*- coding:utf-8 -*-
    # __author__ == 'LuoTianShuai'
    """
    生产者/发送消息方
    """
    import pika

    # 远程主机的RabbitMQ Server设置的用户名密码
    credentials = pika.PlainCredentials('admin', 'admin123')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.102', 5672, '/', credentials))

    """
    A virtual host holds a bundle of exchanges, queues and bindings. Why would you want multiple virtual hosts?
    Easy. A username in RabbitMQ grants you access to a virtual host…in its entirety.
    So the only way to keep group A from accessing group B’s exchanges/queues/bindings/etc.
    is to create a virtual host for A and one for B. Every RabbitMQ server has a default virtual host named “/”.
    If that’s all you need, you’re ready to roll.

    virtualHost is used as a namespace
    for AMQP resources (default is "/"),so different applications could use multiple virtual hosts on the same AMQP server

    [root@localhost ~]# rabbitmqctl list_permissions
    Listing permissions in vhost "/" ...
    admin . . .
    guest .* .* .*
    ...done.


    """
    # 创建通道
    channel = connection.channel()

    # 声明队列hello,RabbitMQ的消息队列机制如果队列不存在那么数据将会被丢掉,下面我们声明一个队列如果不存在创建
    channel.queue_declare(queue='hello')

    # 给队列中添加消息
    channel.publish(exchange="",
    routing_key="hello",
    body="Hello World")
    print("向队列hello添加数据结束")

    # 缓冲区已经flush而且消息已经确认发送到了RabbitMQ中,关闭通道
    channel.close()

    receive.py

    # !/usr/bin/env python3.5
    # -*- coding:utf-8 -*-
    # __author__ == 'LuoTianShuai'

    """
    消费者/接收消息方
    """

    import pika

    # 远程主机的RabbitMQ Server设置的用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.31.123', 5672, '/', credentials))

    # 创建通道
    channel = connection.channel()

    # 声明队列
    channel.queue_declare(queue='hello')


    # 订阅的回调函数这个订阅回调函数是由pika库来调用的
    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

    # 定义通道消费者参数
    channel.basic_consume(callback,
    queue='hello',
    no_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
    channel.start_consuming()

  • 相关阅读:
    mybatis:"configuration" must match "(properties?,settings?,typeAliase.....
    Mybatis插件原理和PageHelper结合实战分页插件(七)
    mybatis结合redis实战二级缓存(六)
    Mybatis源码分析之Cache二级缓存原理 (五)
    Mybatis源码分析之Cache一级缓存原理(四)
    Mybatis源码分析之Mapper执行SQL过程(三)
    Mybatis源码分析之SqlSession和Excutor(二)
    Mybatis源码分析之SqlSessionFactory(一)
    spring结合mybatis实现数据库读写分离
    通过项目逐步深入了解Mybatis<四>
  • 原文地址:https://www.cnblogs.com/luoyan01/p/9734125.html
Copyright © 2011-2022 走看看