zoukankan      html  css  js  c++  java
  • Python队列及在微信机器人中的应用

    本文来源于i春秋学院,未经允许严禁转载。

    最近打算更新微信机器人,发现机器人的作者将代码改进了很多,但去掉了sqlite数据库,需要自己根据需求设计数据库,跟作者沟通得到的建议是为了防止消息并发导致数据库死锁,建议另开一个进程读写数据库,将消息加入一个队列中,因为对Python了解有限,队列和多线程更不是我擅长的内容,于是最近疯狂Google、百度,探索着实现了此功能。写此文记录下基本概念和实现方法

    0x00 Python队列
    队列是线程中交换数据的形式。
    创建一个队列对象

    import Queue
    
    q = Queue.Queue(maxsize = 10) #maxsize是队列长度,不限制长度可以不不赋值


    将一个值放入队列

    q.put(10)


    将一个值从队列取出

    q.get()


    三种队列及构造函数

    • FIFO队列先进先出: class Queue.Queue(maxsize)
    • LIFO类似于堆,即先进后出: class Queue.LifoQueue(maxsize)
    • 优先级队列: class Queue.PriorityQueue(maxsize)
       

    队列的常用方法

    q.qsize() #返回队列的大小
    
    q.empty() #如果队列为空,返回True,反之False
    
    q.full() #如果队列满了,返回True,反之False
    
    q.full #与 maxsize 大小对应
    
    q.get([block[, timeout]]) #获取队列,block:是否阻塞等待,timeout等待时间
    
    q.get_nowait() #相当q.get(False)
    
    q.put(item[, block[, timeout]) #非阻塞写入队列,block:是否阻塞等待,timeout等待时间
    
    q.put_nowait(item) #相当q.put(item, False)
    
    q.task_done() #在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    
    q.join() #等到队列为空,再执行别的操作


    几个例子

    一个线程往队列里写入随机数,另一个线程从队列里取数字(阻塞等待)

    #!/usr/bin/env python
    
    #coding:utf8
    
    import random,threading,time
    
    from Queue import Queue
    
    #Producer thread
    
    class Producer(threading.Thread):
    
        def __init__(self, t_name, queue):
    
            threading.Thread.__init__(self,name=t_name)
    
            self.data=queue
    
        def run(self):
    
            for i in range(10):  #随机产生10个数字 ,可以修改为任意大小
    
                randomnum=random.randint(1,99)
    
                print "%s: %s 生成了一个数字 %d 并把它扔进了队列!" % (time.ctime(), self.getName(), randomnum)
    
                self.data.put(randomnum)  #将数据依次存入队列
    
                time.sleep(1)
    
            print "%s: %s finished!" %(time.ctime(), self.getName())
    
    
    
    #Consumer thread
    
    class Consumer_all(threading.Thread):
    
        def __init__(self, t_name, queue):
    
            threading.Thread.__init__(self, name=t_name)
    
            self.data = queue
    
    
    
        def run(self):
    
            while 1:
    
                try:
    
                    # print self.data
    
                    val_even = self.data.get() 
    
                    print "%s: %s 从队列里取出了 %d !" % (time.ctime(), self.getName(), val_even)
    
                    time.sleep(1)
    
                except  Exception, e: 
    
                    print '取数据失败'
    
                    continue
    
    
    
    #Main thread
    
    def main():
    
        queue = Queue()
    
        producer = Producer('Pro.', queue)
    
        consumer_all = Consumer_all('Con_all.', queue)
    
        producer.start()
    
        consumer_all.start()
    
    
    
    if __name__ == '__main__':
    
        main()
    运行结果
    
    Wed Aug  3 00:04:18 2016: Pro. 生成了一个数字 74 并把它扔进了队列!
    
    Wed Aug  3 00:04:18 2016: Con_all. 从队列里取出了 74 !
    
    Wed Aug  3 00:04:19 2016: Pro. 生成了一个数字 77 并把它扔进了队列!
    
    Wed Aug  3 00:04:19 2016: Con_all. 从队列里取出了 77 !
    
    Wed Aug  3 00:04:20 2016: Pro. 生成了一个数字 63 并把它扔进了队列!
    
    Wed Aug  3 00:04:20 2016: Con_all. 从队列里取出了 63 !
    
    Wed Aug  3 00:04:21 2016: Pro. 生成了一个数字 96 并把它扔进了队列!
    
    Wed Aug  3 00:04:21 2016: Con_all. 从队列里取出了 96 !
    
    Wed Aug  3 00:04:22 2016: Pro. 生成了一个数字 82 并把它扔进了队列!
    
    Wed Aug  3 00:04:22 2016: Con_all. 从队列里取出了 82 !
    
    Wed Aug  3 00:04:23 2016: Pro. 生成了一个数字 19 并把它扔进了队列!
    
    Wed Aug  3 00:04:23 2016: Con_all. 从队列里取出了 19 !
    
    Wed Aug  3 00:04:24 2016: Pro. 生成了一个数字 56 并把它扔进了队列!
    
    Wed Aug  3 00:04:24 2016: Con_all. 从队列里取出了 56 !
    
    Wed Aug  3 00:04:25 2016: Pro. 生成了一个数字 57 并把它扔进了队列!
    
    Wed Aug  3 00:04:25 2016: Con_all. 从队列里取出了 57 !
    
    Wed Aug  3 00:04:26 2016: Pro. 生成了一个数字 42 并把它扔进了队列!
    
    Wed Aug  3 00:04:26 2016: Con_all. 从队列里取出了 42 !
    
    Wed Aug  3 00:04:27 2016: Pro. 生成了一个数字 7 并把它扔进了队列!
    
    Wed Aug  3 00:04:27 2016: Con_all. 从队列里取出了 7 !
    
    Wed Aug  3 00:04:28 2016: Pro. finished!

    如果将入队的时间间隔修改,出队的程序将阻塞运行,将“ time.sleep(1)”改为“ time.sleep(10)”,再次运行

    Wed Aug  3 00:10:46 2016: Pro. 生成了一个数字 45 并把它扔进了队列!
    
    Wed Aug  3 00:10:46 2016: Con_all. 从队列里取出了 45 !
    
    Wed Aug  3 00:10:56 2016: Pro. 生成了一个数字 75 并把它扔进了队列!
    
    Wed Aug  3 00:10:56 2016: Con_all. 从队列里取出了 75 !
    
    Wed Aug  3 00:11:06 2016: Pro. 生成了一个数字 26 并把它扔进了队列!
    
    Wed Aug  3 00:11:06 2016: Con_all. 从队列里取出了 26 !
    
    Wed Aug  3 00:11:16 2016: Pro. 生成了一个数字 67 并把它扔进了队列!
    
    Wed Aug  3 00:11:16 2016: Con_all. 从队列里取出了 67 !
    
    Wed Aug  3 00:11:26 2016: Pro. 生成了一个数字 60 并把它扔进了队列!
    
    Wed Aug  3 00:11:26 2016: Con_all. 从队列里取出了 60 !
    
    Wed Aug  3 00:11:36 2016: Pro. 生成了一个数字 83 并把它扔进了队列!
    
    Wed Aug  3 00:11:36 2016: Con_all. 从队列里取出了 83 !
    
    Wed Aug  3 00:11:46 2016: Pro. 生成了一个数字 33 并把它扔进了队列!
    
    Wed Aug  3 00:11:46 2016: Con_all. 从队列里取出了 33 !
    
    Wed Aug  3 00:11:56 2016: Pro. 生成了一个数字 65 并把它扔进了队列!
    
    Wed Aug  3 00:11:56 2016: Con_all. 从队列里取出了 65 !
    
    Wed Aug  3 00:12:06 2016: Pro. 生成了一个数字 44 并把它扔进了队列!
    
    Wed Aug  3 00:12:06 2016: Con_all. 从队列里取出了 44 !
    
    Wed Aug  3 00:12:16 2016: Pro. 生成了一个数字 28 并把它扔进了队列!
    
    Wed Aug  3 00:12:16 2016: Con_all. 从队列里取出了 28 !
    
    Wed Aug  3 00:12:26 2016: Pro. finished!

    会发现当队列没有值的时候程序会阻塞等待队列有值才继续运行。
    稍微修改程序,一个线程往队列里写入随机数,另一个线程从队列里取数字(非阻塞等待)

    #!/usr/bin/env python
    
    #coding:utf8
    
    import random,threading,time
    
    from Queue import Queue
    
    #Producer thread
    
    class Producer(threading.Thread):
    
        def __init__(self, t_name, queue):
    
            threading.Thread.__init__(self,name=t_name)
    
            self.data=queue
    
        def run(self):
    
            for i in range(10):  #随机产生10个数字 ,可以修改为任意大小
    
                randomnum=random.randint(1,99)
    
                print "%s: %s 生成了一个数字 %d 并把它扔进了队列!" % (time.ctime(), self.getName(), randomnum)
    
                self.data.put(randomnum)  #将数据依次存入队列
    
                time.sleep(10)
    
            print "%s: %s finished!" %(time.ctime(), self.getName())
    
    
    
    #Consumer thread
    
    class Consumer_all(threading.Thread):
    
        def __init__(self, t_name, queue):
    
            threading.Thread.__init__(self, name=t_name)
    
            self.data = queue
    
    
    
        def run(self):
    
            while 1:
    
                try:
    
                    # print self.data
    
                    val_even = self.data.get(1,5)  # get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒
    
                    print "%s: %s 从队列里取出了 %d !" % (time.ctime(), self.getName(), val_even)
    
                    time.sleep(1)
    
                except  Exception, e:  # 等待输入,超过5秒  就报异常
    
                    print '取数据失败'
    
                    continue
    
    
    
    #Main thread
    
    def main():
    
        queue = Queue()
    
        producer = Producer('Pro.', queue)
    
        consumer_all = Consumer_all('Con_all.', queue)
    
        producer.start()
    
        consumer_all.start()
    
    
    
    if __name__ == '__main__':
    
        main()
    Wed Aug  3 00:20:58 2016: Pro. 生成了一个数字 57 并把它扔进了队列!
    
    Wed Aug  3 00:20:58 2016: Con_all. 从队列里取出了 57 !
    
    取数据失败
    
    Wed Aug  3 00:21:08 2016: Pro. 生成了一个数字 83 并把它扔进了队列!
    
    Wed Aug  3 00:21:08 2016: Con_all. 从队列里取出了 83 !
    
    取数据失败
    
    Wed Aug  3 00:21:18 2016: Pro. 生成了一个数字 85 并把它扔进了队列!
    
    Wed Aug  3 00:21:18 2016: Con_all. 从队列里取出了 85 !
    
    取数据失败
    
    Wed Aug  3 00:21:28 2016: Pro. 生成了一个数字 64 并把它扔进了队列!
    
    Wed Aug  3 00:21:28 2016: Con_all. 从队列里取出了 64 !
    
    取数据失败
    
    Wed Aug  3 00:21:38 2016: Pro. 生成了一个数字 23 并把它扔进了队列!
    
    Wed Aug  3 00:21:38 2016: Con_all. 从队列里取出了 23 !
    
    取数据失败
    
    Wed Aug  3 00:21:48 2016: Pro. 生成了一个数字 98 并把它扔进了队列!
    
    Wed Aug  3 00:21:48 2016: Con_all. 从队列里取出了 98 !
    
    取数据失败
    
    Wed Aug  3 00:21:58 2016: Pro. 生成了一个数字 12 并把它扔进了队列!
    
    Wed Aug  3 00:21:58 2016: Con_all. 从队列里取出了 12 !
    
    取数据失败
    
    Wed Aug  3 00:22:08 2016: Pro. 生成了一个数字 27 并把它扔进了队列!
    
    Wed Aug  3 00:22:08 2016: Con_all. 从队列里取出了 27 !
    
    取数据失败
    
    Wed Aug  3 00:22:18 2016: Pro. 生成了一个数字 60 并把它扔进了队列!
    
    Wed Aug  3 00:22:18 2016: Con_all. 从队列里取出了 60 !
    
    取数据失败
    
    Wed Aug  3 00:22:28 2016: Pro. 生成了一个数字 69 并把它扔进了队列!
    
    Wed Aug  3 00:22:28 2016: Con_all. 从队列里取出了 69 !
    
    取数据失败
    
    Wed Aug  3 00:22:38 2016: Pro. finished!
    
    取数据失败
    
    取数据失败
    
    取数据失败

    可以看出,取数据的线程在超过设定的等待时间后会抛出异常并继续往下执行。

    0x01 使用队列将微信机器人消息存入MongoDB

    使用上面的例子,稍作修改,将接收到的消息扔进队列,开启另一个线程取数据,取到后将消息格式化并存入数据库

    #!/usr/bin/env python
    
    #coding:utf8
    
    import threading,time,pymongo
    
    from pymongo import MongoClient
    
    from Queue import Queue
    
    #消息入队
    
    class MsgInQueue():
    
        def __init__(self, queue):
    
            threading.Thread.__init__(self)
    
            self.data=queue
    
        def putmsgqueue(self,msg):
    
            self.data.put(msg)
    
            print 'put msg in queue success:'+msg['Content']
    
    ###消息出队并存入数据库

    class MsgOutQueue2db(threading.Thread):
    
        def __init__(self, queue):
    
            threading.Thread.__init__(self)
    
            self.data = queue
    
            #建立MongoDB连接
    
            self.conn = MongoClient()
    
            #数据库
    
            self.db = self.conn.wechatRobot
    
            #数据表
    
            self.messages = self.db.messages
    
        def run(self):
    
            while 1:
    
                try:
    
                    # print self.data
    
                    #从队列里取消息
    
                    msg = self.data.get(1, 5)  # get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒
    
                    print "%s: %s get %s from queue !" % (time.ctime(), self.getName(), msg['Content'].encode('utf-8'))
    
                    try:
    
                        #格式化消息数据
    
                        m = dict(groupname=msg['FromUserName'].encode('utf-8'),
    
                                 time=msg['CreateTime'],
    
                                 username=msg['ActualUserName'],
    
                                 usernickname=msg['ActualNickName'].encode('utf-8'),
    
                                 message=msg['Content'].encode('utf-8'),
    
                                 messagetype=msg['MsgType']
    
                                 )
    
                        print m
    
                        #存入数据库
    
                        self.db.messages.insert(m)
    
                        time.sleep(1)
    
                    except  Exception, e:
    
                        print e
    
                        continue
    
                except  Exception, e:
    
                    continue
     

    主线程

    def complex_reply():
    
        queue = Queue()
    
        outqueue = MsgOutQueue2db(queue)#实例化出队入库类
    
        outqueue.start()#开启线程
    
    
    
        @itchat.msg_register('Text', isGroupChat = True)
    
        def text_reply(msg):
    
            # print itchat.__client.storageClass.groupDict
    
            print itchat.__client.storageClass.chatroomList
    
            print msg
    
            inqueue=MsgInQueue(queue)#实例化入队类
    
            inqueue.putmsgqueue(msg)#消息入队
    
            if msg['isAt']:
    
                print msg
    
                itchat.send(u'@%su2005I received: %s'%(msg['ActualNickName'], msg['Content']), msg['FromUserName'])

    消息存入数据库:

    通过多线程和MongoDB的结合,有效防止消息过多导致数据库死锁的问题,也更加模块化,可以根据真实需求更换其他数据库。后面我将结合MongoDB插入、更新数据快的特点写一下我如何设计群聊统计功能,在Python方面和MongoDB方面我都是小白,如有更好的建议请多指教,我们共同学习有关Python多线程的课程请参考《python安全编程入门》

  • 相关阅读:
    C++——模板、数组类
    JSP教程(四)—— JSP内置对象(上)
    JSP教程(三)—— 基本语法
    JSP教程(二)—— JavaWeb简介
    JSP教程(一)—— JavaWeb开发环境
    关于余弦定理的一些见解
    文本相似度比较(网页版)
    关于SimHash算法的实现及测试V4.0
    关于SimHash算法的实现及测试V3.0
    关于SimHash算法的实现及测试V2.0
  • 原文地址:https://www.cnblogs.com/ichunqiu/p/6752239.html
Copyright © 2011-2022 走看看