zoukankan      html  css  js  c++  java
  • redis keys导入导出

    #!/usr/bin/python2
    import sys
    import os
    import redis
    import time
    import datetime
    import threading
    
    string_keys=[]
    hash_keys=[]
    list_keys=[]
    set_keys=[]
    zset_keys=[]
    ttl_dict={}
    hash_mutex = threading.Lock()
    list_mutex = threading.Lock()
    set_mutex = threading.Lock()
    zset_mutex = threading.Lock()
    string_mutex = threading.Lock()
    index=0
    
    def import_string(source, dest):
        keys_count = len(string_keys)
        pipeSrc = source.pipeline(transaction=False)
        pipeDst = dest.pipeline(transaction=False)
        global index
        global string_mutex
        pipe_size=1000
    
        while 1 :
            string_mutex.acquire()
            if index >= keys_count :
                string_mutex.release()
                break
            old_index = index
            if index + pipe_size < keys_count :
                index += pipe_size
            else : 
                index = keys_count
            max_index = index
            string_mutex.release()
    
            cur_index = old_index
            while (cur_index < max_index):
                pipeSrc.get(string_keys[cur_index])
                cur_index +=1
            results=pipeSrc.execute()
            for value in results:
                pipeDst.set(string_keys[old_index], value)
                if string_keys[old_index] in ttl_dict.keys() :
                    pipeDst.expire(string_keys[old_index],ttl_dict[string_keys[old_index]]);
                old_index +=1
            pipeDst.execute()
    
    def import_hash(source, dest):
        keys_count = len(hash_keys)
        pipeSrc = source.pipeline(transaction=False)
        pipeDst = dest.pipeline(transaction=False)
        while 1:
            hash_mutex.acquire()
            if len(hash_keys) == 0 :
                hash_mutex.release()
                break;
            key=hash_keys.pop(0)
            hash_mutex.release()
            hkeys=source.hkeys(key)
            keys_count = len(hkeys)
            index=0
            pipe_size=1000
            while index < keys_count:
                old_index=index
                num=0
                while (index < keys_count) and (num < pipe_size):
                    pipeSrc.hget(key, hkeys[index])
                    index +=1
                    num +=1
                results=pipeSrc.execute()
                for value in results:
                    pipeDst.hset(key, hkeys[old_index], value)
                    old_index +=1
                pipeDst.execute()
            if key in ttl_dict.keys() :
                dest.expire(key,ttl_dict[key]);
    
    def import_set(source, dest):
        keys_count = len(set_keys)
        pipeDst = dest.pipeline(transaction=False)
        while 1:
            set_mutex.acquire()
            if len(set_keys) == 0 :
                set_mutex.release()
                break;
            key=set_keys.pop(0)
            set_mutex.release()
            sValues=source.smembers(key)
            value_count = len(sValues)
            index=0
            pipe_size=1000
            while index < value_count:
                old_index=index
                num=0
                while (index < value_count) and (num < pipe_size):
                    pipeDst.sadd(key, sValues.pop())
                    index +=1
                    num +=1
                pipeDst.execute()
            if key in ttl_dict.keys() :
                dest.expire(key,ttl_dict[key]);
    
    def import_zset(source, dest):
        keys_count = len(zset_keys)
        pipeSrc = source.pipeline(transaction=False)
        pipeDst = dest.pipeline(transaction=False)
        while 1:
            zset_mutex.acquire()
            if len(zset_keys) == 0 :
                zset_mutex.release()
                break;
            key=zset_keys.pop(0)
            zset_mutex.release()
            zset_size = source.zcard(key)
            index=0
            pipe_size=1000
            while index < zset_size:
                members = source.zrange(key, index, index+pipe_size)
                index += len(members)
                for member in members:
                    pipeSrc.zscore(key, member)
                scores = pipeSrc.execute()
                i=0
                for member in members:
                    pipeDst.zadd(key, member, scores[i])
                    i+=1
                pipeDst.execute()
            if key in ttl_dict.keys() :
                dest.expire(key,ttl_dict[key]);
    
    def import_list(source, dest):
        keys_count = len(list_keys)
        pipeDst = dest.pipeline(transaction=False)
        while 1:
            list_mutex.acquire()
            if len(list_keys) == 0 :
                list_mutex.release()
                break;
            key=list_keys.pop(0)
            list_mutex.release()
            list_size = source.llen(key)
            index=0
            pipe_size=1000
            while index < list_size:
                results = source.lrange(key, index, index+pipe_size)
                index += len(results)
                for value in results:
                    pipeDst.rpush(key, value)
                pipeDst.execute()
            if key in ttl_dict.keys() :
                dest.expire(key,ttl_dict[key]);
    
    def read_type_keys(source):
        keys=source.keys()
        keys_count = len(keys)
        pipe = source.pipeline(transaction=False)
        #for key in keys:
        index=0
        pipe_size=5000
        while index < keys_count:
            old_index=index
            num=0
            while (index < keys_count) and (num < pipe_size):
                pipe.type(keys[index])
                index +=1
                num +=1
            results=pipe.execute()
            for type in results:
                if type == "string":
                    string_keys.append(keys[old_index])
                elif type == "list":
                    list_keys.append(keys[old_index])
                elif type == "hash":
                    hash_keys.append(keys[old_index])
                elif type == "set":
                    set_keys.append(keys[old_index])
                elif type == "zset":
                    zset_keys.append(keys[old_index])
                else :
                    print keys[old_index]," is not find when TYPE"
                old_index +=1
    
        index=0
        while index < keys_count:
            old_index=index
            num=0
            while (index < keys_count) and (num < pipe_size):
                pipe.ttl(keys[index])
                index +=1
                num +=1
            results=pipe.execute()
            for ttl_val in results:
                if ttl_val != None :
                    ttl_dict[keys[old_index]]=ttl_val
                old_index +=1
    
    def import_data(source):
        dest=redis.Redis(host=DstIP,port=DstPort)
        import_string(source, dest)
        import_hash(source, dest)
        import_list(source, dest)
        import_set(source, dest)
        import_zset(source, dest)
    
    if __name__=='__main__':
        argc = len(sys.argv)
        if argc != 6:
            print "usage: %s sourceIP sourcePort destIP destPort connectionNum" % (sys.argv[0])
            exit(1)
        SrcIP = sys.argv[1]
        SrcPort = int(sys.argv[2])
        DstIP = sys.argv[3]
        DstPort = int(sys.argv[4])
        ConnNum = int(sys.argv[5])
    
        source=redis.Redis(host=SrcIP,port=SrcPort)
    
        print "Begin Read Keys"
        read_type_keys(source)
        print "String Key Count is:",len(string_keys)
        print "Set Key Count is:",len(set_keys)
        print "ZSet Key Count is:",len(zset_keys)
        print "List Key Count is:",len(list_keys)
        print "Hash Key Count is:",len(hash_keys)
    
        start=datetime.datetime.now()
        threads = []
        for i in xrange(ConnNum):
            t = threading.Thread(target=import_data,args=(source,))
            threads.append(t)
    
        for t in threads:
            t.setDaemon(True)
            t.start()
        for t in threads:
            t.join()
        stop=datetime.datetime.now()
        diff=stop-start
        print "Finish, token time:",str(diff)

    使用方法如下
    ./import_data.py sourceIP sourcePort destIP destPort connectionNum
    sourceIP:导出库ip(数据源)
    sourcePort:导出库port(数据源)
    destIP:导入库ip
    destPort:导入库Port
    connectionNum: 数据导入时使用的连接数

  • 相关阅读:
    PHP:使用Zend对源码加密、Zend Guard安装以及Zend Guard Run-time support missing的解决方法
    PHP:WampServer下如何安装多个版本的PHP、mysql、apache
    Windows7下无法打开chm(mk:@MSITStore:路径[cannot open the file mk@MSITstore:路径]),chm索引就关闭的解决办法
    C#:ListView控件如何实现点击列表头进行排序?
    C#:struct的陷阱:无法修改“xxx”的返回值,因为它不是变量
    C#:装箱和拆箱相关知识整理
    wifipineapple的evilportal
    mshta 反弹shell
    kali系统教程:创建热点
    office漏洞利用--获取shell
  • 原文地址:https://www.cnblogs.com/patrick0715/p/7821770.html
Copyright © 2011-2022 走看看