zoukankan      html  css  js  c++  java
  • Python守护进程(多线程开发)

    本段代码主要作用是httpsqs队列的消费端守护进程,从httpsqs中取出数据,放入mongodb

    #!/usr/bin/python
    
    import sys,time,json,logging
    import Queue, threading, datetime
    from lib.base.daemon import Daemon
    
    from lib.queue.httpsqs.HttpsqsClient import HttpsqsClient
    from lib.db.DbMongodb import DbMongodb
    
    logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S',
                    filename='myapp.log',
                    filemode='w')
                    
    queue = Queue.Queue()       
    httpsqs = HttpsqsClient('192.168.0.218','1218','httpsqs.com')
    db = DbMongodb('192.168.0.119','testdb')
    
            
    class ThreadGetHttpSqs(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.httpsqs = httpsqs
            self.queue = queue
        
        def run(self):
            while True:
                data = self.httpsqs.get('logtest')
                if data is not None:
                    self.queue.put(data)
                    logging.info('get:id %s , tablename %s' % (self.getName(),data))
                else:
                    time.sleep(3)
                
                
                
    class ThreadInsertDB(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.queue = queue
            self.db = db
            
        def run(self):
            while True:
                chunk = self.queue.get()
                s = json.loads(chunk)
                tablename = s['table']
                data = s['data']
                self.db.save(tablename,data)
                logging.info('insert:id %s , tablename %s' % (self.getName(),tablename))
                self.queue.task_done()
                
    
    
    
    class MyDaemon(Daemon):
        def _run(self):
            while True:
                for i in range(2):
                    t = ThreadGetHttpSqs()
                    #t.setDaemon(True)
                    t.start()
                
                for i in range(2):
                    b = ThreadInsertDB()
                    #t.setDaemon(True)
                    b.start()
                #线程已经为永真循环,进程不能再循环
                time.wait()
                
                    
                   
    if __name__ == "__main__":
        daemon = MyDaemon('/tmp/daemon-example.pid')
        if len(sys.argv) == 2:
            if 'start' == sys.argv[1]:
                daemon.start()
            elif 'stop' == sys.argv[1]:
                daemon.stop()
            elif 'restart' == sys.argv[1]:
                daemon.restart()
            else:
                print "Unknown command"
                sys.exit(2)
            sys.exit(0)
        else:
            print "usage: %s start|stop|restart" % sys.argv[0]
            sys.exit(2)
  • 相关阅读:
    第01组 团队项目-需求分析报告
    团队项目-选题报告
    第08组 Beta冲刺(4/5)
    第08组 Beta冲刺(3/5)
    第08组 Beta冲刺(2/5)
    第08组 Beta冲刺(1/5)
    第08组 Alpha事后诸葛亮
    第08组 Alpha冲刺(6/6)
    第08组 Alpha冲刺(5/6)
    第08组 Alpha冲刺(4/6)
  • 原文地址:https://www.cnblogs.com/seans/p/3820439.html
Copyright © 2011-2022 走看看