zoukankan      html  css  js  c++  java
  • RabbitMQ and Redis

     RabbitMQ

    先讲RabbitMQ:消息队列

    threading Queue 只用于不同线程间数据交互,不能跨进程
    进程Queue 用于父进程与子进程进行交互,或者同属于同一父
    进程下多个子进程进行交互。


    QQ与Word数据交互,可以建立一个socket,也可以将数据放进硬盘,也可以用第三方软件代理

    RabbitMQ是用erlang语言写的,所以操作系统需要安装该语言的环境,安装后有一个服务,RabbitMQ;如果是linux,可以用rabbitmq -server start 启动服务

    这个RabbitMQ有各种语言的模块,根据语言下载对应模块,安装模块有两个,先到这里下载安装这个 ,再下载安装这个

    RabbitMQ可以给很多程序发送队列,所以里面有很多队列
    C是客户端,红色的为队列;

    RabbitMQ默认端口为5672 

    简单的例子

    import pika
    
    # 相当于建立一个socket
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    
    # 声明一个管道,消息将从管道发送,管道相当于一条路
    channel = connection.channel()
    
    # 声明queue,每条路有很多车,每辆车需要有个队列运行
    channel.queue_declare(queue='hello')
    
    # 通过管道
    channel.basic_publish(exchange='',
                          routing_key='hello',#就是queue名字
                          body='Hello World!')#发送的消息
    
    print('[x] Sent "Hello World" ')
    
    # 关闭队列,不用关闭管道
    connection.close()
    producer_test
    import pika,time
    
    # 相当于建立一个socket
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    
    # 声明一个管道,消息将从管道发送,管道相当于一条路
    channel = connection.channel()
    
    
    # 这里为何又声明了队列,生产者已经声明了,我们可以确认队列已经存在,但是我们不确定是哪个先运行,所以加上这个是避免消费者先运行导致的报错了
    # 声明队列收消息
    channel.queue_declare(queue='hello')
    
    def callback(ch,method,properties,body):
        '''
            回调函数
        :param ch: 管道的内存对象地址
        :param method: 包含了发给谁的队列,一般不用
        :param properties:
        :param body:
        :return:
        '''
        print(ch,method,properties)
        #sleep 30秒试试
    # 假设消费者处理事件处理一半,断电了,这种情况,RabbitMQ是如何处理的?
        time.sleep(30)
        print("[x] Receive %r "% body)
    
    # 这里只是声明而已,一个语法
    channel.basic_consume(#消费消息
                        callback,#如果收到消息,就调用callback函数来处理消息
                          queue='hello',#从哪个队列收消息
                          # no_ack=True #no ackownledgement 没有确认,有这个意思是不需要对方确认,一般不写这个;
    )
    
    print('[*]')
    # 这里才开始收消息,一旦启动,一直收取下去,没有消息就卡住
    channel.start_consuming()
    consumer

    分发方式是轮番发送,意思是,一个生产者,三个消费者,按照消费者启动的顺序发送队列的数据,一个接着一个发送。

    假设消费者处理事件处理一半,断电了,这种情况,RabbitMQ是如何处理的?没写o_ack=True这个的情况,RabbitMQ生产者不会删除未处理完的队列;(因为处理完事件后,会自动发送一个确认给生产者)

    消息持续化


    windows环境下
    C:Program FilesRabbitMQ Server abbitmq_server-3.7.4sbin
    rabbitmq-server.bat #服务启动

    rabbitmqctl.bat #管理的工具,可以通过rabitmqctl.bat list_queues 命令查看当前队列的信息

    将RabbitMQ服务重启,队列也会全部清除,所以要确保队列不被删除,需要再声明队列的时候加上一句durable=True,例如
    channel.queue_declare(queue='hello',durable=True)
    这样能使得队列持续化,但是消息依然没有

    若需要消息也持续化,在生产者那边加上properties=pika.BasicProperties( delivery_mode=2,#使得消息持续化 )

    # 通过管道
    channel.basic_publish(exchange='',
      routing_key='hello',#就是queue名字
      body='Hello World!',
      properties=pika.BasicProperties(
        delivery_mode=2,#使得消息持续化
    )

    import pika
    
    # 相当于建立一个socket
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    
    # 声明一个管道,消息将从管道发送,管道相当于一条路
    channel = connection.channel()
    
    # 声明queue,每条路有很多车,每辆车需要有个队列运行
    # 将RabbitMQ服务重启,队列也会全部清除,所以要确保队列不被删除,需要再声明队列的时候加上一句durable=True
    channel.queue_declare(queue='hello',durable=True)
    
    # 通过管道
    channel.basic_publish(exchange='',
                          routing_key='hello',#就是queue名字
                          body='Hello World!',
                          properties=pika.BasicProperties(
                              delivery_mode=2,#使得消息持续化
                          )
                          )#发送的消息
    
    print('[x] Sent "Hello World" ')
    
    # 关闭队列,不用关闭管道
    connection.close()
    product
    import pika,time
    
    # 相当于建立一个socket
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    
    # 声明一个管道,消息将从管道发送,管道相当于一条路
    channel = connection.channel()
    
    
    # 这里为何又声明了队列,生产者已经声明了,我们可以确认队列已经存在,但是我们不确定是哪个先运行,所以加上这个是避免消费者先运行导致的报错了
    # 声明队列收消息
    # 将RabbitMQ服务重启,队列也会全部清除,所以要确保队列不被删除,需要再声明队列的时候加上一句durable=True
    channel.queue_declare(queue='hello',durable=True)
    
    def callback(ch,method,properties,body):
        '''
            回调函数
        :param ch: 管道的内存对象地址
        :param method: 包含了发给谁的队列,一般不用
        :param properties:
        :param body:
        :return:
        '''
        print(ch,method,properties)
        #sleep 30秒试试
    # 假设消费者处理事件处理一半,断电了,这种情况,RabbitMQ是如何处理的?
        time.sleep(30)
        print("[x] Receive %r "% body)
        # 设置跟服务器确认
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    # 这里只是声明而已,一个语法
    channel.basic_consume(#消费消息
                        callback,#如果收到消息,就调用callback函数来处理消息
                          queue='hello',#从哪个队列收消息
                          # no_ack=True #no ackownledgement 没有确认,有这个意思是不需要对方确认,一般不写这个;
    )
    
    print('[*]')
    # 这里才开始收消息,一旦启动,一直收取下去,没有消息就卡住
    channel.start_consuming()
    consumer

    生产者发送一条信息,所有的客户端都收到

    exchange其实就是一个转发器,在定义的时候是有类型的,以决定到底是哪些队列符合条件,可以接收信息

    fanout:所有bind到此exchange的队列都可以接收信息(纯广播,只要绑定该exchange就能接收)

    direct:通过routingkey和exchange决定的那个唯一的队列可以接收消息

    topic:所有符合routingkey(此时可以是一个表达式)的routingkey所bind的队列可以接收消息

    headers:通过headers来决定把消息发给哪些队列

    RabbitMQ fanout 广播模式

    发广播的时候,就好似收音机的情况,如果发消息的时候,消费者不在线,则不会继续收到启动前的消息,这个就是订阅

    import sys,pika
    # 相当于建立一个socket
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    # 声明一个管道,消息将从管道发送,管道相当于一条路
    channel = connection.channel()
    
    # 声明
    channel.exchange_declare(exchange='logs',
                             exchange_type= 'fanout'
                             )
    message = ''.join(sys.argv[1:]) or 'info:hello World!'
    
    # 发布
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message
                          )
    
    print('[x]sent %r'%message)
    publish
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    # exclusive:唯一的。不指定队列名,rabbit会随机分配一个名字,exclusive=True会在使用此队列的消费者断开后,自动将队列删除
    result = channel.queue_declare(exclusive=True)
    
    queue_name = result.method.queue
    print('随机队列名:',queue_name)
    # 绑定到指定的转发器,这里是logs,因为所有的消费者都是从队列取消息,不是exchange直接连接,所以需要队列queue
    channel.queue_bind(exchange='logs',
                       queue=queue_name
                       )
    
    print('[*]waiting for logs.To exit press ctrl + C')
    
    def callback(ch,method,properties,body):
        '''
            回调函数
        :param ch: 管道的内存对象地址
        :param method: 包含了发给谁的队列,一般不用
        :param properties:
        :param body:
        :return:
        '''
        print(ch,method,properties)
        #sleep 30秒试试
    # 假设消费者处理事件处理一半,断电了,这种情况,RabbitMQ是如何处理的?
    
        print("[x] Receive %r "% body)
        # 设置跟服务器确认
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    # 这里只是声明而已,一个语法
    channel.basic_consume(#消费消息
                        callback,#如果收到消息,就调用callback函数来处理消息
                          queue=queue_name,#从哪个队列收消息
                          # no_ack=True #no ackownledgement 没有确认,有这个意思是不需要对方确认,一般不写这个;
    )
    
    channel.start_consuming()
    consumer

    RabbitMQ direct 广播模式

    import pika,sys
    
    # 相当于建立一个socket
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    # 声明一个管道,消息从管道发送
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',
                            exchange_type='direct'
    
                             )
    # 重要程度,相当于级别,
    serverity = sys.argv[1] if len(sys.argv) >1  else 'info'
    
    message = ''.join(sys.argv[2:]) or 'hello world!'
    print(serverity,message)
    channel.basic_publish(
        exchange='direct_logs',
        routing_key=serverity,
        body=message
    )
    
    print('[x]Sent %r:%r'%(serverity,message))
    connection.close()
    publisher
    import pika,sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct'
                             )
    
    result = channel.queue_declare(exclusive=True)
    
    queue_name = result.method.queue
    
    servities = sys.argv[1:]
    
    print('servities:  ',servities)
    if not servities:
        sys.stderr.write('错误信息:%s'%sys.argv[0])
        sys.exit(1)
    
    for servity in servities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=servity
                           )
    
    print('等待logs,退出请按ctrl + C')
    
    def callback(ch,method,properties,body):
        print('%r:%r'%(method.routing_key,body))
        print('ch,method,properties,body:',ch,method,properties,body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    channel.start_consuming()
    consumer

    带参数的运行如图

     topic 细致的消息过滤广播模式

    import pika,sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic'
                             )
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    
    print('[x] sent %r:%r'%(routing_key,message))
    connection.close()
    topic_publisher
    import pika,sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channle = connection.channel()
    
    channle.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    result = channle.queue_declare(exclusive=True)
    
    queue_name = result.method.queue
    
    binding_keys = sys.argv[1]
    
    if not binding_keys:
        sys.stderr.write('usage: %s [binding_key]...
    '% sys.argv[0])
        sys.exit(1)
    
    for binding_key in binding_keys:
        channle.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
    
    print('等待logs,请ctrl + C退出')
    def callback(ch,method,properties,body):
        print('%r:%r' % (method.routing_key, body))
        # print('ch,method,properties,body:',ch,method,properties,body)
    
    channle.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    print()
    
    channle.start_consuming()
    topic_consumer

    RabbitMQ rpc的实现(remote procedure call)

    之前的都是单向的,就是服务器向客户端发送信息,但是实际应该双方交互

    import pika
    import uuid,time
    
    class FibonaccRpcClient(object):
        def __init__(self):
            # 相当于建立一个socket
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
            # 声明一个管道,消息将从管道发送,管道相当于一条路
            self.channel = self.connection.channel()
            # 生成随机队列
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
            # 这里只是声明而已,一个语法
            self.channel.basic_consume(self.on_response,#只要一收到消息就调用
                                       no_ack=True,#不需要对方确认
                                       queue=self.callback_queue#消息队列名称
                                       )
    
    
        # 收到消息后,将self.response修改为队列的返回,body
        def on_response(self, ch, method, props, body):
            # 如果自身产生的随机数等于服务器端返回的数值情况,收到的结果就是这边请求的结果
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            # 返回给服务器端的信息
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties = pika.BasicProperties(
                                           reply_to=self.callback_queue,#队列名称,可以放对方知道发送的信息是来自哪个队列,从而回复的时候使用对应的队列
                                           correlation_id=self.corr_id),
                                       body=str(n)
                                       )
            # 不使用start,因为start是阻塞的
            while self.response is None:
                # 非阻塞版的start
                #  有消息就收消息,没有就继续走下去
                self.connection.process_data_events()
                print('no msg...')
                # time.sleep(0.5)
            return  int(self.response)
    fibonacci_rpc = FibonaccRpcClient()
    print('[x] requesting fib(30)')
    response = fibonacci_rpc.call(30)
    print('[.] got %r'%response)
    rpc_client
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n ==1:
            return 1
        else:
            return fib(n-1) + fib(n-2)
    
    def on_request(ch,method,props,body):
        '''
    
        :param ch:
        :param method:
        :param props: 客户端发过来的信息,从这里可以收到客户端发过来的队列名称
        :param body:
        :return:
        '''
        n = int(body)
        print('[.] fib(%s)'%n)
    
        response = fib(n)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,#h获取客户端发送过来的队列名称
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=str(response))
        # 确保信息被消费
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    # 客户端最多滞留一条信息在队列
    channel.basic_qos(prefetch_count=1)
    # 这里只是声明而已,一个语法
    channel.basic_consume(on_request,queue='rpc_queue')
    
    print('[x] Awaiting RPC requests')
    channel.start_consuming()
    rpc_server

    Redis

    Redis安装和基本使用

    安装可以参考这个

    linux 下安装:依次输入

    wget http://download.redis.io/releases/redis-4.0.6.tar.gz

    tar -zxvf redis-4.0.6.tar.gz

    yum install gcc (遇到选择,输入y即可)

    cd redis-4.0.6

    make MALLOC=libc

    cd src && make install

    Redis string操作

    Memcached & Redis使用,参考:点击这里

    不同程序的数据交互,通过缓存来弄,存在的软件有mongodb,redis(内存以及硬盘),memcache(只在内存)

    Redis是单线程的
    Redis是一个key-values存储系统,它支持存储的value类型相对更多,包括string,list,set,zset(sorted set--有序集合)和hash(哈希类型)。这些类型都支持push/pop,add/remove以及取交集并集和差集以及更丰富的操作,而且这些操作都是原子性的。在此基础上,Redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是Redis会周期性的把更新的数据写入硬盘或者把修改操作写入追加的记录文件,并且再次基础上实现master-slave(主从)同步

     windows下使用:

    进入目录:D: oolspythonRedis-x64-3.2.100:输入redis-server.exe redis.windows.conf 

    另一个cmd窗口:redis-cli.exe -h 127.0.0.1 -p 6379

    使用
    set key value

    set name alex
    set age 22

    keys * #查看

    get name #获取
    get age #获取

    set name jack ex 2 #设置namejack存在2秒钟

     

    sudo pip install redis
    or
    sudo easy_install redis
    or
    https://github.com/WoLpH/redis-py

    使用

    import redis
    
    r = redis.Redis(host='localhost',port=6379)
    r.set('foo','Bar')
    print(r.get('foo'))

    连接池

    因为上面每次连接都要建立连接,这样太浪费资源,所以就需要用到连接池,其他的程序直接从连接池获取数据即可
    redis-py 使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池

    import redis
    # 建立一个连接池
    pool = redis.ConnectionPool(host='localhost',port=6379)
    
    r = redis.Redis(connection_pool=pool)
    
    r.set('foo2',"Bar2")
    
    print(r.get('foo2'))


    操作

    string操作
    hash操作
    list操作
    set操作
    sort set操作
    管道
    发布订阅

    命令不会的时候可以:help 命令


    set(name,value,ex=None,px=None,nx=False,xx=False)

    在Redis中设置值,默认,不存在则创建,存在则修改
    ex:过期时间(秒)
    px:过期时间(毫秒)
    nx:如果设置为True,则只有name不存在时,当前set操作才执行
    xx:如果设置为True,则只有name存在时,当前set操作才执行

    因为name不存在,所以用nx的时候,第一次能执行,第二次无法执行

     

    因为name3不存在,使用xx的时候,命令无法执行

    setnx(name,value)(相当于上方的nx=True的情况,所以一般不用)
    设置值,只有name不存在时,执行设置操作(添加)

    setex(name,time,value)

    time:过期时间(秒)
    
    psetex(name,time_ms,value)
    time_ms:过期时间(毫秒)
    
    mset(*args,**kwargs)
    批量设置值
    如:mset(k1='v1',k2='v2') 或者 mset({'k1':'v1','k2':'v2'})

    因为这里是毫秒,所以设置成功后,存在的时间很短,就没有了 

     mset:批量设置

    get(name)#获取值

    mget(keys,*args)

    批量获取,如:mget('y1r','wupeiqi') 或者 r.mget(['y1r','wupeiqi'])

     

    getset(name,value)
    设置新值并获取原来的值:先要有n1,获取n1的值后,再把Jack赋值到n1 

    getrange(key,start,end)

    获取子序列(根据字节获取,非字符)
    name:Redis的name
    start:起始位置(字节)
    end:结束位置(字节)
    例子: set n1 Jack getrange(n1,0,2)

    n1存在的前提下,相当于切片

    setrange(name,offset,value)

    例子:set n1 Jack setrange n1 0 | 结果:|ack
    
    setrange n1 1 AL 结果:|ALk

    setbit(name.offset,value)  一般不用,应用场景:网易等有哪些用户在线

    对name对应值的二进制表示的位进行操作
    #value 值只能是1或0
    例子:继续上面的操作,setbit n1 1 0 get n1#结果:<ALk

    具体例子:假设n3为alex,a对应的assic是97,二进制位:01100001,将左数第六位转换为1,则变成了99,对应吃,所以n3变成了clex

     应用场景:网易等有哪些用户在线,设置一个新的值n5,用户id1000在线,则变成1,用户id55,6000同理,使用bitcount计算对应数量,然后循环就能计算出哪个用户在线

    bitcount(key,start - None),end = None)

    #获取name对应的值的二进制表示中1的个数
    参数:
        #key,Redis的name
        #start,位起始位置
        #end,位结束位置

     strlen(name)

    #返回name对应值的字节长度(一个汉字三个字节)

    incr(self,name,amount=1)

    #自增name对应的值,当name不存在时,则创建name=amount,否则,则自增
    
    #参数:
        #name,redis的name
        #amount,自增数(必须是整数)

    incrbyfloat(self,name,amount=1.0)

    #自增name对应的值,当name不存在时候,则创建name=amount,否则则自增
    #参数:
        #name ,redis的name
        @amout,自增数

     append(key,value)

    #在redis name对应的值后面追加内容
    #参数
           #key,redis的name
            #value,要追加的字符串

    Redis hash操作

     redis存储在内存的格式如下

    hset(name,key,value)

    #name对应的hash中设置一个键值对(不存在,则创建;否则修改)
    
    #参数:
        #name:redis的name
        #key,name对应的hash中的key
        #value,name对应你的hash中的value

    hmset(name,mapping)

    #在name对应的hash中批量设置键值对
    #参数:
        #name,redis的name
        #mapping,字典

    hgetall(name)

    #获取name对应hash的所有键值

    hlen(name)

    #获取name对应的hash中键值的个数

    hmget()


    hkeys(name)

    #获取name对应的hash中所有的key的值

    hvals(name)

    #获取name对应的hash中所有的value的值

    hexists(name.key)

    #检查name对应的hash是否存在当前传入的key,存在则为1,不存在则为0

    hdel(name,*keys)

    #将name对应的hash中指定key的键值对删除

    hincrby(name.key.amount =1)

    #自增name对应的hash中的指定key的值,不存在则创建key=amount
    #参数:
        #name,redis中对应的name
        #key,hash对应的key
        #amount,自增数

    hscan(name,cursor=0,match=None,count=None)

    #增量式迭代获取,对于数据大的数据非常有用,hscan可以实现分配的获取数据,并非一次性将数据全部获取完,
    
    #参数:
        #name,redis的name
        #corsor,游标(基于游标分配获取数据)
        #match,匹配指定key,默认None,表示所有的key
        #count,每次分配最少获取个数,默认None表示采用redis的默认分片个数

    从0游标开始,获取所有n开头的key

    Redis集合set何有序集合的操作

    list 操作,redis中的list在内存中按照一个name对应一个list来存储,如图

    lpush(name.values) :先入后出

    #在name对应的list中添加元素,每个新的元素都添加到列表的最左边

     lrange(names,  )

    rpush(name,values)

    #在name对应的list中添加元素,每个新的元素都添加到列表的最右边

    lpushx(name,value)

    #在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边
    
    #rpushx(name,values)表示从右向左操作

    llen( name)

    #name对应的list元素的个数

    linsert(name.where,refvalue,value)

    #在name对应的列表的某一个值前或后插入一个新值
    #参数:
        #name,redis的name
        #where,BEFORE 或 AFTER
        #refvalue,标杆值,即:在他前后插入数据
        #value,要插入的数据

    r.lset(name,index.value)

    #对name对应的list中的某一个索引位置重新复制
    #参数
        #name,redis的name
        #index,list的索引位置
        #value,要设置的值

     

    rlrem(name,values,num)

    #在name对应的list中,删除指定的值
    #参数
        #name,redis的name
        #value,要删除的值
        #num,num=0,删除列表中所有的指定值
            #num = 2 从签到后,删除2个
            #num = -2,从后向前,删除2个

    lpop(name)

    #在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素
    
    #rpop(name) 表示从右向左操作

     lindex(name,index)

    #在name对应的列表中根据索引获取列表元素

    lrange(name.start,end)

    #在name对应的列表分片获取数据
    #参数:
        #name,redis的name
        #start,索引的起始位置
        #end,所以的结束位置

    ltrim(name,start,end)

    #在name对应的列表中移除没有在start-end索引之间的值
    #参数:
        #name,redis的name
        #start,索引的起始位置
        #end,索引的结束位置

    保留第二到第三的元素

    rpoplpush(src,dst)

    #从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边
    #参数:
        #src 要取数据的列表的name
        #dis,要添加数据的列表的name

    blpop(keys,timeout)

    #将多个列表漂亮,按照从左到右去pop对应列表的元素
    #参数:
        #keys, redis的name的集合
        #timeout,超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒) 0表示永远阻塞
    
        #r.brpop(keys,timeout) 从右向左获取数据

     假设names2列表里面有aaa,,bbb,ccc三个元素,设设置删除元素,否则阻塞40秒,三次后,将数据读取完,就会阻塞40秒,而阻塞的过程中,有其他地方存入数据,则立即将这个数据删除

    brpoplpush(src,dst,timeout = 0)

    #从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧
    #参数
        #src,取出并要移除元素的列表对应的name
        #dist,要插入元素的列表对应的name
        #timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间,0表示永远阻塞

    set操作

    sadd(name,values)

    #name对应的集合中添加元素

     因为集合是不重复的,所以即使增加相当的内容,下面也只保留3个,

    smembers(name)

    #获取name对应的集合的所有成员

    scard(name)

    #获取name对应的集合中元素个数

    sdiff(keys,*args) 

    #在第一个name对应的集合中且不在其他name对应的集合的元素集合

    sdiffstore(dest,keys,*args)

    #获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到的dest对应的集合中

     sinter(keys,*args)

    #获取都一个name对应集合的交集

     sinterstore(dest,keys,*args)

    #获取多一个name对应集合的交集,再将其加入到dest对应的集合中

    sismember(name,value)

    #检查value是否是name对应的集合的成员,存在显示1.不存在显示0

     

    smove(src.dst,value)

    #将某个成员从一个集合中移动到另一个集合

    spop(name)

    #从集合的右侧(尾部)移除一个成员,并将其返回

    srandmember(name,numbers)

    #从name对应的集合中随机获取numbers个元素

    srem(name,values)

    #在name对应的集合中删除某些值

    sunion(keys,*args)

    #获取一个name对应你的集合的并集

     sunionstore(dest,keys,*args)

    #获取多一个name对应的集合的并集,并将结果保存到的dest对应的集合

    sscan(name,cursor=0,match=None,count=None)

    sscan_iter(name,match=None,count=None)

    #同字符串的操作,用于增量迭代分配获取远古三,避免内存消耗太大

    有序集合

    在集合的基础上,为每个元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。

    zadd(name,*args,**kargs)

    #在name对应的有序集合中添加元素
    #如:
        zadd('zz','n1','n2',2)
        或者
        zadd('zz',n1=11,n2=22)

    因为是有序的,所以讲alex的位置从10修改到了7,排序就不一样了

    z.range(name,start,end,desc = False,withscores=False,score_cast_func=float)

    #按照索引泛微获取name对应的有序集合的元素
    #参数
        #name,redis的name
        #start,有序集合索引起始位置(非分数)
        #end,有序集合索引结束位置(非分数)
        #desc,排序规则,默认按照分数从小到大排序
        #withscores,是否获取元素的分数,默认只获取元素的值
        #score_cast_func,对分数进行数据转换的函数

    更多:

      zrevrange(name,start,end,withscores=False,score_cast_func=float)

    #从大到小排序
    另外:
    #按照分数泛微获取name对应的有序集合的元素
    zrangebyscore(name,min,max.start=None,num=None,withscores=False,score_cast_func=float)
    zreverangebyscore(name.max.min,start=None.num=None.withscores=False,score_cast_func=float)

    zcard(name)

    #获取name对应的有序集合元素的数量

    zcount(name.min,max)

    #获取name对应的有序集合中分数,在[min,max]之间的个数

    zincrby(name,value,amount)

    #自增name对应的有序集合的name对应的分数

    zrank(name,value)

    #后去某个值在name对应的有序集合中的排序(从0开始)
    #更多
        #zrevrank(name,value)从大到小排序

    zrangebylex(name.min,max.start=None,num=None)(课程说忘记吧)

    zrem(name,values)

    #删除name对应的有序集合中值是values的成员

    zremrangebyrank(name,min,max)

    #根据排序范围删除

    zremrangebyscore()

    #根据分数范围删除

    zremrangebylex(name,min,max)

    #根据值返回删除

    zscore(name,value)

    #获取name对应有序集合中value对应的分数

    zinterstore(dest,keys,aggregate=None)

    #获取两个有序集合的交集,如果遇到相同的值,则按照aggregate进行操作
    #aggregate的值为:sum min max

     

    zinterstore z3 2 z1 z2 :意思是z3是新集合名称,2是代表两个集合做交集,做交集的集合是z1,z2

    zunionstore(dest,keys,aggregate=None)

    #获取两个有序集合的并集,如果遇到相同值,则按照aggregate进行操作
    #aggregate的值为:sum min max

    zscan(name.cursor=0,match=None,count=None.score_cast_func=float)

    zscan_iter(name,match,count=None,score_cast_func=float)

    #同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作

     其他操作

    delete(*names)

    #根据删除redis中的任意数据类型

    exists(name)

    #检测redis的name是否存在

    keys(pattern='*')

    #根据模型获取redis的name
        #keys * 匹配数据库中所有的key
        #keys h?llo 匹配hello,hallo hxllo等
        #keys h*llo 匹配hllo何heeeello等
        #kesy h[ae]llo 匹配hello何hallo,但是不匹配hillo

    expire(name.time)

    #为某个redis的某个name设置超时时间

    rename(src,dst)

    #对redis的name重命名为

    move(name,db) redis默认只有16个表,0 - 15,使用select 0 等切换到不同的表

    #将redis的某个值移动到指定的db下

    randomkey()

    #随机获取一个redis的name(不删除)

    type(name)

    #获取name对应的类型

    4.管道

    redis-py默认在执行每次请求都会创建(连接池申请链接)和断开(归还连接池)一次连接操作,如果想要再一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline是原子性操作

    例子:十秒等待期中,不会生成name,必须要两者都执行后,才会在队列里面查到

    import redis,time
    # redis的select 5里面
    pool = redis.ConnectionPool(host = 'localhost',port = 6379,db = 5)
    
    r = redis.Redis(connection_pool=pool)
    
    pipe = r.pipeline(transaction=True)
    
    pipe.set('name','alex')
    time.sleep(10)
    pipe.set('role','sb')
    pipe.execute()
    View Code

    5.发布订阅

     例子:

    import redis
    
    class RedisHelper:
        def __init__(self):
            self.__conn = redis.Redis(host='localhost')
            self.chan_sub = 'fm104.5'
            self.chan_pub = 'fm104.5'
    
        def public(self,msg):
            # 往频道fm104.5发送信息
            self.__conn.publish(self.chan_pub,msg)
            return  True
    
        def subscribe(self):
            # 相当于打开收音机
            pub = self.__conn.pubsub()
            # 调频道到fm104.5
            pub.subscribe(self.chan_sub)
            # 准备接收
            pub.parse_response()
            return pub
    redishelper
    from redishelper import RedisHelper
    # 实例化
    obj = RedisHelper()
    redis_sub = obj.subscribe()
    
    while True:
        msg = redis_sub.parse_response()
        print(msg)
    redis_sub订阅者

    发布者

    redis 命令参考:这里

  • 相关阅读:
    UOS ROOT如何SSH登陆
    UOS打印日志提示Can’t create temporary file,无法打印如何处理
    UOS简易OEM ISO镜像的步骤(UOS如何自行定制镜像文件)
    UOS火狐浏览器如何下载并安装Adobe Flash Player插件【AMD】
    UOS如何录制屏幕视频
    UOS怎么进入到单用户模式
    UOS免密访问windows共享文件夹
    UOS命令行服务器离线授权码激活步骤
    UOS怎么安装搜狗拼音输入法【x86】
    UOS如何安装RTX客户端-更新版(X86)
  • 原文地址:https://www.cnblogs.com/cheng662540/p/8697565.html
Copyright © 2011-2022 走看看