redis:
redis 是基于内存的k-v 数据库,类似的有memcached,但是memcached 只支持 string,
mongodb 是基于硬盘的文档型数据库
Chapter01: redis 的五种数据类型 以及基本操作:
1,字符串 string
常用操作:
get
set
del
2,链表 list
常用操作:
lpush,rpush
lpop,rpop
lindex ,lrange
3,无序集合 set
常用操作:
sadd
srem
sismember
4,散列(字典)hash
常用操作:
hset
hgetall
hget
hdel
5,有序集合 zset
常用操作:
zadd
zrange
zrangebyscore
zrem
#集合是 add ,rem
chapter1: 文章进行新增,投票,分组(Python 版):
import time import pprint import unittest import redis ONE_WEEK_IN_SECONDS = 7 * 86400 VOTE_SCORE = 432 # 发布一篇文章 def post_article(conn:redis.client.Redis,user,title,link): # generate an article_id article_id = str(conn.incr("article:")) # eg 1 voted = 'voted:'+article_id # eg voted:1 conn.sadd(voted,user) # 把发布者 添加到投票的用户中 conn.expire(voted,ONE_WEEK_IN_SECONDS) # 一周后 自动删除voted 集合 now = time.time() article = 'article:'+article_id # eg article:1 # 初始化 文章内容 conn.hmset(article,{ 'title':title, 'link':link, 'poster':user, # 作者 'time':now, # float 'votes':1, # 投票数 作者默认投一票 }) # 初始化 评分 和 发布时间 conn.zadd("score:",{article:now+VOTE_SCORE}) conn.zadd("time:",{article: now}) return article_id # 给某篇文章 投票 def vote_article(conn:redis.client.Redis,user,article): # 先看下是否 过期 cutoff = time.time() - ONE_WEEK_IN_SECONDS if cutoff > conn.zscore("time:",article): # 获取zset 中time: 元素对应的值 # 过期 return article_id = article.partition(":")[-1] # 将user 增加到 voted: 表中,以及增加 zset 中的score 分 和 增加hash 中文章信息的 votes if conn.sadd("voted:"+article_id,user): conn.zincrby("score:",VOTE_SCORE,article) conn.hincrby(article,"votes",1) ARTICLES_PER_PAGE = 25 # 每页25 篇文章 # 获取每页的 文章的具体数据 # 按照 score 排序 def get_articles(conn,page,order='score:'): start = (page - 1) *ARTICLES_PER_PAGE end = start + ARTICLES_PER_PAGE - 1 ret = conn.zrevrange(order,start,end) # 返回的是个列表 无score articles = [] for item in ret: article_data = conn.hgetall(item) article_data["id"] = item articles.append(article_data) return articles # 将文章 增加到组或者 从组中删除 一篇文章可以属于多个组 群组的数据放在 集合中 def add_remove_groups(conn,article_id,to_add,to_remove): article = "article:" + article_id for group in to_add: conn.sadd("group:"+group,article) for group in to_remove: conn.srem("group:"+group,article) # 从组中 得到每页的 文章信息 # 按照 score 排序 def get_group_articles(conn,group,page,order='score:'): key = order + group # key 存放zset 的新的数据 if not conn.exists(key): # group set + order zset ---> zset conn.zinterstore(key, ['group:'+group,order], aggregate='max', ) conn.expire(key,60) # 60s 结合一次,减轻压力 return get_articles(conn,page,key) class TestCh01(unittest.TestCase): def setUp(self): import redis self.conn = redis.Redis(db=15) # 测试 发布文章 def test_post_article(self): conn = self.conn import pprint article_id = str(post_article(conn,"tom","A Book","http://127.0.0.1")) print("新建了并初始化了一篇文章 文章Id: ",article_id) # 获取文章的详情 它存储在redis 的 hash 中 ret = conn.hgetall("article"+article_id) # Python中会自动转为 dict # print(ret,type(ret)) def test_vote_article(self): conn = self.conn vote_article(conn,"alex","article:1") # 获取article:1 的votes 数 votes = conn.hget("article:1","votes") # print(votes) def test_get_articles(self): conn = self.conn articles = get_articles(conn,1) pprint.pprint(articles) def test_add_remove_groups(self): conn = self.conn add_remove_groups(conn,"1",["编程语言","数据库"],[]) # 增加 article:1 到 连个组中 ret = conn.smembers("group:编程语言") print(ret) def test_get_group_articles(self): conn = self.conn articles = get_group_articles(conn,"编程语言",1) pprint.pprint(articles) if __name__ == '__main__': unittest.main()