zoukankan      html  css  js  c++  java
  • python操作MongoDB、MySQL、Postgres、Sqlite、redis实例

    总结:除了MongoDB、redis,其他三个数据库用python来操作其实是差不多的。所有例子都很简单,实际生产环境中的数据库操作远比这复杂得多,命令也比我例子中的多得多,我这里高级一点的用法就是批量插入了。所以大型团队中都需要有一个对sql非常熟悉的人(DBA?我觉得应该不算吧,DBA可能对于大型数据公司是有必要的,对于普通产品或者团队来说,需要一个熟悉数据库的人就够了),因为每个sql其实都有很多高级技巧的,这个就不在本文范围内了。

    测试人员py操作数据库有几个目的:模拟环境、接口测试结果校验。

    这几个重点是在学习过程中需要关注的:

    1、了解python对应的数据库操作库的基础知识,比如如何参数化、如何执行sql命令之类。

    2、了解各个数据库的操作语句,其实不同数据库都是有所不同的。

    def mangodbtest():
        #创建连接,连接数据库
        connection=pymongo.Connection('172.22.31.99',27017)
        mongodb  = connection.testdb#建库
        test = mongodb.test#建表
        #---------INSERT 
        new_post = {"AccountID":22,"UserName":"libing",'date':datetime.datetime.now()}#定义成字典
        test.insert(new_post)#insert是写入
        new_posts = [{"AccountID":55,"UserName":"liuw",'date':datetime.datetime.now()},{"AccountID":66,"UserName":"urling",'date':datetime.datetime.now()}]#定义多条数据
        test.insert(new_posts)
        u1 = dict(AccountID=33,UserName="libing1",date=datetime.datetime.now())#转换成字典
        test.insert(u1)    
        u2 = dict(AccountID=44,UserName="libing3",date=datetime.datetime.now())
        test.save(u2)#save也是写入,性能稍差
        # u3 = dict((AccountID=77,UserName="libing1",date=datetime.datetime.now()),(AccountID=88,UserName="libing1",date=datetime.datetime.now()))      
        #---------DELETE 
        test.remove({"AccountID":22,"UserName":"libing"})
        #---------SELECT 
        for u in test.find({"AccountID":{"$lt":50}}): #AccountID < 50 的内容
            print u
        for u in test.find({"AccountID":33}): #AccountID =33 的内容
            print u
        for u in test.find():#所有内容
            print u
        #---------UPDATE 
        test.update({"UserName":"libing1"},{"$set":{'AccountID':random.randint(100,200)}})
        for u in test.find({"UserName":"libing1"}): #AccountID =33 的内容
            print u
    
        test.update({}, {'$set' :{'AccountID':random.randint(100,200)}}, upsert=False, multi=True)#同时更新多条数据 
        for u in test.find():
            print u    
        #---------批量增
        u3 = []
        u3.append( {"AccountID":77,"UserName":"libing1",'date':datetime.datetime.now()})
        u3.append( {"AccountID":88,"UserName":"libing1",'date':datetime.datetime.now()})
        u3.append( {"AccountID":99,"UserName":"libing1",'date':datetime.datetime.now()})
        test.insert(u3) 
        for u in test.find({"UserName":"libing1"}): #AccountID =33 的内容
            print u
        #---------删表
        test.drop()
        #---------关闭连接      
    
    def mysqltest():
        #---------连接
        mysql = MySQLdb.connect(host='172.22.31.85', user='root',passwd='admin888',db='test')
        mysqlcur = mysql.cursor()
        #---------删表
        mysqlcur.execute('''drop TABLE ntf''')
        mysql.commit()
        #---------建表
        mysqlcur.execute('''CREATE TABLE ntf
           (md5           char(32) PRIMARY KEY     NOT NULL,
           le             TEXT    NOT NULL,
           nid            INT     NOT NULL,
           ADDRESS        varchar(50),
           SALARY         REAL);''')
        mysql.commit()
        #---------INSERT 
        mysqlcur.execute('''insert into ntf (md5,le,nid) values ('aa','aa','1')''')
        mysqlcur.execute('''insert into ntf (md5,le,nid) values ('bb','bb','1')''')
        mysqlcur.execute('''insert into ntf (md5,le,nid) values ('bb2','bb','2')''')
        mysql.commit()
        #---------DELETE 
        mysqlcur.execute('''delete from ntf where md5='aa';''')
        mysql.commit()
        #---------SELECT  
        sql=mysqlcur.execute('''SELECT * from ntf where le='bb' ''')
        one = mysqlcur.fetchone()
        print one
        all = mysqlcur.fetchall()
        print all
        #---------UPDATE 
        mysqlcur.execute('''update ntf set md5='cc' where md5='bb' ''')
        mysql.commit()
        #---------批量增 
        count = 0
        strSql = ""
        head = '''insert into ntf(md5,le,nid) values '''
        strSql = head
        for i in range(2887122945,2887188479,1):        
            #如果满足1000条数据则执行sql
            if  (count != 0) and (count% 100 == 0):
                print count
                strSql = strSql.rstrip(',')
                mysqlcur.execute(strSql)
                mysql.commit()
                # print 'cost %s ' %(time.time() - begin)
                #还原sql语句前部分
                strSql = head            
            #不满足1000条数据,把后部的value数据接驳在最后
            md5 = 'test' + hashlib.md5(str(i)).hexdigest()[0:6]
            le = hashlib.md5(str(i)).hexdigest()[0:6]
            nid = random.choice((2,3,4,5,6))
            strSql += '''('%s','%s',%s),'''% (md5,le,nid)
            count += 1            
            #如果超过1000条数据就不执行了
            if count > 1000:
                break
        #---------关闭连接       
        mysqlcur.close()
        mysql.close() 
        
    def postgrestest():
        #---------连接
        pg = psycopg2.connect(host='172.22.31.40',port='5360', user='postgres',password='postgres',dbname='skylar')
        pgcur = pg.cursor()
        #---------删表
        pgcur.execute('''DROP TABLE ntf''')
        pg.commit()
        #---------建表
        pgcur.execute('''CREATE TABLE ntf
           (md5           char(32) PRIMARY KEY     NOT NULL,
           le             TEXT    NOT NULL,
           nid            INT     NOT NULL,
           ADDRESS        varchar(50),
           SALARY         REAL);''')
        pg.commit()
        #---------INSERT 
        pgcur.execute('''insert into ntf (md5,le,nid) values ('aa','aa','1')''')
        pgcur.execute('''insert into ntf (md5,le,nid) values ('bb','bb','1')''')
        pgcur.execute('''insert into ntf (md5,le,nid) values ('bb2','bb','2')''')
        pg.commit()
        #---------DELETE 
        pgcur.execute('''delete from ntf where md5='aa';''')
        pg.commit()
        #---------SELECT  
        sql=pgcur.execute('''SELECT * from ntf where le='bb' ''')
        one = pgcur.fetchone()
        print one
        all = pgcur.fetchall()
        print all
        #---------UPDATE 
        pgcur.execute('''update ntf set md5='cc' where md5='bb' ''')
        pg.commit()
        #---------批量增   
        count = 0
        strSql = ""
        head = '''insert into ntf(md5,le,nid) values '''
        strSql = head
        for i in range(2887122945,2887188479,1):        
            #如果满足1000条数据则执行sql
            if  (count != 0) and (count% 100 == 0):
                print count
                strSql = strSql.rstrip(',')
                pgcur.execute(strSql)
                pg.commit()
                # print 'cost %s ' %(time.time() - begin)
                #还原sql语句前部分
                strSql = head            
            #不满足1000条数据,把后部的value数据接驳在最后
            md5 = 'test' + hashlib.md5(str(i)).hexdigest()[0:6]
            le = hashlib.md5(str(i)).hexdigest()[0:6]
            nid = random.choice((2,3,4,5,6))
            strSql += '''('%s','%s',%s),'''% (md5,le,nid)
            count += 1            
            #如果超过1000条数据就不执行了
            if count > 1000:
                break
        #---------关闭连接       
        pgcur.close()
        pg.close()  
        
    def sqlitetest():
        #---------连接
        sqlite = sqlite3.connect(os.path.dirname(os.path.abspath(sys.argv[0]))+r"sqlite.db") 
        sqlitecursor = sqlite.cursor()
        #---------删表
        sqlitecursor.execute('''DROP TABLE ntf''')#用三引号而不是双引号或者单引号,原因是sql语句中可能会出现双引号或者单引号,为了避免混淆,建议用三引号
        sqlitecursor.execute('''VACUUM''')
        sqlite.commit()
        #---------建表
        sqlite.execute('''CREATE TABLE ntf
           (md5           char(32) PRIMARY KEY     NOT NULL,
           le             TEXT    NOT NULL,
           nid            INT     NOT NULL,
           ADDRESS        varchar(50),
           SALARY         REAL);''')
        sqlitecursor.execute('''VACUUM''')
        sqlite.commit()
        #---------INSERT 
        sqlitecursor.execute('''insert into ntf (md5,le,nid) values ('aa','aa','1')''')
        sqlitecursor.execute('''insert into ntf (md5,le,nid) values ('bb','bb','1')''')
        sqlitecursor.execute('''insert into ntf (md5,le,nid) values ('bb2','bb','2')''')
        ##压缩数据库
        sqlitecursor.execute("VACUUM")
        sqlite.commit()
        #---------DELETE 
        sqlitecursor.execute('''delete from ntf where md5='aa';''')#单引号最后记得加个空格或者分号,防止和三引号混淆
        sqlitecursor.execute('''VACUUM''')
        sqlite.commit()
        #---------SELECT  
        sql=sqlitecursor.execute('''SELECT * from ntf where le='bb' ''')
        one = sqlitecursor.fetchone()
        print one
        all = sqlitecursor.fetchall()#该例程获取查询结果集中所有(如果用过fetchone,则返回剩余)的行,返回一个列表。当没有可用的行时,则返回一个空的列表。
        print all
        #---------UPDATE 
        sqlitecursor.execute('''update ntf set md5='cc' where md5='bb' ''')
        sqlitecursor.execute('''VACUUM''')
        sqlite.commit()
        sqlitecursor.close()
        sqlite.close()    
        #---------批量增,把上面的sqlite.commit()、sqlite.close()去掉保留最后一份,也可以看成是批量操作 
    def redistest(): #---------连接 r=redis.StrictRedis(host='172.22.31.40',port=6379,db=15) #--------清空表 r.flushdb() #---------INSERT #操作string类型key r.set('teststring', '111')#set key values r.setnx('teststring', '222')#增加时如果 key 已经存在,返回 0 r.setex('teststring1', '100', '200')#增加时设置ttl,单位是秒(有效期)#set key ttl values 等效于 r.set('teststring1', '200') r.expire('teststring1','300') #操作hashes 类型key r.hset('testhash', 'email', 'xxx@gmail.com') r.hset('testhash', 'phone', '123456') r.hset('testhash', 'address', 'xxxx') #同时设置多条内容 r.hmset("testhash1", {"email":"xxx@gmail.com", "phone":"123456", "address":"xxxx"}) r.hsetnx ('testhash', 'email', 'yyy@gmail.com')#增加时如果 key 已经存在则失败,返回 0 r.expire('testhash','500')#设置key过期时间 #操作list类型 r.lpush('listtest', '111111') r.lpushx('listtest', '222222')#lpush在最前面添加元素,x则表示如果listtest存在才进行插入操作 r.lpushx('notexitlist', '333333') r.lrange("listtest", 0, 3) r.rpush('listtest2', '111111') r.rpushx('listtest2', '222222')#rpush在后面添加元素 #---------DELETE r.delete('teststring') #---------UPDATE r.set('teststring', 111) r.set('teststring', 333) #---------SELECT print r.mget('teststring')#返回stringkey内容 print r.hkeys('testhash')#返回返回单条hashkey集合 print r.hget('testhash',"email")#返回单条指定的value print r.hgetall("testhash")#返回所有hashkey内容,包括key、value print r.hmget("testhash1", ("email", "phone"))#返回指定的value #配置redis print r.config_get("maxmemory") print r.config_get("timeout") # r.config_set("timeout",1)

      

  • 相关阅读:
    Java类型转换.
    搭建jenkins集群node结点
    java Lambda
    @Autowired使用说明
    Disruptor底层实现讲解与RingBuffer数据结构讲解
    Disruptor并发框架简介
    并发编程中的读写锁分离锁的使用
    并发编程过程中的重入锁
    互联网进行限流策略的Semaphore信号量使用
    并发编程中Future和Callable使用
  • 原文地址:https://www.cnblogs.com/idbeta/p/5209522.html
Copyright © 2011-2022 走看看