zoukankan      html  css  js  c++  java
  • rabbitmq redis

    RabbitMQ

    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。

    对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列

    用rabbitmq实现一个简单的生产者消费者模型

    发送端代码

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
    channel = connection.channel()
    
    channel.queue_declare(queue="hello")
    
    channel.basic_publish(exchange='',
                         routing_key = 'hello',
                         body='hello world',
    )
    print("Send hello world")
    connection.close()

    接收端代码

     1 import pika
     2 
     3 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
     4 channel = connection.channel()
     5 channel.queue_declare(queue="hello")
     6 
     7 def callback(ch,method,properties,body):
     8     print(ch,method,properties)
     9     print("received %s" %body)
    10 
    11 channel.basic_consume(callback,
    12                       queue='hello',
    13                       no_ack=True)
    14 
    15 print("waiting for messages to exit press 'CTRL+C'")
    16 channel.start_consuming()

    通过上述代码便可以实现一个简单的生产者消费者模型,但是现在的结果是:当开启多个消费者程序的时候,启动生产者发送消息,这个时候只有一个可以收到,并且再次启动,会下一个消费者收到,类似一个轮询的关系。

    acknowledgment 消息不丢失(通过客户端设置实现)

    通过no_ack = False参数设置,如果消费者遇到情况突然中断了没有收到,那么RabbitMQ会重新将任务添加到队列中

    下面将接收端的代码进行更改:

    #AUTHOR:FAN
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
    channel = connection.channel()
    channel.queue_declare(queue="hello")
    
    def callback(ch,method,properties,body):
        print(ch,method,properties)
        time.sleep(10)
        print("received %s" %body)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print("waiting for messages to exit press 'CTRL+C'")
    channel.start_consuming()

    标注的地方就是代码修改的地方,通过将no_ack更改为False,以及在callback回到函数这里让等待10s,这样启动接收端后,再启动发送算,在还没有打印数据的时候将客户端关闭,然后再启动,发现依然可以收到刚才发送端发送的数据。

    但是这种方式只能实现客户端断开重新连接的时候数据不丢失,如果是rabbitmq挂了的情况如何解决?

    durable消息不丢失(通过在服务端设置保证数据不丢失)

    这个时候生产者和消费者的代码都需要改动

    发送者代码

     1 import pika
     2 
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
     5 channel = connection.channel()
     6 
     7 channel.queue_declare(queue='fan',durable=True)
     8 
     9 channel.basic_publish(exchange='',
    10                       routing_key='fan',
    11                       body='hello world',
    12                       properties = pika.BasicProperties(
    13                           delivery_mode=2
    14                       ))
    15 
    16 print("send 'hello world'")
    17 connection.close()

    接收者的代码

     1 import pika
     2 import time
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
     5 channel = connection.channel()
     6 
     7 channel.queue_declare(queue='fan',durable=True)
     8 
     9 def callback(ch,method,properies,body):
    10     print("received %s" %body)
    11     time.sleep(10)
    12     print("is ok")
    13     ch.basic_ack(delivery_tag=method.delivery_tag)
    14 
    15 channel.basic_consume(callback,
    16                       queue='fan',
    17                       no_ack=False)
    18 
    19 print("waitting for messages.To exit press CTRL+C")
    20 channel.start_consuming()

    这样即使在接收者接收数据过程中rabbitmq服务器出现问题了,在服务恢复之后,依然可以收到数据

    发布订阅

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

    通过exchange type = fanout参数实现

    代码例子:

    发布者:

     1 #AUTHOR:FAN
     2 
     3 import pika
     4 import sys
     5 
     6 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.8.103'))
     7 channel = connection.channel()
     8 
     9 channel.exchange_declare(exchange="fan",
    10                          type='fanout')
    11 
    12 message = ' '.join(sys.argv[1:]) or "info :hello world"
    13 channel.basic_publish(exchange = 'fan',
    14                       routing_key='',
    15                       body=message)
    16 
    17 print("send %s" %message)
    18 connection.close()

    订阅者:

    #AUTHOR:FAN
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.8.103'))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange="fan",
                             type='fanout')
    
    #随机生成队列名字
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    #将exchange和队列绑定
    channel.queue_bind(exchange='fan',
                       queue=queue_name)
    
    print("waiting for fan ,To exit press CTRL+C")
    def callback(ch,method,proerties,body):
        print("---",body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()

    关键字发送

    通过参数:exchange type = direct实现

    之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

    代码例子如下:

    消费者代码:

     1 #AUTHOR:FAN
     2 import pika
     3 import sys
     4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
     5 channel = connection.channel()
     6 channel.exchange_declare(exchange='direct_logs_1',
     7                          type='direct')
     8 result = channel.queue_declare(exclusive=True)
     9 queue_name = result.method.queue
    10 
    11 severities = sys.argv[1:]
    12 if not severities:
    13     sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
    14     exit(1)
    15 print(severities)
    16 for severity in severities:
    17     print(severity)
    18     channel.queue_bind(exchange='direct_logs_1',
    19     queue=queue_name,
    20     routing_key=severity)
    21 print("waiting for logs,To exit press CTRL+C")
    22 def callback(ch,method,properties,body):
    23     print("%s:%s" %(method.routing_key,body))
    24 
    25 channel.basic_consume(callback,
    26                       queue=queue_name,
    27                       no_ack=True)
    28 channel.start_consuming()

    生产者代码

     1 import pika
     2 import sys
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
     5 channel = connection.channel()
     6 
     7 channel.exchange_declare(exchange='direct_logs_1',
     8                          type='direct')
     9 
    10 print(sys.argv)
    11 severity = sys.argv[1] if len(sys.argv) >1 else "error"
    12 message = ' '.join(sys.argv[2:]) or 'hello world'
    13 channel.basic_publish(exchange='direct_logs_1',
    14                       routing_key = severity,
    15                       body = message)
    16 print("send %s:%s" %(severity,message))
    17 connection.close()

    模糊匹配

    通过参数exchange type = topic实现

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    # 表示可以匹配 0 个 或 多个 单词

    *  表示只能匹配 一个 单词

    --------------------还没有整理完

  • 相关阅读:
    HTML5中meta属性的使用详解
    前端部分兼容性问题汇总
    position元素定位详述
    jquery简单实现轮播图
    事件委托-选项卡案例
    async、await
    前端会遇到的算法
    arguments实参个数
    前端知识点整理(三)
    var、let、const
  • 原文地址:https://www.cnblogs.com/zhaof/p/5983636.html
Copyright © 2011-2022 走看看