zoukankan      html  css  js  c++  java
  • redis 实战

    redis:

    redis 是基于内存的k-v 数据库,类似的有memcached,但是memcached 只支持 string,

    mongodb 是基于硬盘的文档型数据库

    Chapter01: redis 的五种数据类型 以及基本操作:

    1,字符串 string 

    常用操作:

    get

    set 

    del 

    2,链表 list  

    常用操作:

    lpush,rpush

    lpop,rpop

    lindex ,lrange

    3,无序集合 set

    常用操作:

    sadd

    srem 

    sismember

    4,散列(字典)hash

    常用操作:

    hset

    hgetall

    hget

    hdel 

    5,有序集合 zset 

    常用操作:

    zadd

    zrange

    zrangebyscore

    zrem

    #集合是  add ,rem 

    chapter1: 文章进行新增,投票,分组(Python 版):

    参考代码:https://github.com/josiahcarlson/redis-in-action/blob/a5d1aa5c7506ffc7c3557b1c1543ed3029b84566/python/ch01_listing_source.py#L241

    import time
    import pprint
    import unittest
    import redis
    
    ONE_WEEK_IN_SECONDS = 7 * 86400
    VOTE_SCORE = 432
    
    # 发布一篇文章
    def post_article(conn:redis.client.Redis,user,title,link):
        # generate an article_id
        article_id = str(conn.incr("article:")) # eg 1
    
        voted = 'voted:'+article_id # eg voted:1
    
        conn.sadd(voted,user) # 把发布者 添加到投票的用户中
        conn.expire(voted,ONE_WEEK_IN_SECONDS) # 一周后 自动删除voted 集合
    
        now = time.time()
        article = 'article:'+article_id  # eg article:1
    
        # 初始化 文章内容
        conn.hmset(article,{
            'title':title,
            'link':link,
            'poster':user, # 作者
            'time':now, # float
            'votes':1, # 投票数 作者默认投一票
        })
    
        # 初始化 评分 和 发布时间
        conn.zadd("score:",{article:now+VOTE_SCORE})
        conn.zadd("time:",{article: now})
    
        return article_id
    
    # 给某篇文章 投票
    def vote_article(conn:redis.client.Redis,user,article):
        # 先看下是否 过期
        cutoff = time.time() - ONE_WEEK_IN_SECONDS
        if cutoff > conn.zscore("time:",article): # 获取zset 中time: 元素对应的值
            # 过期
            return
    
        article_id = article.partition(":")[-1]
        # 将user 增加到 voted: 表中,以及增加 zset 中的score 分  和 增加hash 中文章信息的 votes
        if conn.sadd("voted:"+article_id,user):
            conn.zincrby("score:",VOTE_SCORE,article)
            conn.hincrby(article,"votes",1)
    
    ARTICLES_PER_PAGE = 25 # 每页25 篇文章
    
    # 获取每页的 文章的具体数据  # 按照 score 排序
    def get_articles(conn,page,order='score:'):
        start = (page - 1) *ARTICLES_PER_PAGE
        end = start + ARTICLES_PER_PAGE - 1
    
        ret = conn.zrevrange(order,start,end) # 返回的是个列表 无score
        articles = []
        for item in ret:
            article_data = conn.hgetall(item)
            article_data["id"] = item
            articles.append(article_data)
        return articles
    
    # 将文章 增加到组或者 从组中删除      一篇文章可以属于多个组  群组的数据放在 集合中
    def add_remove_groups(conn,article_id,to_add,to_remove):
        article = "article:" + article_id
        for group in to_add:
            conn.sadd("group:"+group,article)
    
        for group in to_remove:
            conn.srem("group:"+group,article)
    
    # 从组中 得到每页的 文章信息   # 按照 score 排序
    def get_group_articles(conn,group,page,order='score:'):
        key = order + group  # key  存放zset 的新的数据
        if not conn.exists(key):
            # group set  +  order zset --->  zset
            conn.zinterstore(key,
                             ['group:'+group,order],
                             aggregate='max',
                             )
            conn.expire(key,60) # 60s  结合一次,减轻压力
    
        return get_articles(conn,page,key)
    
    
    class TestCh01(unittest.TestCase):
        def setUp(self):
            import redis
            self.conn = redis.Redis(db=15)
    
        # 测试 发布文章
        def test_post_article(self):
            conn = self.conn
            import pprint
            article_id = str(post_article(conn,"tom","A Book","http://127.0.0.1"))
            print("新建了并初始化了一篇文章 文章Id: ",article_id)
    
            # 获取文章的详情 它存储在redis 的 hash 中
            ret = conn.hgetall("article"+article_id) # Python中会自动转为 dict
            # print(ret,type(ret))
    
    
        def test_vote_article(self):
            conn = self.conn
            vote_article(conn,"alex","article:1")
            # 获取article:1 的votes 数
            votes = conn.hget("article:1","votes")
            # print(votes)
    
        def test_get_articles(self):
            conn = self.conn
            articles = get_articles(conn,1)
    
            pprint.pprint(articles)
    
        def test_add_remove_groups(self):
            conn = self.conn
            add_remove_groups(conn,"1",["编程语言","数据库"],[]) # 增加 article:1 到 连个组中
    
            ret = conn.smembers("group:编程语言")
            print(ret)
    
        def test_get_group_articles(self):
            conn = self.conn
            articles = get_group_articles(conn,"编程语言",1)
            pprint.pprint(articles)
    
    if __name__ == '__main__':
        unittest.main()
    View Code
  • 相关阅读:
    对象和接口简单比较
    DevExpress报表开发基本流程
    有关ExecuteNonQuery返回值的分析
    2012年度计划
    小测试:有关++i&&i++,你是不是看晕了
    “PE文件格式”1.9版 完整译文
    .NET中的入口及幕后英雄:MSCorEE.dll(转)
    软件构建过程中的隐喻
    转:地图导出格式,教你如何选择
    推荐几个网站
  • 原文地址:https://www.cnblogs.com/zach0812/p/13036087.html
Copyright © 2011-2022 走看看