zoukankan      html  css  js  c++  java
  • python操作sqlite示例(支持多进程/线程同时操作)

    python操作sqlite的示例代码:

    import time
    import threading
    import sqlite3
    
    def nomal_producer(conn):
        """
        Producer loop: insert a new row into ``datas`` every 0.1 seconds, forever.

        Each row carries the text ``"content <n>"`` (``n`` counts up from 0) and
        a ``False`` flag. The function never returns on its own; it only stops
        when a database call raises.

        @param conn: sqlite3 connection to write to; its ``isolation_level`` and
                     ``row_factory`` are overwritten here.
        """
        conn.isolation_level = None  # autocommit mode, so no explicit commit below
        conn.row_factory = sqlite3.Row

        seq = 0
        while True:
            # a fresh cursor is used for every insert
            row = ("content %s" % seq, False)
            conn.cursor().execute("INSERT INTO datas(content, flag) VALUES (?, ?);", row)
            seq += 1
            time.sleep(0.1)
    
    def nomal_consumer(conn):
        """
        Consumer loop: every 0.5 seconds, fetch up to 10 rows of ``datas`` with
        the lowest ids, print them, and delete them one by one.

        The function never returns on its own; it only stops when a database
        call raises.

        @param conn: sqlite3 connection to read from and delete through; its
                     ``isolation_level`` and ``row_factory`` are overwritten here.
        """
        conn.isolation_level = None  # autocommit mode, each DELETE takes effect at once
        conn.row_factory = sqlite3.Row  # lets rows be indexed by column name below
        while True:
            # select data
            cur = conn.cursor()
            cur.execute("SELECT * FROM datas ORDER BY id LIMIT 10;")
            records = cur.fetchall()
            if records:
                # Single-argument parenthesized print: same output as the
                # Python 2 print statement, and valid syntax on Python 3.
                print("begin to delete: ")
                print(records)
                # delete records
                for r in records:
                    conn.execute("DELETE FROM datas WHERE id = ?;", (r["id"], ))
            time.sleep(0.5)
    
    if __name__ == "__main__":
        # init db
        # check_same_thread=False is required because the connection is created
        # here but used from the producer and consumer threads.
        conn = sqlite3.connect('./db.sqlite', check_same_thread = False)
        # Create the table the workers operate on if it is missing; otherwise a
        # fresh database file makes both threads fail with "no such table".
        # NOTE(review): schema is inferred from the columns the workers use
        # (id, content, flag) -- adjust if the real table differs.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS datas ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT, flag BOOLEAN);"
        )
        conn.commit()

        # init thread: both workers share the single connection
        producer = threading.Thread(target = nomal_producer, args = (conn,))
        consumer = threading.Thread(target = nomal_consumer, args = (conn,))

        # start threads
        producer.start()
        consumer.start()

    在上面多线程操作sqlite的示例代码中,采用producer和consumer的模式来处理,没有特殊之处,但需要注意的是:在建立sqlite3的connection的时候,需要设置check_same_thread = False,否则在创建该连接的线程之外使用它时会抛出sqlite3.ProgrammingError。
    另外,为了达到真正的thread-safe,可以对python的sqlite3做进一步封装,使得始终只有一个线程在操作sqlite。原理很简单:所有操作请求都先放入一个queue,由该线程按顺序取出并执行,查询结果再通过另外一个queue返回给调用方。示例代码如下:

    import sqlite3
    from Queue import Queue
    from threading import Thread
    
    class SqliteMultithread(Thread):
        """
        Wrap sqlite connection in a way that allows concurrent requests from multiple threads.

        This is done by internally queueing the requests and processing them sequentially
        in a separate thread (in the same order they arrived).

        A statement that fails does not stop the worker thread: the error is
        re-raised in the caller of `select`/`select_one`, or printed to stderr
        for fire-and-forget `execute` calls, and later requests are still served.

        """
        def __init__(self, filename, autocommit, journal_mode):
            """
            Remember the settings and start the worker thread right away.

            @param filename: database path passed to sqlite3.connect()
            @param autocommit: when true, connect with isolation_level=None and
                               commit after every executed statement
            @param journal_mode: value interpolated into `PRAGMA journal_mode`
            """
            super(SqliteMultithread, self).__init__()
            self.filename = filename
            self.autocommit = autocommit
            self.journal_mode = journal_mode
            self.reqs = Queue() # use request queue of unlimited size
            self.daemon = True # attribute form; setDaemon() is deprecated
            self.start()

        def run(self):
            """
            Worker loop: open the connection, then serve queued requests until
            a '--close--' request arrives. The connection is closed on exit,
            including when the loop is left through an exception.
            """
            import traceback  # local import: only needed to report failed statements

            if self.autocommit:
                conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
            else:
                conn = sqlite3.connect(self.filename, check_same_thread=False)
            try:
                conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
                conn.text_factory = str
                cursor = conn.cursor()
                cursor.execute('PRAGMA synchronous=OFF')
                while True:
                    req, arg, res = self.reqs.get()
                    if req == '--close--':
                        break
                    elif req == '--commit--':
                        conn.commit()
                    else:
                        try:
                            cursor.execute(req, arg)
                            if res:
                                for rec in cursor:
                                    res.put(rec)
                        except Exception as exc:
                            # Keep the worker alive. A waiting select() gets the
                            # error object; without a result queue nobody can
                            # receive it, so print the traceback instead.
                            if res:
                                res.put(exc)
                            else:
                                traceback.print_exc()
                        if res:
                            # always terminate the result stream so select() never blocks forever
                            res.put('--no more--')
                        if self.autocommit:
                            conn.commit()
            finally:
                conn.close()

        def execute(self, req, arg=None, res=None):
            """
            `execute` calls are non-blocking: just queue up the request and return immediately.

            @param req: SQL text (or one of the internal '--commit--'/'--close--' commands)
            @param arg: statement parameters; a missing or empty value becomes an empty tuple
            @param res: optional queue that receives the result rows followed by '--no more--'
            """
            self.reqs.put((req, arg or tuple(), res))

        def executemany(self, req, items):
            """Queue `req` once for every parameter set in `items`."""
            for item in items:
                self.execute(req, item)

        def select(self, req, arg=None):
            """
            Unlike sqlite's native select, this select doesn't handle iteration efficiently.

            The result of `select` starts filling up with values as soon as the
            request is dequeued, and although you can iterate over the result normally
            (`for res in self.select(): ...`), the entire result will be in memory.

            This is a generator, so the request is only queued once iteration
            starts. If the statement fails in the worker thread, the exception
            is re-raised here.

            """
            res = Queue() # results of the select will appear as items in this queue
            self.execute(req, arg, res)
            while True:
                rec = res.get()
                if isinstance(rec, Exception):
                    # the worker hands failures over as exception objects
                    raise rec
                if rec == '--no more--':
                    break
                yield rec

        def select_one(self, req, arg=None):
            """Return only the first row of the SELECT, or None if there are no matching rows."""
            # builtin next() with a default works on Python 2.6+ and Python 3
            return next(self.select(req, arg), None)

        def commit(self):
            """Queue a commit; it runs after all previously queued requests."""
            self.execute('--commit--')

        def close(self):
            """Queue a shutdown request; returns without waiting for the worker to finish."""
            self.execute('--close--')
    
    #endclass SqliteMultithread
  • 相关阅读:
    Python拼接字符串的7种方法
    jieba结巴分词
    nginx配置文件的性能优化
    nginx默认的配置文件详解
    CentOS怎样安装Python3.6
    Scrapy爬去哪儿~上海一日游门票并存入MongoDB数据库
    Scrapy模拟登录GitHub
    Scrapy爬豆瓣电影Top250并存入MySQL数据库
    Scrapy爬博客园
    创建第一个Scrapy项目
  • 原文地址:https://www.cnblogs.com/Jerryshome/p/2882931.html
Copyright © 2011-2022 走看看