Redis入门指南
人民邮电出版社
北京
图书在版编目(CIP)数据
Redis入门指南 / 李子骅 编著. --北京:人民邮电出版社,2013.4
ISBN 978-7-115-31294-5
Ⅰ. ①R… Ⅱ. ①李… Ⅲ. ①数据库—基本知识 Ⅳ.①TP311.13
中国版本图书馆CIP数据核字(2013)第050956号
内容提要
本书是一本Redis的入门指导书籍,以通俗易懂的方式介绍了Redis基础与实践方面的知识,包括历史与特性、在开发和生产环境中部署运行Redis、数据类型与命令、使用Redis实现队列、事务、复制、管道、持久化、优化Redis存储空间等内容,并采用任务驱动的方式介绍了PHP、Ruby、Python和Node.js这4种语言的Redis客户端库的使用方法。
本书的目标读者不仅包括Redis的新手,还包括那些己经掌握Redis使用方法的人。对于新手而言,本书的内容由浅入深且紧贴实践,旨在让读者真正能够即学即用;对于己经了解Redis的读者,通过本书的大量实例以及细节介绍,也能发现很多新的技巧。
Redis入门指南
◆编著 李子骅
责任编辑 杨海玲
◆人民邮电出版社出版发行 北京市崇文区夕照寺街14号
邮编 100061 电子邮件 315@ptpress.com.cn
网址 http://www.ptpress.com.cn
北京鑫正大印刷有限公司印刷
◆开本:800×1000 1/16
印张:12
字数:253千字 2013年5月第1版
印数:1-3500册 2013年5月北京第1次印刷
ISBN 978-7-115-31294-5
定价:39.00元
读者服务热线:(010)67132692 印装质量热线:(010)67129223
反盜版热线:(010)67171154
Redis如今已经成为Web开发社区中最火热的内存数据库之一,而它的诞生距现在不过才4年。随着Web 2.0的蓬勃发展,网站数据快速增长,对高性能读写的需求也越来越多,再加上半结构化的数据比重逐渐变大,人们对早已被铺天盖地地运用着的关系数据库能否适应现今的存储需求产生了疑问。而Redis的迅猛发展,为这个领域注入了全新的思维。
Redis 凭借其全面的功能得到越来越多的公司的青睐,从初创企业到新浪微博这样拥有着几百台Redis服务器的大公司,都能看到Redis的身影。Redis也是一个名副其实的多面手,无论是存储、队列还是缓存系统,都有它的用武之地。
本书将从Redis的历史讲起,结合基础与实践,带领读者一步步进入Redis的世界。
目标读者
本书假定读者是Redis的新手,甚至可能连Redis是什么都没听说过。本书将会详细地介绍Redis是什么以及为什么要使用Redis,旨在能够让读者从零开始逐步晋升为一个优秀的Redis开发者。
本书还包含了很多Redis实践方面的知识,对于有经验的Redis开发者,大可以直接跳过已经掌握的内容,只阅读感兴趣的部分。每章的引言都简要介绍了本章要讲解的内容,供读者参考。
本书并不需要读者有任何Redis的背景知识,不过如果读者有Web后端开发经验或Linux系统使用经验,阅读本书将会更加得心应手。
组织结构
第1章介绍了Redis的历史与特性,主要回答两个初学者最关心的问题,即Redis是什么和为什么要使用Redis。
第2章讲解了如何安装和运行Redis。如果你身旁的计算机没有运行Redis,那么一定不要错过这一章,因为本书后面的部分都需要读者最好能一边阅读一边实践,以提高学习效率。本章中还会介绍Redis命令行客户端的使用方法等基础知识,这些都是实践前需要掌握的知识。
第3章介绍了Redis的数据类型。本章讲解的不仅是每个数据类型的介绍和命令的格式,还会着重讲解每个数据类型分别在实践中如何使用。整个第3章会带领读者从零开始,一步步地使用 Redis构建一个博客系统,旨在帮助读者在学习完本章的内容之后可以直接在自己的项目中上手实践Redis。
第4章引入了一些Redis的进阶知识,比如事务和消息系统等。同样本章还会继续以博客系统为例子,以实践驱动学习。
第5章介绍了如何在各个编程语言中使用 Redis,这些语言包括 PHP、Ruby、Python 和Node.js。其中讲解每种语言时最后都会以一个有趣的例子作为演示,即使你不了解某些语言,阅读这些例子也能让你收获颇丰。
第6章展示了Redis脚本的强大功能。本章会向读者讲解如何借助脚本来扩展Redis,并且会对脚本一些需要注意的地方(如沙盒、随机结果等)进行着重介绍。
第7章介绍了运维方面的知识,包括持久化、复制等,并向读者推荐了几个第三方的Redis管理工具。
附录A收录了Redis命令的不同属性,以及属性的特征。
附录B收录了Redis部分配置参数的章节索引。
排版约定
本书排版使用字体遵从以下约定。
●等宽字:表示在命令行中输入的命令以及返回结果、程序代码、Redis的命令(包括命令语句和命令定义)。
●等宽斜体字(或夹在其中的中文楷体字):表示命令或程序代码中由读者自行替换的参数或变量。
●等宽粗体字:表示命令行中用户的输入内容、伪代码中的Redis命令。
●命令行的输入或输出以如下格式显示:
redis-cli PING
PONG
●程序代码以如下格式显示:
var redis=require("redis");
var client=redis.createClient();
//将两个对象JSON 序列化后存入数据库中
client.mset(
'user:1',JSON.stringify(bob),
'user:2',JSON.stringify(jeff)
);
代码约定
本书的部分章节采用了伪代码来讲解,这种伪代码类似Ruby和PHP,如:
def hsetnx(key,
field,
value)
isExists=HEXISTS
key,
field
if isExists is 0
HSET key,
field,
value
return 1
else
return 0
其中变量使用符号标识,Redis命令使用的粗体表示并省略了括号以便于阅读。在命令调用和print等语句中没有
符号的字符串会被当做字符串字面值。
附加文件
本书第5章中每一节都包含了一个完整的程序,通常来讲读者最好自己输入这些代码来加深理解,当然如果要先看到程序的运行结果再开始学习也不失为一个好办法。
这些程序代码都存放在GitHub上(https://github.com/luin/redis-book-assets),可以在GitHub上查看与下载。
致谢
在本书写作的过程中,得到了很多朋友的帮助。请允许我在这里占用少许篇幅,向他们致以诚挚的谢意。
感谢人民邮电出版社的杨海玲老师对本书的支持,没有她的悉心指导,本书就无法顺利完成。
感谢刘亚晨、李欣越、寇祖阳和余尧,他们承担了许多额外的工作,使得我可以全身心地投入到写作中。
感谢所有浏览本书并提供意见和建议的人们:张沈鹏、陈硕实、刘其帅、扈煊、李其超、朱沖宇、王诗吟、黄山月、刘昕、韩重远、李申申、杨海朝、田琪等。感谢你们的支持。
另外还要感谢“宋老师”,是的,就是书中的主人公之一。几年前,我刚创业时,办公场所是和某个教育机构合租的,宋老师是该机构的一名英语老师,同时他也是国内一个知名的嘻哈乐团成员之一。他平日风趣的谈吐带给了我们很多欢乐,伴随我们走过了艰苦的创业初期,而我接触Redis,也正是从这段时间开始的。
最后,感谢我的父母和女朋友马梦妍,你们是我生命中最重要的人,感谢你们的理解和支持。
没过几天,小白就完成了博客的开发并将其部署上线。之后的一段时间,小白又使用Redis开发了几个程序,用得还算顺手,便没有继续向宋老师请教Redis的更多知识。直到一个月后的一天,宋老师偶然访问了小白的博客……
本章将会带领读者继续探索Redis,了解Redis的事务、排序与管道等功能,并且还会详细地介绍如何优化Redis的存储空间。
傍晚时候,忙完了一天的教学工作,宋老师坐在办公室的电脑前开始为明天的课程做准备。尽管有着近5年的教学经验,可是宋老师依然习惯在备课时写一份简单的教案。正在网上查找资料时,在浏览器的历史记录里他突然看到了小白的博客。心想:不知道他的博客怎么样了?
于是宋老师点进了小白的博客,页面刚载入完他就被博客最下面的一行大得夸张的文字吸引了:“Powered by Redis”。宋老师笑了笑,接着就看到了小白博客中最新的一篇文章:
标题:使用Redis来存储微博中的用户关系
正文:在微博中,用户之间是“关注”和“被关注”的关系。如果要使用Redis存储这样的关系可以使用集合类型。思路是对每个用户使用两个集合类型键,分别名为user:用户ID:followers和user:用户ID:following,用来存储关注该用户的用户集合和该用户关注的用户集合。
然后使用一个函数来实现关注操作,伪代码如下:
def follow(currentUser,
targetUser)
SADD user:currentUser:following,
targetUser
SADD user:targetUser:followers,
currentUser
如ID为1的用户A想关注ID为2的用户B,只需要执行follow(1, 2)即可。
然而在实现该功能的时候我发现了一个问题:完成关注操作需要依次执行两条Redis命令,如果在第一条命令执行完后因为某种原因导致第二条命令没有执行,就会出现一个奇怪的现象:A查看自己关注的用户列表时会发现其中有B,而B查看关注自己的用户列表时却没有A,换句话说就是,A虽然关注了B,却不是B的“粉丝”。真糟糕A和B都会对这个网站失望的!但愿不会出现这种情况。
宋老师看到此处,笑得合不拢嘴,把备课的事拋到了脑后。心想:“看来有必要给小白传授一些进阶的知识。”他给小白写了封电子邮件:
其实可以使用Redis的事务来解决这一问题。
Redis中的事务(transaction)是一组命令的集合。事务同命令一样都是Redis的最小执行单位,一个事务中的命令要么都执行,要么都不执行。事务的应用非常普遍,如银行转账过程中A给B汇款,首先系统从A的账户中将钱划走,然后向B的账户增加相应的金额。这两个步骤必须属于同一个事务,要么全执行,要么全不执行。否则只执行第一步,钱就凭空消失了,这显然让人无法接受。
事务的原理是先将属于一个事务的命令发送给Redis,然后再让Redis依次执行这些命令。例如:
redis>MULTI
OK
redis>SADD "user:1:following" 2
QUEUED
redis>SADD "user:2:followers" 1
QUEUED
redis>EXEC
1) (integer) 1
2) (integer) 1
上面的代码演示了事务的使用方式。首先使用MULTI命令告诉Redis:“下面我发给你的命令属于同一个事务,你先不要执行,而是把它们暂时存起来。”Redis回答:“OK。”
而后我们发送了两个SADD命令来实现关注和被关注操作,可以看到Redis遵守了承诺,没有执行这些命令,而是返回QUEUED表示这两条命令已经进入等待执行的事务队列中了。
当把所有要在同一个事务中执行的命令都发给Redis后,我们使用EXEC命令告诉Redis将等待执行的事务队列中的所有命令(即刚才所有返回QUEUED的命令)按照发送顺序依次执行。EXEC命令的返回值就是这些命令的返回值组成的列表,返回值顺序和命令的顺序相同。
Redis保证一个事务中的所有命令要么都执行,要么都不执行。如果在发送EXEC命令前客户端断线了,则Redis会清空事务队列,事务中的所有命令都不会执行。而一旦客户端发送了EXEC命令,所有的命令就都会被执行,即使此后客户端断线也没关系,因为Redis中已经记录了所有要执行的命令。
除此之外,Redis的事务还能保证一个事务内的命令依次执行而不被其他命令插入。试想客户端A需要执行几条命令,同时客户端B发送了一条命令,如果不使用事务,则客户端B的命令可能会插入到客户端A的几条命令中执行。如果不希望发生这种情况,也可以使用事务。
有些读者会有疑问,如果一个事务中的某个命令执行出错,Redis会怎样处理呢?要回答这个问题,首先需要知道什么原因会导致命令执行出错。
(1)语法错误。语法错误指命令不存在或者命令参数的个数不对。比如:
redis>MULTI
OK
redis>SET key value
QUEUED
redis>SET key
(error)ERR wrong number of arguments for 'set' command
redis> ERRORCOMMAND key
(error) ERR unknown command 'ERRORCOMMAND'
redis> EXEC
(error) EXECABORT Transaction discarded because of previous errors.
跟在MULTI命令后执行了3个命令:一个是正确的命令,成功地加入事务队列;其余两个命令都有语法错误。而只要有一个命令有语法错误,执行EXEC命令后Redis就会直接返回错误,连语法正确的命令也不会执行① 。
注释:①Redis 2.6.5之前的版本会忽略有语法错误的命令,然后执行事务中其他语法正确的命令。就此例而言,SET key value会被执行,EXEC命令会返回一个结果:1) OK。
(2)运行错误。运行错误指在命令执行时出现的错误,比如使用散列类型的命令操作集合类型的键,这种错误在实际执行之前Redis是无法发现的,所以在事务里这样的命令是会被Redis接受并执行的。如果事务里的一条命令出现了运行错误,事务里其他的命令依然会继续执行(包括出错命令之后的命令),示例如下:
redis>MULTI
OK
redis>SET key 1
QUEUED
redis>SADD key 2
QUEUED
redis>SET key 3
QUEUED
redis>EXEC
1) OK
2) (error) ERR Operation against a key holding the wrong kind of value
3) OK
redis>GET key
"3"
可见虽然SADD key 2出现了错误,但是SET key 3依然执行了。
Redis的事务没有关系数据库事务提供的回滚(rollback)① 功能。为此开发者必须在事务执行出错后自己收拾剩下的摊子(将数据库复原回事务执行前的状态等)。
注释:①事务回滚是指将一个事务已经完成的对数据库的修改操作撤销。
不过由于Redis不支持回滚功能,也使得Redis在事务上可以保持简洁和快速。另外回顾刚才提到的会导致事务执行失败的两种错误,其中语法错误完全可以在开发时找出并解决,另外如果能够很好地规划数据库(保证键名规范等)的使用,是不会出现如命令与数据类型不匹配这样的运行错误的。
我们已经知道在一个事务中只有当所有命令都依次执行完后才能得到每个结果的返回值,可是有些情况下需要先获得一条命令的返回值,然后再根据这个值执行下一条命令。例如,介绍INCR命令时曾经说过使用GET和SET命令自己实现incr函数会出现竞态条件,伪代码如下:
def incr(key)
value=GET
key
if not value
value=0
value=
value+1
SET key,
value
return value
肯定会有很多读者想到可以用事务来实现incr函数以防止竞态条件,可是因为事务中的每个命令的执行结果都是最后一起返回的,所以无法将前一条命令的结果作为下一条命令的参数,即在执行SET命令时无法获得GET命令的返回值,也就无法做到增1的功能了。
为了解决这个问题,我们需要换一种思路。即在GET获得键值后保证该键值不被其他客户端修改,直到函数执行完成后才允许其他客户端修改该键键值,这样也可以防止竞态条件。要实现这一思路需要请出事务家族的另一位成员:WATCH。WATCH命令可以监控一个或多个键,一旦其中有一个键被修改(或删除),之后的事务就不会执行。监控一直持续到EXEC命令(事务中的命令是在EXEC之后才执行的,所以在MULTI命令后可以修改WATCH监控的键值),如:
redis>SET key 1
OK
redis>WATCH key
OK
redis>SET key 2
OK
redis>MULTI
OK
redis>SET key 3
QUEUED
redis>EXEC
(nil)
redis>GET key
"2"
上例中在执行WATCH命令后、事务执行前修改了key的值(即SET key 2),所以最后事务中的命令SET key 3没有执行,EXEC命令返回空结果。
学会了WATCH命令就可以通过事务自己实现incr函数了,伪代码如下:
def incr(key)
WATCH key
value=GET
key
if not value
value=0
value=
value+1
MULTI
SET key,
value
result=EXEC
return result[0]
因为EXEC命令返回值是多行字符串类型,所以代码中使用result[0]来获得其中第一个结果。
提示 由于WATCH命令的作用只是当被监控的键值被修改后阻止之后一个事务的执行,而不能保证其他客户端不修改这一键值,所以我们需要在EXEC执行失败后重新执行整个函数。
执行EXEC命令后会取消对所有键的监控,如果不想执行事务中的命令也可以使用UNWATCH命令来取消监控。比如,我们要实现hsetxx函数,作用与HSETNX命令类似,只不过是仅当字段存在时才赋值。为了避免竞态条件我们使用事务来完成这一功能:
def hsetxx(key,
field,
value)
WATCH key
isFieldExists = HEXISTS
key,
field
if isFieldExists is 1
MULTI
HSET key,
field,
value
EXEC
else
UNWATCH
return isFieldExists
在代码中会判断要赋值的字段是否存在,如果字段不存在的话就不执行事务中的命令,但需要使用UNWATCH命令来保证下一个事务的执行不会受到影响。
转天早上宋老师就收到了小白的回信,内容基本上都是一些表示感谢的话。宋老师又看了一下小白发的那篇文章,发现他已经在文末补充了使用事务来解决竞态条件的方法。
宋老师单击了评论链接想发表评论,却看到博客出现了错误“请求超时”(Request timeout)。宋老师疑惑了一下,准备稍后再访问看看,就接着忙别的事情了。
没过一会儿,宋老师就收到了一封小白发来的邮件:
宋老师您好!我的博客最近经常无法访问,我看了日志后发现是因为某个搜索引擎爬虫访问得太频繁,加上本来我的服务器性能就不太好,很容易资源就被占满了。请问有没有方法可以限定每个IP地址每分钟最大的访问次数呢?
宋老师这才明白为什么刚才小白的博客请求超时了,于是放下了手头的事情开始继续给小白介绍Redis的更多功能……
在实际的开发中经常会遇到一些有时效的数据,比如限时优惠活动、缓存或验证码等,过了一定的时间就需要删除这些数据。在关系数据库中一般需要额外的一个字段记录到期时间,然后定期检测删除过期数据。而在Redis中可以使用EXPIRE命令设置一个键的生存时间,到时间后Redis会自动删除它。
EXPIRE命令的使用方法为EXPIRE key seconds,其中seconds参数表示键的生存时间,单位是秒。如要想让session:29e3d键在15分钟后被删除:
redis>SET session:29e3d uid1314
OK
redis>EXPIRE session:29e3d 900
(integer) 1
EXPIRE命令返回1表示设置成功,返回0则表示键不存在或设置失败。例如:
redis>DEL session:29e3d
(integer) 1
redis>EXPIRE session:29e3d 900
(integer) 0
如果想知道一个键还有多久的时间会被删除,可以使用TTL命令。返回值是键的剩余时间(单位是秒):
redis>SET foo bar
OK
redis>EXPIRE foo 20
(integer) 1
redis>TTL foo
(integer) 15
redis>TTL foo
(integer) 7
redis> TTL foo
(integer) -1
可见随着时间的不同,foo键的生存时间逐渐减少,20秒后foo键会被删除。当键不存在时TTL命令会返回1。另外同样会返回1的情况是没有为键设置生存时间(即永久存在,这是建立一个键后的默认情况):
redis>SET persistKey value
OK
redis>TTL persistKey
(integer) -1
如果想取消键的生存时间设置(即将键恢复成永久的),可以使用PERSIST命令。如果生存时间被成功清除则返回1;否则返回0(因为键不存在或键本来就是永久的):
redis>SET foo bar
OK
redis>EXPIRE foo 20
(integer) 1
redis>PERSIST foo
(integer) 1
redis>TTL foo
(integer) -1
除了PERSIST命令之外,使用SET或GETSET命令为键赋值也会同时清除键的生存时间,例如:
redis>EXPIRE foo 20
(integer) 1
redis>SET foo bar
OK
redis>TTL foo
(integer) -1
使用EXPIRE命令会重新设置键的生存时间,就像这样:
redis>SET foo bar
OK
redis>EXPIRE foo 20
(integer) 1
redis>TTL foo
(integer) 15
redis>EXPIRE foo 20
(integer) 1
redis>TTL foo
(integer) 17
其他只对键值进行操作的命令(如INCR、LPUSH、HSET、ZREM)均不会影响键的生存时间。
EXPIRE命令的seconds参数必须是整数,所以最小单位是1秒。如果想要更精确的控制键的生存时间应该使用PEXPIRE命令,PEXPIRE命令与EXPIRE的唯一区别是前者的时间单位是毫秒,即PEXPIRE key 1000与EXPIRE key 1等价。对应地可以用PTTL命令以毫秒为单位返回键的剩余时间。
提示 如果使用WATCH命令监测了一个拥有生存时间的键,该键时间到期自动删除并不会被WATCH命令认为该键被改变。
另外还有两个相对不太常用的命令:EXPIREAT 和PEXPIREAT。
EXPIREAT命令与EXPIRE命令的差别在于前者使用Unix时间作为第二个参数表示键的生存时间的截止时间。PEXPIREAT命令与EXPIREAT命令的区别是前者的时间单位是毫秒。例如:
redis>SET foo bar
OK
redis>EXPIREAT foo 1351858600
(integer) 1
redis>TTL foo
(integer) 142
redis>PEXPIREAT foo 1351858700000
(integer) 1
times=INCR rate.limiting:
IP
回到小白的问题,为了减轻服务器的压力,需要限制每个用户(以IP计)一段时间的最大访问量。与时间有关的操作很容易想到EXPIRE命令。
例如要限制每分钟每个用户最多只能访问100个页面,思路是对每个用户使用一个名为“rate.limiting:用户IP”的字符串类型键,每次用户访问则使用INCR命令递增该键的键值,如果递增后的值是1(第一次访问页面),则同时还要设置该键的生存时间为1分钟。这样每次用户访问页面时都读取该键的键值,如果超过了100就表明该用户的访问频率超过了限制,需要提示用户稍后访问。该键每分钟会自动被删除,所以下一分钟用户的访问次数又会重新计算,也就达到了限制访问频率的目的。
上述流程的伪代码如下:
isKeyExists=EXISTS rate.limiting:
IP
if isKeyExists is 1
if times>100
print访问频率超过了限制,请稍后再试。
exit
else
INCR rate.limiting:IP
EXPIRE keyName, 60
这段代码存在一个不太明显的问题:假如程序执行完倒数第二行后突然因为某种原因退出了,没能够为该键设置生存时间,那么该键会永久存在,导致使用对应的IP的用户在管理员手动删除该键前最多只能访问100次博客,这是一个很严重的问题。
为了保证建立键和为键设置生存时间一起执行,可以使用上节学习的事务功能,修改后的代码如下:
isKeyExists=EXISTS rate.limiting:
IP
if isKeyExists is 1
times=INCR rate.limiting:
IP
if times>100
print访问频率超过了限制,请稍后再试。
exit
else
MULTI
INCR rate.limiting:IP
EXPIRE keyName, 60
EXEC
事实上,4.2.2节中的代码仍然有个问题:如果一个用户在一分钟的第一秒访问了一次博客,在同一分钟的最后一秒访问了9次,又在下一分钟的第一秒访问了10次,这样的访问是可以通过现在的访问频率限制的,但实际上该用户在2秒钟内访问了19次博客,这与每个用户每分钟只能访问10次的限制差距较大。尽管这种情况比较极端,但是在一些场合中还是需要粒度更小的控制方案。如果要精确地保证每分钟最多访问10次,需要记录下用户每次访问的时间。因此对每个用户,我们使用一个列表类型的键来记录他最近10次访问博客的时间。一旦键中的元素超过10个,就判断时间最早的元素距现在的时间是否小于1分钟。如果是则表示用户最近1分钟的访问次数超过了10次;如果不是就将现在的时间加入到列表中,同时把最早的元素删除。
上述流程的伪代码如下:
listLength=LLEN rate.limiting:
IP
if listLength<10
LPUSH rate.limiting:IP, now()
else
time=LINDEX rate.limiting:
IP, -1
if now()-time<60
print访问频率超过了限制,请稍后再试。
else
LPUSH rate.limiting:IP, now()
LTRIM rate.limiting:IP, 0, 9
代码中now()的功能是获得当前的Unix时间。由于需要记录每次访问的时间,所以当要限制“A 时间最多访问B次”时,如果“B”的数值较大,此方法会占用较多的存储空间,实际使用时还需要开发者自己去权衡。除此之外该方法也会出现竞态条件,同样可以通过脚本功能避免,具体在第6章会介绍到。
为了提高网站的负载能力,常常需要将一些访问频率较高但是对CPU或IO资源消耗较大的操作的结果缓存起来,并希望让这些缓存过一段时间自动过期。比如教务网站要对全校所有学生的各个科目的成绩汇总排名,并在首页上显示前10名的学生姓名,由于计算过程较耗资源,所以可以将结果使用一个Redis的字符串键缓存起来。由于学生成绩总在不断地变化,需要每隔两个小时就重新计算一次排名,这可以通过给键设置生存时间的方式实现。每次用户访问首页时程序先查询缓存键是否存在,如果存在则直接使用缓存的值;否则重新计算排名并将计算结果赋值给该键并同时设置该键的生存时间为两个小时。伪代码如下:
rank=GET cache:rank
if not rank
rank=计算排名...
MUlTI
SET cache:rank, rank
EXPIRE cache:rank, 7200
EXEC
然而在一些场合中这种方法并不能满足需要。当服务器内存有限时,如果大量地使用缓存键且生存时间设置得过长就会导致Redis占满内存;另一方面如果为了防止Redis占用内存过大而将缓存键的生存时间设得太短,就可能导致缓存命中率过低并且大量内存白白地闲置。实际开发中会发现很难为缓存键设置合理的生存时间,为此可以限制Redis能够使用的最大内存,并让Redis按照一定的规则淘汰不需要的缓存键,这种方式在只将Redis用作缓存系统时非常实用。
具体的设置方法为:修改配置文件的maxmemory参数,限制Redis最大可用内存大小(单位是字节),当超出了这个限制时Redis会依据maxmemory-policy参数指定的策略来删除不需要的键,直到Redis占用的内存小于指定内存。
maxmemory-policy支持的规则如表4-1所示。其中的LRU(Least Recently Used)算法即“最近最少使用”,其认为最近最少使用的键在未来一段时间内也不会被用到,即当需要空间时这些键是可以被删除的。
表4-1 Redis支持的淘汰键的规则
续表
如当maxmemory-policy设置为allkeys-lru时,一旦Redis占用的内存超过了限制值,Redis会不断地删除数据库中最近最少使用的键① ,直到占用的内存小于限制值。
注释:①事实上Redis并不会准确地将整个数据库中最久未被使用的键删除,而是每次从数据库中随机取3个键并删除这3个键中最久未被使用的键。删除生存时间最接近的键的实现方法也是这样。“3”这个数字可以通过Redis的配置文件中的maxmemory-samples参数设置。
午后,宋老师正在批改学生们提交的程序,再过几天就会迎来第一次计算机全市联考。他在每个学生的程序代码末尾都用注释详细地做了批注——严谨的治学态度让他备受学生们的爱戴。
一个电话打来。“小白的?”宋老师拿出手机,“博客最近怎么样了?”未及小白开口,他就抢先问道。
特别好!现在平均每天都有50多人访问我的博客。不过咋天我收到一个访客的邮件,他向我反映了一个问题:查看一个标签下的文章列表时文章不是按照时间顺序排列的,找起来很麻烦。我看了一下代码,发现程序中是使用SMEMBERS命令获取标签下的文章列表,因为集合类型是无序的,所以不能实现按照文章的发布时间排列。我考虑过使用有序集合类型存储标签,但是有序集合类型的集合操作不如集合类型强大。您有什么好方法来解决这个问题吗?
方法有很多,我推荐使用SORT命令,你先挂了电话,我写好后发邮件给你吧。
集合类型提供了强大的集合操作命令,但是如果需要排序就要用到有序集合类型。Redis的作者在设计Redis的命令时考虑到了不同数据类型的使用场景,对于不常用到的或者在不损失过多性能的前提下可以使用现有命令来实现的功能,Redis就不会单独提供命令来实现。这一原则使得Redis在拥有强大功能的同时保持着相对精简的命令。
有序集合常见的使用场景是大数据排序,如游戏的玩家排行榜,所以很少会需要获得键中的全部数据。同样Redis认为开发者在做完交集、并集运算后不需要直接获得全部结果,而是会希望将结果存入新的键中以便后续处理。这解释了为什么有序集合只有ZINTERSTORE和ZUNIONSTORE命令而没有ZINTER和ZUNION命令。
当然实际使用中确实会遇到像小白那样需要直接获得集合运算结果的情况,除了等待Redis加入相关命令,我们还可以使用MULTI, ZINTERSTORE, ZRANGE, DEL 和EXEC 这5个命令自己实现ZINTER:
MULTI
ZINTERSTORE tempKey ...
ZRANGE tempKey ...
DEL tempKey
EXEC
除了使用有序集合外,我们还可以借助Redis提供的SORT命令来解决小白的问题。SORT命令可以对列表类型、集合类型和有序集合类型键进行排序,并且可以完成与关系数据库中的连接查询相类似的任务。
小白的博客中标有“ruby”标签的文章的ID分别是:“2”,“6”,“12”,“26”。由于在集合类型中所有元素是无序的,所以使用SMEMBERS命令并不能获得有序的结果① 。为了能够让博客的标签页面下的文章也能按照发布的时间顺序排列(如果不考虑发布后再修改文章发布时间,就是按照文章ID的顺序排列),可以借助SORT命令实现,方法如下所示:
注释:①集合类型经常被用于存储对象的ID,很多情况下都是整数。所以Redis对这种情况进行了特殊的优化,元素的排列是有序的。4.6节会详细介绍具体的原理。
redis>SORT tag:ruby:posts
1) "2"
2) "6"
3) "12"
4) "26"
是不是十分简单?除了集合类型,SORT命令还可以对列表类型和有序集合类型进行排序:
redis>LPUSH mylist 4 2 6 1 3 7
(integer)6
redis>SORT mylist
1) "1"
2) "2"
3) "3"
4) "4"
5) "6"
6) "7"
在对有序集合类型排序时会忽略元素的分数,只针对元素自身的值进行排序。例如:
redis>ZADD myzset 50 2 40 3 20 1 60 5
(integer) 4
redis>SORT myzset
1) "1"
2) "2"
3) "3"
4) "5"
除了可以排列数字外,SORT命令还可以通过ALPHA参数实现按照字典顺序排列非数字元素,就像这样:
redis>LPUSH mylistalpha a c e d B C A
(integer) 7
redis>SORT mylistalpha
(error) ERR One or more scores can't be converted into double
redis>SORT mylistalpha ALPHA
1) "A"
2) "B"
3) "C"
4) "a"
5) "c"
6) "d"
7) "e"
从这段示例中可以看到如果没有加ALPHA参数的话,SORT命令会尝试将所有元素转换成双精度浮点数来比较,如果无法转换则会提示错误。
回到小白的问题,SORT命令默认是按照从小到大的顺序排列,而一般博客中显示文章的顺序都是按照时间倒序的,即最新的文章显示在最前面。SORT命令的 DESC参数可以实现将元素按照从大到小的顺序排列:
redis>SORT tag:ruby:posts DESC
1) "26"
2) "12"
3) "6"
4) "2"
那么如果文章数量过多需要分页显示呢?SORT命令还支持LIMIT参数来返回指定范围的结果。用法和SQL语句一样,LIMIT offset count,表示跳过前offset个元素并获取之后的count个元素。
SORT命令的参数可以组合使用,像这样:
redis>SORT tag:ruby:posts DESC LIMIT 1 2
1) "12"
2) "6"
很多情况下列表(或集合、有序集合)中存储的元素值代表的是对象的ID(如标签集合中存储的是文章对象的ID),单纯对这些ID自身排序有时意义并不大。更多的时候我们希望根据ID对应的对象的某个属性进行排序。回想3.6节,我们通过使用有序集合键来存储文章ID列表,使得小白的博客能够支持修改文章时间,所以文章ID的顺序和文章的发布时间的顺序并不完全一致,因此4.3.2节介绍的对文章ID本身排序就变得没有意义了。小白的博客是使用散列类型键存储文章对象的,其中time字段存储的就是文章的发布时间。现在我们知道ID为“2”,“6”,“12”和“26”的四篇文章的time字段的值分别为“1352619200”,“1352619600”,“1352620100”和“1352620000”(Unix时间)。如果要按照文章的发布时间递减排列结果应为“12”,“26”,“6”,“2”。为了获得这样的结果,需要使用SORT命令的另一个强大的参数——BY。
BY 参数的语法为“BY参考键”。其中参考键可以是字符串类型键或者是散列类型键的某个字段(表示为键名->字段名)。如果提供了BY参数,SORT命令将不再依据元素自身的值进行排序,而是对每个元素使用元素的值替换参考键中的第一个“*”并获取其值,然后依据该值对元素排序。就像这样:
redis>SORT tag:ruby:posts BY post:*->time DESC
1) "12"
2) "26"
3) "6"
4) "2"
在上例中SORT命令会读取post:2、post:6、post:12、post:26几个散列键中的time字段的值并以此决定tag:ruby:posts键中各个文章ID的顺序。
除了散列类型之外,参考键还可以是字符串类型,比如:
redis>LPUSH sortbylist 2 1 3
(integer) 3
redis>SET itemscore:1 50
OK
redis>SET itemscore:2 100
OK
redis>SET itemscore:3 -10
OK
redis>SORT sortbylist BY itemscore:* DESC
1) "2"
2) "1"
3) "3"
当参考键名不包含“*”时(即常量键名,与元素值无关),SORT命令将不会执行排序操作,因为Redis认为这种情况是没有意义的(因为所有要比较的值都一样)。例如:
redis>SORT sortbylist BY anytext
1) "3"
2) "1"
3) "2"
例子中anytext是常量键名(甚至anytext键可以不存在),此时SORT的结果与LRANGE的结果相同,没有执行排序操作。在不需要排序但需要借助SORT命令获得与元素相关联的数据时(见4.3.4节),常量键名是很有用的。
如果几个元素的参考键值相同,则SORT命令会再比较元素本身的值来决定元素的顺序。像这样:
redis>LPUSH sortbylist 4
(integer) 4
redis>SET itemscore:4 50
OK
redis>SORT sortbylist BY itemscore:* DESC
1) "2"
2) "4"
3) "1"
4) "3"
示例中元素“4”的参考键itemscore:4的值和元素“1”的参考键itemscore:1的值都是50,所以SORT命令会再比较“4”和“1”元素本身的大小来决定两者的顺序。
当某个元素的参考键不存在时,会默认参考键的值为0:
redis>LPUSH sortbylist 5
(integer) 5
redis>SORT sortbylist BY itemscore:* DESC
1) "2"
2) "4"
3) "1"
4) "5"
5) "3"
上例中“5”排在了“3”的前面,是因为“5”的参考键不存在,所以默认为0,而“3”的参考键值为10。
补充知识 参考键虽然支持散列类型,但是“*”只能在“->”符号前面(即键名部分)才有用,在“->”后(即字段名部分)会被当成字段名本身而不会作为占位符被元素的值替換,即常量键名。但是实际运行时会发现一个有趣的结果:
redis>SORT sortbylist BY somekey->somefield:*
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
上面提到了当参考键名是常量键名时SORT命令将不会执行排序操作,然而上例中确进行了排序,而且只是对元素本身进行排序。这是因为Redis判断参考键名是不是常量键名的方式是判断参考键名中是否包含“*”,而somekey->somefield:*中包含“*”所以不是常量键名。所以在排序的时候Redis对每个元素都会读取键somekey中的somefield:*字段(“*”不会被替換),无论能否获得其值,每个元素的参考键值是相同的,所以Redis会按照元素本身的大小排列。
现在小白的博客已经可以按照文章的发布顺序获得一个标签下的文章ID列表了,接下来要做的事就是对每个ID都使用HGET命令获取文章的标题以显示在博客列表页中。有没有觉得很麻烦?不论你的答案如何,都有一种更简单的方式来完成这个操作,那就是借助SORT命令的GET参数。
GET参数不影响排序,它的作用是使SORT命令的返回结果不再是元素自身的值,而是GET参数中指定的键值。GET参数的规则和BY参数一样,GET参数也支持字符串类型和散列类型的键,并使用“*”作为占位符。要实现在排序后直接返回ID对应的文章标题,可以这样写:
redis>SORT tag:ruby:posts BY post:*->time DESC GET post:*->title
1) "Windows 8 app designs"
2) "RethinkDB - An open-source distributed database built with love"
3) "Uses for cURL"
4) "The Nature of Ruby"
在一个SORT命令中可以有多个GET参数(而BY参数只能有一个),所以还可以这样用:
redis>SORT tag:ruby:posts BY post:*->time DESC GET post:*->title GET post:*->time
1) "Windows 8 app designs"
2) "1352620100"
3) "RethinkDB - An open-source distributed database built with love"
4) "1352620000"
5) "Uses for cURL"
6) "1352619600"
7) "The Nature of Ruby"
8) "1352619200"
可见有N个GET参数,每个元素返回的结果就有N行。这时有个问题:如果还需要返回文章ID该怎么办?答案是使用GET #。就像下面这样:
redis>SORT tag:ruby:posts BY post:*->time DESC GET post:*->title GET post:*->time GET #
1) "Windows 8 app designs"
2) "1352620100"
3) "12"
4) "RethinkDB - An open-source distributed database built with love"
5) "1352620000"
6) "26"
7) "Uses for cURL"
8) "1352619600"
9) "6"
10) "The Nature of Ruby"
11) "1352619200"
12) "2"
也就是说,GET #会返回元素本身的值。
默认情况下SORT会直接返回排序结果,如果希望保存排序结果,可以使用STORE参数。如希望把结果保存到sort.result键中:
redis>SORT tag:ruby:posts BY post:*->time DESC GET post:*->title GET post:*->time
GET # STORE sort.result
(integer) 12
redis>LRANGE sort.result 0 -1
1) "Windows 8 app designs"
2) "1352620100"
3) "12"
4) "RethinkDB - An open-source distributed database built with love"
5) "1352620000"
6) "26"
7) "Uses for cURL"
8) "1352619600"
9) "6"
10) "The Nature of Ruby"
11) "1352619200"
12) "2"
保存后的键的类型为列表类型,如果键已经存在则会覆盖它。加上STORE参数后SORT命令的返回值为结果的个数。
STORE参数常用来结合EXPIRE命令缓存排序结果,如下面的伪代码:
#判断是否存在之前排序结果的缓存
isCacheExists = EXISTS cache.sort
if isCacheExists is 1
#如果存在则直接返回
return LRANGE cache.sort, 0, -1
else
#如果不存在,则使用SORT命令排序并将结果存入cache.sort键中作为缓存
sortResult=SORT some.list STORE cache.sort
#设置缓存的生存时间为10分钟
EXPIRE cache.sort, 600
#返回排序结果
return sortResult
SORT是Redis中最强大最复杂的命令之一,如果使用不好很容易成为性能瓶颈。SORT命令的时间复杂度是0(n+mlogm),其中n表示要排序的列表(集合或有序集合)中的元素个数,m表示要返回的元素个数。当n较大的时候SORT命令的性能相对较低,并且Redis在排序前会建立一个长度为n① 的容器来存储待排序的元素,虽然是一个临时的过程,但如果同时进行较多的大数据量排序操作则会严重影响性能。
注释:①有一个例外是当键类型为有序集合且参考键为常量键名时容器大小为m而不是n。
所以开发中使用SORT命令时需要注意以下几点。
(1)尽可能减少待排序键中元素的数量(使n尽可能小)。
(2)使用LIMIT参数只获取需要的数据(使m尽可能小)。
(3)如果要排序的数据数量较大,尽可能使用STORE参数将结果缓存。
凭着小白的用心经营,博客的访问量逐渐增多,甚至有了小白自己的粉丝。这不,小白刚收到一封来自粉丝的邮件,在邮件中那个粉丝强烈建议小白给博客加入邮件订阅功能,这样当小白发布新文章后订阅小白博客的用户就可以收到通知邮件了。在信的末尾,那个粉丝还着重强调了一下:“这个功能对不习惯使用RSS的用户很重要,希望能够加上!”
看过信后,小白心想:“是个好建议!不过话说回来,似乎他还没发现其实我的博客连RSS功能都没有。”
邮件订阅功能太好实现了,无非是在博客首页放一个文本框供访客输入自己的邮箱地址,提交后博客会将该地址存入Redis的一个集合类型键中(使用集合类型是为了保证同一邮箱地址不会存储多个)。每当发布新文章时,就向收集到的邮箱地址发送通知邮件。
想的简单,可是做出来后小白却发现了一个问题:输入邮箱地址提交后,页面需要很久时间才能载入完。
原来小白为了确保用户没有输入他人的邮箱,在提交之后程序会向用户输入的邮箱发送一封包含确认链接的邮件,只有用户单击这个链接后对应的邮箱地址才会被程序记录。可是由于发送邮件需要连接到一个远程的邮件发送服务器,网络好的情况下也得花上2秒左右的时间,赶上网络不好10秒都必能发完。所以每次用户提交邮箱后页面都要等待程序发送完邮件才能加载出来,而加载出来的页面上显示的内容只是提示用户查看自己的邮箱单击确认链接。“完全可以等页面加载出来后再发送邮件,这样用户就不需要等了。”小白喃喃道。
按照惯例,有问题问宋老师,小白给宋老师发了一封邮件,不久就收到了答复。
小白的问题在网站开发中十分常见,当页面需要进行如发送邮件、复杂数据运算等耗时较长的操作时会阻塞页面的渲染。为了避免用户等待太久,应该使用独立的线程来完成这类操作。不过一些编程语言或框架不易实现多线程,这时很容易就会想到通过其他进程来实现。就小白的例子来说,设想有一个进程能够完成发邮件的功能,那么在页面中只需要想办法通知这个进程向指定的地址发送邮件就可以了。
通知的过程可以借助任务队列来实现。任务队列顾名思义,就是“传递任务的队列”。与任务队列进行交互的实体有两类,一类是生产者(producer),一类是消费者(consumer)。生产者会将需要处理的任务放入任务队列中,而消费者则不断地从任务队列中读入任务信息并执行。
对于发邮件这个操作来说页面程序就是生产者,而发邮件的进程就是消费者。当需要发送邮件时,页面程序会将收件地址、邮件主题和邮件正文组装成一个任务后存入任务队列中。同时发邮件的进程会不断检查任务队列,一旦发现有新的任务便会将其从队列中取出并执行。由此实现了进程间的通信。
使用任务队列有如下好处。
(1)松耦合。生产者和消费者无需知道彼此的实现细节,只需要约定好任务的描述格式。这使得生产者和消费者可以由不同的团队使用不同的编程语言编写。
(2)易于扩展消费者可以有多个,而且可以分布在不同的服务器中,如图4-1所示。借此可以轻易地降低单台服务器的负载。
图4-1 可以有多个消费者分配任务队列中的任务
说到队列很自然就能想到Redis的列表类型,3.4.2节介绍了使用LPUSH和RPOP命令实现队列的概念。如果要实现任务队列,只需要让生产者将任务使用LPUSH命令加入到某个键中,另一边让消费者不断地使用RPOP命令从该键中取出任务即可。
在小白的例子中,完成发邮件的任务需要知道收件地址、邮件主题和邮件正文。所以生产者需要将这三个信息组成对象并序列化成字符串,然后将其加入到任务队列中。而消费者则循环从队列中拉取任务,就像如下伪代码:
#无限循环读取任务队列中的内容
loop
task=RPOR queue
if task
#如果任务队列中有任务则执行它
execute(task)
else
#如果没有则等待1秒以免过于频繁地请求数据
wait 1 second
到此一个使用Redis实现的简单的任务队列就写好了。不过还有一点不完美的地方:当任务队列中没有任务时消费者每秒都会调用一次RPOP命令查看是否有新任务。如果可以实现一旦有新任务加入任务队列就通知消费者就好了。其实借助 BRPOP 命令就可以实现这样的需求。
BRPOP命令和RPOP命令相似,唯一的区别是当列表中没有元素时BRPOP命令会一直阻塞住连接,直到有新元素加入。如上段代码可改写为:
loop
#如果任务队列中没有新任务,BRPOP 命令会一直阻塞,不会执行execute()。
task=BRPOP queue, 0
#返回值是一个数组(见下介绍),数组第二个元素是我们需要的任务。
execute(task[1])
BRPOP命令接收两个参数,第一个是键名,第二个是超时时间,单位是秒。当超过了此时间仍然没有获得新元素的话就会返回nil。上例中超时时间为“0”,表示不限制等待的时间,即如果没有新元素加入列表就会永远阻塞下去。
当获得一个元素后BRPOP命令返回两个值,分别是键名和元素值。为了测试BRPOP命令,我们可以打开两个redis-cli实例,在实例A中:
redis A>BRPOP queue 0
键入回车后实例1会处于阻塞状态,这时在实例B中向queue中加入一个元素:
redis B>LPUSH queue task
(integer) 1
在LPUSH命令执行后实例A马上就返回了结果:
1) "queue"
2) "task"
同时会发现queue中的元素已经被取走:
redis>LLEN queue
(integer) 0
除了BRPOP命令外,Redis还提供了BLPOP,和BRPOP的区别在与从队列取元素时BLPOP会从队列左边取。具体可以参照LPOP理解,这里不再赘述。
前面说到了小白博客需要在发布文章的时候向每个订阅者发送邮件,这一步骤同样可以使用任务队列实现。由于要执行的任务和发送确认邮件一样,所以二者可以共用一个消费者。然而设想这样的情况:假设订阅小白博客的用户有1000人,那么当发布一篇新文章后博客就会向任务队列中添加1000个发送通知邮件的任务。如果每发一封邮件需要10秒,全部完成这1000个任务就需要近3个小时。问题来了,假如这期间有新的用户想要订阅小白博客,当他提交完自己的邮箱并看到网页提示他查收确认邮件时,他并不知道向自己发送确认邮件的任务被加入到了已经有1000个任务的队列中。要收到确认邮件,他不得不等待近3个小时。多么糟糕的用户体验!而另一方面发布新文章后通知订阅用户的任务并不是很紧急,大多数用户并不要求有新文章后马上就能收到通知邮件,甚至延迟一天的时间在很多情况下也是可以接受的。
所以可以得出结论当发送确认邮件和发送通知邮件两种任务同时存在时,应该优先执行前者。为了实现这一目的,我们需要实现一个优先级队列。
BRPOP命令可以同时接收多个键,其完整的命令格式为BLPOP key [key …]timeout,如BLPOP queue:1 queue:2 0。意义是同时检测多个键,如果所有键都没有元素则阻塞,如果其中有一个键有元素则会从该键中弹出元素。例如,打开两个redis-cli实例,在实例A中:
redis A>BLPOP queue:1 queue:2 queue:3 0
在实例B中:
redis B>LPUSH queue:2 task
(integer) 1
则实例A中会返回:
1) "queue:2"
2) "task"
如果多个键都有元素则按照从左到右的顺序取第一个键中的一个元素。我们先在queue:2和queue:3中各加入一个元素:
redis>LPUSH queue:2 task1
1) (integer) 1
redis>LPUSH queue:3 task2
2) (integer) 1
然后执行BRPOP命令:
redis>BRPOP queue:1 queue:2 queue:3 0
1) "queue:2"
2) "task1"
借此特性可以实现区分优先级的任务队列。我们分别使用queue:confirmation.email和queue:notification.email两个键存储发送确认邮件和发送通知邮件两种任务,然后将消费者的代码改为:
loop
task =
BRPOP queue:confirmation.email,
queue:notification.email,
0
execute(task[1])
这时一旦发送确认邮件的任务被加入到queue:confirmation.email队列中,无论queue: notification.email还有多少任务,消费者都会优先完成发送确认邮件的任务。
除了实现任务队列外,Redis还提供了一组命令可以让开发者实现“发布/订阅”(publish/subscribe)模式。“发布/订阅”模式同样可以实现进程间的消息传递,其原理是这样的:
“发布/订阅”模式中包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或若干个频道(channel),而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都会收到此消息。
发布者发布消息的命令是PUBLISH,用法是PUBLISH channel message,如向channel.1说一声“hi”:
redis>PUBLISH channel.1 hi
(integer) 0
这样消息就发出去了。PUBLISH命令的返回值表示接收到这条消息的订阅者数量。因为此时没有客户端订阅channel.1,所以返回0。发出去的消息不会被持久化,也就是说当有客户端订阅channel.1后只能收到后续发布到该频道的消息,之前发送的就收不到了。
订阅频道的命令是SUBSCRIBE,可以同时订阅多个频道,用法是SUBSCRIBE channel[channel …]。现在新开一个redis-cli实例A,用它来订阅channel.1:
redis A>SUBSCRIBE channel.1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel.1"
3) (integer) 1
执行SUBSCRIBE命令后客户端会进入订阅状态,处于此状态下客户端不能使用除SUBSCRIBE/UNSUBSCRIBE/PSUBSCRIBE/PUNSUBSCRIBE这4个属于“发布/订阅”模式的命令之外的命令(后面3个命令会在下面介绍),否则会报错。
进入订阅状态后客户端可能收到三种类型的回复。每种类型的回复都包含3个值,第一个值是消息的类型,根据消息类型的不同,第二、三个值的含义也不同。消息类型可能的取值有:
(1)Subscribe。表示订阅成功的反馈信息。第二个值是订阅成功的频道名称,第三个值是当前客户端订阅的频道数量。
(2)message。这个类型的回复是我们最关心的,它表示接收到的消息。第二个值表示产生消息的频道名称,第三个值是消息的内容。
(3)unsubscribe。表示成功取消订阅某个频道。第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量,当此值为0时客户端会退出订阅状态,之后就可以执行其他非“发布/订阅”模式的命令了。
上例中当实例A订阅了channel.1进入订阅状态后收到了一条subscribe类型的回复,这时我们打开另一个redis-cli实例B,并向channel.1发送一条消息:
redis B>PUBLISH channel.1 hi!
(integer) 1
返回值为1表示有一个客户端订阅了channel.1,此时实例A 收到了类型为message的回复:
1) "message"
2) "channel.1"
3) "hi!"
使用UNSUBSCRIBE命令可以取消订阅指定的频道,用法为UNSUBSCRIBE [channel[channel …]],如果不指定频道则会取消订阅所有频道① 。
注释:①由于redis-cli的限制我们无法在其中测试UNSUBSCRIBE命令。
除了可以使用SUBSCRIBE命令订阅指定名称的频道外,还可以使用PSUBSCRIBE命令订阅指定的规则。规则支持glob风格通配符格式(见3.1节),下面我们新打开一个redis-cli实例C进行演示:
redis C>PSUBSCRIBE channel.?*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel.?*"
3) (integer) 1
规则channel.?*可以匹配channel.1和channel.10,但不会匹配channel.。这时在实例B中发布消息:
redis B>PUBLISH channel.1 hi!
(integer) 2
返回结果为2是因为实例A和实例C两个客户端都订阅了channel.1频道。实例C接收到的回复是:
1) "pmessage"
2) "channel.?*"
3) "channel.1"
4) "hi!"
第一个值表示这条消息是通过PSUBSCRIBE命令订阅频道而收到的,第二个值表示订阅时使用的通配符,第三个值表示实际收到消息的频道命令,第四个值则是消息内容。
提示 使用PSUBSCRIBE命令可以重复订阅一个频道,如某客户端执行了PSUBSCRIBE channel.? channel.?*,这时向channel.2发布消息后该客户端会收到两条消息,而同时PUBLISH命令返回的值也是2而不是1。同样的,如果有另一个客户端执行了SUBSCRIBE channel.10,和PSUBSCRIBE channel.?*的话,向channel.10发送命令该客户端也会收到两条消息(但是是两种类型,message 和pmessage),同时PUBLISH命令会返回2。
PUNSUBSCRIBE命令可以退订指定的规则,用法是PUNSUBSCRIBE [pattern[pattern …]],如果没有参数则会退订所有规则。
注意 使用PUNSUBSCRIBE命令只能退订通过PSUBSCRIBE命令订阅的规则,不会影响直接通过SUBSCRIBE命令订阅的频道;同样UNSUBSCRIBE命令也不会影响通过PSUBSCRIBE命令订阅的规则。另外容易出错的一点是使用PUNSUBSCRIBE命令退订某个规则时不会将其中的通配符展开,而是进行严格的字符串匹配,所以PUNSUBSCRIBE*无法退订channel.*规则,而是必须使用PUNSUBSCRIBE channel.*才能退订。
客户端和Redis使用TCP协议连接。不论是客户端向Redis发送命令还是Redis向客户端返回命令的执行结果,都需要经过网络传输,这两个部分的总耗时称为往返时延。根据网络性能不同,往返时延也不同,大致来说到本地回环地址(loop backaddress)的往返时延在数量级上相当于Redis处理一条简单命令(如LPUSH list 1 2 3)的时间。如果执行较多的命令,每个命令的往返时延累加起来对性能还是有一定影响的。
在执行多个命令时每条命令都需要等待上一条命令执行完(即收到Redis的返回结果)才能执行,即使命令不需要上一条命令的执行结果。如要获得post:1、post:2和post:3这3个键中的title字段,需要执行三条命令,如图4-2所示。
图4-2 不使用管道时的命令执行示意图(纵向表示时间)
Redis的底层通信协议对管道(pipelining)提供了支持。通过管道可以一次性发送多条命令并在执行完后一次性将结果返回,当一组命令中每条命令都不依赖于之前命令的执行结果时就可以将这组命令一起通过管道发出。管道通过减少客户端与Redis的通信次数来实现降低往返时延累计值的目的,如图4-3所示。
图4-3 使用管道时的命令执行示意图
Jim Gray① 曾经说过:“内存是新的硬盘,硬盘是新的磁带。”内存的容量越来越大,价格也越来越便宜。2012年年底,亚马逊宣布即将发布一个拥有240GB内存的EC2实例,如果放到若干年前来看,这个容量就算是对于硬盘来说也是很大的了。即便如此,相比于硬盘而言,内存在今天仍然显得比较昂贵。而Redis是一个基于内存的数据库,所有的数据都存储在内存中,所以如何优化存储,减少内存空间占用对成本控制来说是一个非常重要的话题。
注释:①Jim Gray是1998年的图灵奖得主,在数据库(尤其是事务)方面做出过卓越的贡献。其于2007年独自驾船在海上失踪。
精简键名和键值是最直观的减少内存占用的方式,如将键名very.important.person:20改成VIP:20。当然精简键名一定要把握好尺度,不能单纯为了节约空间而使用不易理解的键名(比如将VIP:20修改为V:20,这样既不易维护,还容易造成命名沖突)。又比如一个存储用户性别的字符串类型键的取值是male和female,我们可以将其修改成m和f来为每条记录节约几个字节的空间(更好的方法是使用0和1来表示性别,稍后会详细介绍原因)① 。
注释:①3.2.4节还介绍过使用字符串类型的位操作来存储性别,更加节约空间。
有时候仅凭精简键名和键值所减少的空间并不足以满足需求,这时就需要根据Redis内部编码规则来节省更多的空间。Redis为每种数据类型都提供了两种内部编码方式,以散列类型为例,散列类型是通过散列表实现的,这样就可以实现0(1)时间复杂度的查找、赋值操作,然而当键中元素很少的时候,0(1)的操作并不会比0(n)有明显的性能提高,所以这种情况下Redis会采用一种更为紧凑但性能稍差(获取元素的时间复杂度为0(n))的内部编码方式。内部编码方式的选择对于开发者来说是透明的,Redis会根据实际情况自动调整。当键中元素变多时Redis会自动将该键的内部编码方式转换成散列表。如果想查看一个键的内部编码方式可以使用OBJECT ENCODING命令,例如:
redis>SET foo bar
OK
redis>OBJECT ENCODING foo
"raw"
Redis的每个键值都是使用一个redisObject结构体保存的,redisObject的定义如下:
typedef struct redisObject {
unsigned type:4;
unsigned notused:2; /* Not used */
unsigned encoding:4;
unsigned lru:22; /* lru time (relative to server.lruclock) */
int refcount;
void *ptr;
}robj;
其中type字段表示的是键值的数据类型,取值可以是如下内容:
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4
encoding字段表示的就是Redis键值的内部编码方式,取值可以是:
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 ed as integer */
#define REDIS_ENCODING_HT 2 /* Encoded as hash table */
#define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define REDIS_ENCODING_INTSET 6 /* Encoded as intset */
#define REDIS_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
各个数据类型可能采用的内部编码方式以及相应的OBJECT ENCODING命令执行结果如表4-2所示。
表4-2 每个数据类型都可能采用两种内部编码方式之一来存储
下面针对每种数据类型分别介绍其内部编码规则及优化方式。
1.字符串类型
Redis使用一个sdshdr类型的变量来存储字符串,而redisObject的ptr字段指向的是该变量的地址。sdshdr的定义如下:
struct sdshdr {
int len;
int free;
char buf[];
};
其中len字段表示的是字符串的长度,free字段表示buf中的剩余空间,而buf字段存储的才是字符串的内容。
所以当执行SET key foobar时,存储键值需要占用的空间是sizeof(redisObject)+sizeof(sdshdr)+strlen("foobar")=30字节① ,如图4-4所示。
注释:①本节所说的字节数以64位Linux系统为前提。
图4-4 字符串键值“foobar”的存储结构
而当键值内容可以用一个64位有符号整数表示时,Redis会将键值转换成long类型来存储。如SET key 123456,实际占用的空间是sizeof(redisObject)=16字节,比存储"foobar"节省了一半的存储空间,如图4-5所示。
图4-5 字符串键值“123456”的内存结构
redisObject中的refcount字段存储的是该键值被引用数量,即一个键值可以被多个键引用。Redis启动后会预先建立10000个分别存储从0到9999这些数字的redisObject类型变量作为共享对象,如果要设置的字符串键值在这10000个数字内(如SET key1 123)则可以直接引用共享对象而不用再建立一个redisObject了,也就是说存储键值占用的空间是0字节,如图4-6所示。
图4-6 当执行了SET key1 123和SET key2 123后,key1和key2两个键都直接引用了一个已经建立好的共享对象,节省了存储空间
由此可见,使用字符串类型键存储对象ID这种小数字是非常节省存储空间的,Redis只需存储键名和一个对共享对象的引用即可。
提示 当通过配置文件参数maxmemory设置了Redis可用的最大空间大小时,Redis不会使用共享对象,因为对于每一个键值都需要使用一个redisObject来记录其LRU信息。
2.散列类型
散列类型的内部编码方式可能是REDIS_ENCODING_HT或 REDIS_ENCODING_ZIPLIST① 。在配置文件中可以定义使用REDIS_ENCODING_ZIPLIST方式编码散列类型的时机:
注释:①在Redis 2.4及以前的版本中散列类型的键采用REDIS_ENCODING_HT或REDIS_ENCODING_ZIPMAP的编码方式。
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
当散列类型键的字段个数少于hash-max-ziplist-entries参数值且每个字段名和字段值的长度都小于hash-max-ziplist-value参数值(单位为字节)时,Redis就会使用REDIS_ ENCODING_ZIPLIST来存储该键,否则就会使用REDIS_ENCODING_HT。转换过程是透明的,每当键值变更后Redis都会自动判断是否满足条件来完成转换。
REDIS_ENCODING_HT编码即散列表,可以实现O(1)时间复杂度的赋值取值等操作,其字段和字段值都是使用redisObject存储的,所以前面讲到的字符串类型键值的优化方法同样适用于散列类型键的字段和字段值。
提示 Redis的键值对存储也是通过散列表实现的,与REDIS_ENCODING_HT编码方式类似,但键名并非使用redisObject存储,所以键名“123456”并不会比“abcdef”占用更少的空间。之所以不对键名进行优化是因为绝大多数情况下键名都不会是纯数字。
补充知识 Redis支持多数据库,每个数据库中的数据都是通过结构体redisDb存储的。redisDb的定义如下:
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
dict类型就是散列表结构,expires存储的是数据的过期时间。当Redis启动时会根据配置文件中databases参数指定的数量创建若干个redisDb类型变量存储不同数据库中的数据。
REDIS_ENCODING_ZIPLIST编码类型是一种紧凑的编码格式,它牺牲了部分读取性能以换取极高的空间利用率,适合在元素较少时使用。该编码类型同样还在列表类型和有序集合类型中使用。REDIS_ENCODING_ZIPLIST编码结构如图4-7所示,其中zlbytes是uint32_t类型,表示整个结构占用的空间。zltail也是uint32_t类型,表示到最后一个元素的偏移,记录zltail使得程序可以直接定位到尾部元素而无需遍历整个结构,执行从尾部弹出(对列表类型而言)等操作时速度更快。zllen是uint16_t类型,存储的是元素的数量。zlend是一个单字节标识,标记结构的末尾,值永远是255。
图4-7 REDIS_ENCODING_ZIPLIST编码的内存结构
在REDIS_ENCODING_ZIPLIST 中每个元素由4个部分组成。
第一个部分用来存储前一个元素的大小以实现倒序查找,当前一个元素的大小小于254字节时第一个部分占用1个字节,否则会占用5个字节。
第二、三个部分分别是元素的编码类型和元素的大小,当元素的大小小于或等于63个字节时,元素的编码类型是ZIP_STR_06B(即0<<6),同时第三个部分用6个二进制位来记录元素的长度,所以第二、三个部分总占用空间是1字节。当元素的大小大于63且小于或等于16383字节时,第二、三个部分总占用空间是2字节。当元素的大小大于16383字节时,第二、三个部分总占用空间是5字节。
第四个部分是元素的实际内容,如果元素可以转换成数字的话Redis会使用相应的数字类型来存储以节省空间,并用第二、三个部分来表示数字的类型(int16_t、int32_t等)。
使用REDIS_ENCODING_ZIPLIST编码存储散列类型时元素的排列方式是:元素1存储字段1,元素2存储字段值1,依次类推,如图4-8所示。
图4-8 使用REDIS_ENCODING_ZIPLIST编码存储散列类型的内存结构
例如,当执行命令HSET hkey foo bar命令后,hkey键值的内存结构如图4-9所示。
图4-9 hkey键值的内存结构
下次需要执行HSET hkey foo anothervalue时Redis需要从头开始找到值为foo的元素(查找时每次都会跳过一个元素以保证只查找字段名),找到后删除其下一个元素,并将新值anothervalue插入。删除和插入都需要移动后面的内存数据,而且查找操作也需要遍历才能完成,可想而知当散列键中数据多时性能将很低,所以不宜将hash-max-ziplist-entries和hash-max-ziplist-value两个参数设置得很大。
3.列表类型
列表类型的内部编码方式可能是REDIS_ENCODING_LINKEDLIST或REDIS ENCODINGZIPLIST。同样在配置文件中可以定义使用REDIS_ENCODING_ZIPLIST方式编码的时机:
list-max-ziplist-entries 512
list-max-ziplist-value 64
具体转换方式和散列类型一样,这里不再赘述。
REDIS_ENCODING_LINKEDLIST编码方式即双向链表,链表中的每个元素是用redisObject存储的,所以此种编码方式下元素值的优化方法与字符串类型的键值相同。
而使用REDIS_ENCODING_ZIPLIST编码方式时具体的表现和散列类型一样,由于REDIS_ENCODING_ZIPLIST编码方式同样支持倒序访问,所以采用此种编码方式时获取两端的数据依然较快。
4.集合类型
集合类型的内部编码方式可能是REDIS_ENCODING_HT或REDIS_ENCODING_INTSET。当集合中的所有元素都是整数且元素的个数小于配置文件中的set-max-intset-entries参数指定值(默认是512)时Redis会使用REDIS_ENCODING_INTSET编码存储该集合,否则会使用REDIS_ENCODING_HT来存储。
REDIS_ENCODING_INTSET编码存储结构体intset的定义是:
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;
其中contents存储的就是集合中的元素值,根据encoding的不同,每个元素占用的字节大小不同。默认的encoding是INTSET_ENC_INT16(即2个字节),当新增加的整数元素无法使用2个字节表示时,Redis会将该集合的encoding升级为INTSET_ENC_INT32(即4个字节)并调整之前所有元素的位置和长度,同样集合的encoding还可升级为INTSET_ENC_INT64(即8个字节)。
REDIS_ENCODING_INTSET编码以有序的方式存储元素(所以使用SMEMBERS命令获得的结果是有序的),使得可以使用二分算法查找元素。然而无论是添加还是删除元素,Redis都需要调整后面元素的内存位置,所以当集合中的元素太多时性能较差。
当新增加的元素不是整数或集合中的元素数量超过了set-max-intset-entries参数指定值时,Redis会自动将该集合的存储结构转换成REDIS_ENCODING_HT。
注意 当集合的存储结构转換成REDIS_ENCODING_HT后,即使将集合中的所有非整数元素删除,Redis也不会自动将存储结构转換回REDIS_ENCODING_INTSET。因为如果要支持自动回转,就意味着Redis在每次删除元素时都需要遍历集合中的键来判断是否可以转換回原来的编码,这会使得删除元素变成了时间复杂度为0(n)的操作。
5.有序集合类型
有序集合类型的内部编码方式可能是REDIS_ENCODING_SKIPLIST或REDIS_ENCODING_ZIPLIST。同样在配置文件中可以定义使用REDIS_ENCODING_ZIPLIST方式编码的时机:
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
具体规则和散列类型及列表类型一样,不再赘述。
当编码方式是REDIS_ENCODING_SKIPLIST时,Redis使用散列表和跳跃列表(skiplist)两种数据结构来存储有序集合类型键值,其中散列表用来存储元素值与元素分数的映射关系以实现0(1)时间复杂度的ZSCORE等命令。跳跃列表用来存储元素的分数及其到元素值的映射以实现排序的功能。Redis对跳跃列表的实现进行了几点修改,其中包括允许跳跃列表中的元素(即分数)相同,还有为跳跃链表每个节点增加了指向前一个元素的指针以实现倒序查找。
采用此种编码方式时,元素值是使用redisObject存储的,所以可以使用字符串类型键值的优化方式优化元素值,而元素的分数是使用double类型存储的。
使用REDIS_ENCODING_ZIPLIST编码时有序集合存储的方式按照“元素1的值,元素1的分数,元素2的值,元素2的分数”的顺序排列,并且分数是有序的。
小白把宋老师向自己讲解的知识总结成了一篇帖子发在了学校的网站上,引起了强烈的反响。很多同学希望宋老师能够再写一些关于Redis实践方面的教程,宋老师爽快地答应了。
在此之前我们进行的操作都是通过Redis的命令行客户端redis-cli进行的,并没有介绍实际编程时如何操作Redis。本章将会通过4个实例分别介绍Redis的PHP、Python、Ruby和Node.js客户端的使用方法,即使你不了解其中的某些语言,粗浅的阅读一下也能收获很多实践方面的技巧。
Redis官方推荐的PHP客户端是Predis① 和phpredis② 。前者是完全使用PHP代码实现的原生客户端,而后者则是使用C语言编写的PHP扩展。在功能上两者区别并不大,就性能而言后者会更胜一筹。考虑到很多主机并未提供安装PHP扩展的权限,本节会以Predis为示例介绍如何在PHP中使用Redis。
注释:①见https://github.com/nrk/predis。
注释:②见https://github.com/nicolasff/phpredis。
虽然Predis的性能逊于phpredis,但是除非执行大量Redis命令,否则很难区分二者的性能。而且实际应用中执行Redis命令的开销更多在网络传输上,单纯注重客户端的性能意义不大。读者在开发时可以根据自己的项目需要来权衡使用哪个客户端。
Predis对PHP 版本的最低要求为5.3。
安装Predis可以克隆其版本库(git clone git://github.com/nrk/predis.git),也可以直接从GitHub项目主页中下载代码的ZIP压缩包。如目前最新版v0.8.1的下载地址为https://github.com/nrk/predis/archive/v0.8.1.zip。下载后解压并将整个文件夹复制到项目目录中即可使用。
使用时首先需要引入autoload.php文件:
require './predis/autoload.php';
Predis使用了PHP 5.3中的命名空间特性,并支持PSR-0标准① 。autoload.php文件通过定义PHP的自动加载函数实现了该标准,所以引入了autoload.php文件后就可以自动根据命名空间和类名来自动载入相应的文件了。例如:
注释:①PSR-0标准由PHP Framework Interoperability Group确定,其定义了PHP命名空间与文件路径的对应关系。该标准的网址为https://github.com/php-fig/fig-standards/blob/master/accepted/PSR-0.md。
redis = new Predis\Client();
会自动加载Predis目录下的Client.php文件。如果你的项目使用的PHP框架已经支持了这一标准那么就无需再次引入autoload.php了。
首先创建一个到Redis的连接:
redis=new Predis\Client();
该行代码会默认Redis的地址为127.0.0.1,端口为6379。如果需要更改地址或端口,可以使用:
redis=new Predis\Client(array(
'scheme'=>'tcp',
'host'=>'127.0.0.1',
'port'=>6379,
));
作为开始,我们首先使用GET 命令作为测试:
echo redis->get('foo');
该行代码获得了键名为foo的字符串类型键的值并输出出来,如果不存在则会返回NULL。
当foo键的类型不是字符串类型(如列表类型)时会报异常,可以为该行代码加上异常处理:
try{
echo redis->get('foo');
}catch (Exception e){
echo "Message: {e->getMessage()}";
}
这时输出的内容为:“Message: ERR Operation against a key holding thewrong kind of value”。
调用其他命令的方法和GET命令一样,如要执行LPUSH numbers 1 2 3:
redis->lpush('numbers', '1', '2', '3');
为了使开发更方便,Predis为许多命令额外提供了简便用法,这里选择几个典型的用法依次介绍。
1.MGET/MSET
Predis调用MSET命令时支持将PHP的关联数组直接作为参数,就像这样:
userName=array(
'user:1:name'=>'Tom',
'user:2:name'=>'Jack'
);
//相当于redis->mset('user:1:name','Tom','user:2:name','Jack');
redis->mset(
userName);
同样MGET命令支持一个数组作为参数:
users=array_keys(
userName);
print_r(redis->mget(
users));
打印的结果为:
Array
(
[0]=>Tom
[1]=>Jack
)
2.HMSET/HMGET/HGETALL
Predis调用HMSET的方式和MSET类似,如:
user1=array(
'name'=>'Tom',
'age'=>'32'
);
redis->hmset('user:1',
user1);
HMGET与MGET类似,不再赘述。最方便的是HGETALL命令,Predis会将Redis返回的结果组装成关联数组返回:
user=
redis->hgetall('user:1');
echo user['name'];//'Tom'
3.LPUSH/SADD/ZADD
LPUSH和SADD的调用方式类似:
items=array('a','b');
//相当于redis->lpush('list','a','b');
redis->lpush('list',
items);
//相当于redis->sadd('set','a','b');
redis->sadd('set',
items);
而ZADD的调用方式为:
itemScore=array('Tom'=>'100'
'Jack'=>'89'
);
//相当于redis->zadd('zset', '100', 'Tom', '89', 'Jack');
redis->zadd('zset',
itemScore);
4.SORT
在Predis中调用SORT命令的方式和其他命令不同,必须将SORT命令中除键名外的参数作为关联数组传入到函数中。如对SORT mylist BY weight_* LIMIT 010 GETvalue_* GET # ASC ALPHA STORE result这条命令而言,使用Predis的调用方法如下:
redis->sort('mylist', array(
'by'=>'weight_*',
'limit'=>array(0, 10),
'get'=>array('value_*', '#'),
'sort'=>'asc',
'alpha'=>true,
'store'=>'result'
));
本节将使用PHP和Redis实现用户注册登录功能,下面分模块来介绍具体实现方法。
1.注册
需求描述:用户注册时需要提交邮箱、登录密码和昵称。其中邮箱是用户的唯一标识,每个用户的邮箱不能重复,但允许用户修改自己的邮箱。
我们使用散列类型来存储用户的资料,键名为user:用户ID。其中用户ID是一个自增的数字,之所以使用ID而不是邮箱作为用户的标识是因为考虑到在其他键中可能会通过用户的标识与用户对象相关联,如果使用邮箱作为用户的标识的话在用户修改邮箱时就不得不同时需要修改大量的键名或键值。为了尽可能地减少要修改的地方,我们只把邮箱作为该散列键的一个字段。为此还需要使用一个散列类型的键email.to.id来记录邮箱和用户ID间的对应关系以便在登录时能够通过邮箱获得用户的ID。
用户填写并提交注册表单后首先需要验证用户输入,我们在项目目录中建立一个register.php文件来实现用户注册的逻辑。验证部分的代码如下:
//设置Content-type以使浏览器可以使用正确的编码显示提示信息,
//具体的编码需要根据文件实际编码选择,此处是utf-8。
header("Content-type: text/html; charset=utf-8");
if(!isset(_POST['email']) ||
!isset(_POST['password']) ||
!isset(_POST['nickname'])){
echo '请填写完整的信息。';
exit;
}
email=
_POST['email'];
//验证用户提交的邮箱是否正确
if(!filter_var(email, FILTER_VALIDATE_EMAIL)){
echo'邮箱格式不正确,请重新检查';
exit;
}
rawPassword=
_POST['password'];
//验证用户提交的密码是否安全
if(strlen(rawPassword)< 6){
echo '为了保证安全,密码长度至少为6。';
exit;
}
nickname=
_POST['nickname'];
//不同的网站对用户昵称有不同的要求,这里不再做检查,即使是空也可以。
//而后我们需要判断用户提交的邮箱是否被注册了:
redis=new Predis\Client();
if(redis->hexists('email.to.id',
email)){
echo '该邮箱已经被注册过了。';
exit;
}
验证通过后接下来就需要将用户资料存入Redis中。在存储的时候要记住使用散列函数处理用户提交的密码,避免在数据库中存储明文密码。原因是如果数据库中数据泄露(外部原因或内部原因都有可能),攻击者也无法获得用户的真实密码,也便无法正常地登录进系统。更重要的是考虑到用户很可能在其他网站中也使用了同样的密码,所以明文密码泄露还会给用户造成额外的损失。
除此之外,还要避免使用速度较快的散列函数处理密码以防止攻击者使用穷举法破解密码,并且需要为每个用户生成一个随机的“盐”(salt)以避免攻击者使用彩虹表破解。这里作为示例,我们使用Bcrypt算法来对密码进行散列。PHP 5.3中提供的crypt函数支持Bcrypt算法,我们可以实现一个函数来随机生成盐并调用crypt函数获得散列后的密码:
function bcryptHash(rawPassword,
round=8)
{
if(round < 4 ||
round > 31)
round=8;
salt='
2a
' . str_pad(
round, 2, '0', STR_PAD_LEFT) . '
';
randomValue=openssl_random_pseudo_bytes(16);
salt .=substr(strtr(base64_encode(
randomValue), '+', '.'), 0, 22);
return crypt(rawPassword,
salt);
}
提示 openssl_random_pseudo_bytes函数需要安装OpenSSL扩展。
之后使用如下代码获得散列后的密码:
hashedPassword=bcryptHash(
rawPassword);
存储用户资料就很简单了,所有命令都在第3章介绍过了。代码如下:
require './predis/autoload.php';
redis=new Predis\Client();
//首先获取一个自增的用户ID
userID=
redis->incr('users:count');
//存储用户信息
redis->hmset("user:{
userID}", array(
'email'=>email,
'password'=>hashedPassword,
'nickname'=>nickname
));
//记得记录下邮箱和用户ID的对应关系
redis->hset('email.to.id',
email,
userID);
//提示用户注册成功
echo '注册成功!';
大部分情况下在注册时我们需要验证用户的邮箱,不过这部分的逻辑与忘记密码部分相似,所以在这里不做更多的介绍。
2.登录
需求描述:用户登录时需要提交邮箱和登录密码,如果正确则输出“登录成功”,否则输出“用户名或密码错误”。
当用户提交邮箱和登录密码后首先通过email.to.id键获得用户ID,然后将用户提交的登录密码使用同样的盐进行散列并与数据库存储的密码比对,如果一样则表示登录成功。我们新建一个login.php文件来处理用户的登录,处理该逻辑的部分代码如下:
header("Content-type: text/html; charset=utf-8");
if(!isset(_POST['email']) ||
!isset(_POST['password'])){
echo '请填写完整的信息。';
exit;
}
email=
_POST['email'];
rawPassword=
_POST['password'];
require './predis/autoload.php';
redis=new Predis\Client();
//获得用户的ID
userID=
redis->hget('email.to.id',
email);
if(!userID){
echo '用户名或密码错误。';
exit;
}
hashedPassword =
redis->hget("user:{
userID}", 'password');
现在我们得到了之前存储过的经过散列后的密码,接着定义一个函数来对用户提交的密码进行散列处理。bcryptHash函数中返回的密码中已经包含了盐,所以只需要直接将散列后的密码作为crypt 函数的第二个参数,crypt函数会自动地提取出密码中的盐:
function bcryptVerify(rawPassword,
storedHash)
{
return crypt(rawPassword,
storedHash) ==
storedHash;
}
之后就可以使用此函数进行比对了:
if(!bcryptVerify(rawPassword,
hashedPassword)){
echo'用户名或密码错误。';
exit;
}
echo'登录成功!';
3.忘记密码
需求描述:当用户忘记密码时可以输入自己的邮箱,系统会发送一封包含更改密码的链接的邮件,用户单击该链接后会进入密码修改页面。该模块的访问频率限制为1分钟1O次以防止恶意用户通过此模块向某个邮箱地址大量发送垃圾邮件。
当用户在忘记密码的页面输入邮箱后,我们的程序需要做两件事。
(1)进行访问频率限制。这里使用4.2.3节介绍的方法以邮箱为标示符对发送修改密码邮件的过程进行访问频率限制。当用户提交了邮箱地址后首先验证邮箱地址是否正确,如果正确则检查访问频率是否超限:
keyName="rate.limiting:{
email}";
now=time();
if(redis->llen(
keyName)<10){
redis->lpush(
keyName,
now);
}else{
time =
redis->lindex(
keyName, -1);
if(now -
time<60){
echo '访问频率超过了限制,请稍后再试。';
exit;
}else{
redis->lpush(
keyName,
now);
redis->ltrim(
keyName, 0, 9);
}
}
一般在全站中还会有针对IP地址的访问频率限制,原理与此类似。
(2)发送修改密码邮件。用户通过访问频率限制后我们会为其生成一个随机的验证码,并将验证码通过邮件发送给用户。同时在程序中要把用户的邮箱地址存入名为retrieve.password.code:散列后的验证码的字符串类型键中,然后使用EXPIRE命令为其设置一个生存时间(如1个小时)以提供安全性并且保证及时释放存储空间。由于忘记密码需要的安全等级与用户注册登录相同,所以我们依然使用Bcrypt算法来对验证码进行散列,具体的算法同上这里不再详述。
Redis官方推荐的Ruby客户端是redis-rb① ,也是各种语言的Redis客户端中最为稳定的一个。其主要代码贡献者就是Redis的开发者之一Pieter Noordhuis。
注释:①见https://github.com/redis/redis-rb。
使用gem install redis安装最新版本的redis-rb,目前的最新版本是3.0.2。
创建到Redis的连接很简单:
require 'redis'
redis=Redis.new
该行代码会默认Redis的地址为127.0.0.1,端口为6379。如果需要更改地址或端口,可以使:
redis=Redis.new(:host=>'127.0.0.1', :port => 6379)
redis-rb的官方文档相对比较详细,所以具体的使用方法可以见其GitHub主页。这里从其中挑出几个比较有代表性的命令作为示例:
r.set('redis_db', 'great k / v storage') #=>OK
r.get('redis_db') #=>"great k / v storage"
r.incrby('counter', 99) #=>99
r.hmset('hash_dt', :key2, 'value2', :key3, 'value3') #=>OK
redis-rb最便捷的命令调用方法就是对SET和GET命令使用别名[],例如:
redis.set('key', 'value')
可以写成
redis['key']='value'
同样
value=redis.get('key')
可以写成
value=redis['key']
另外,对于事务的返回值可以提前设置对结果的引用,就像这样:
redis.multi do
redis.set('key', 'hi')
@value=redis.get('key')
redis.set('key', '2')
@number=redis.incr('key')
end
p @value.value #输出"hi"
p @number.value #输出3
现在很多网站都有标签功能,用户可以给某个项目(如文章、图书等)添加标签,也可以通过标签查询项目。在很多时候,我们都希望在用户输入标签时网站可以自动帮助用户补全要输入的标签,如图5-1所示。
图5-1 输入“tech”后网站会列出以“tech”开头的标签
这样做一是可以节约用户的输入时间,二是在创建标签时可以起到规范标签的作用,避免用户输入标签时可能出现的拼写错误。
下面介绍两种在Redis中实现补全提示的方法,并会挑选一种用Ruby来实现。
第一种方法:为每个标签的每个前缀都使用一个集合类型键来存储该前缀对应的标签名。如“ruby”的所有前缀分别是“r”、“ru”和“rub”,我们为这3个前缀对应的集合类型键都加入元素“ruby”。
当有“ruby”和“redis”两个标签时Redis中存储的内容如图5-2所示,用户输入“r”时就可以通过读取键“prefix:r”来获知以“r”开头的标签有“ruby”和“redis”两个。
图5-2 “ruby”和“redis”两个标签的索引存储结构
这时就可以将这两个标签提示给用户了。更进一步,我们还可以存储每个标签的访问量,使得我们可以利用SORT命令配合BY参数把最热门的标签排在前面。
第二种方法通过有序集合实现,该方法是由Redis的作者Salvatore Sanfilippo介绍的。
3.6节介绍过有序集合类型有一个特性是当元素的分数一样时会按照元素值的字典顺序排序,利用这一特性只使用一个有序集合类型键就能实现标签的补全功能,准备过程如下。
(1)首先把每个标签名的所有前缀作为元素存入键中,分数均为0;
(2)将每个标签名后面都加上“*”符号并存入键中,分数也为0。准备过后的存储情况如图5-3所示。
图5-3 “ruby”和“redis”两个标签的索引存储结构
由于所有元素的分数都相同,所以该有序集合键中的项目相当于全部按照字典顺序排序(即图5-3所示的顺序)。这样当用户输入“r”时就可以按照如下流程获取要提示给用户的标签:
获取“r”的排名:ZRANK autocomplete r,在这里的返回值是0;
获取“r”之后的N个元素,如当N=100时:ZRANGE autocomplete 1101。 N的取值与标签的平均长度和需要获得的标签数量有关,可以根据实际情况自由调整;
遍历返回的结果,找出其中以"*"结尾的且以“r”开头的元素。此时将“*”去掉后就是我们需要的结果了。
下面我们写一个小程序来作为示例,程序启动时会从一个文本文件中读取所有标签列表,然后接收用户输入并返回相应的补全结果。
文本文件的样例内容如下:
我的中国心
我的中国话
你好吗
我和你
你一路走来
你从哪里来
当用户输入“我的”时程序会打印如下内容:
我的中国心
我的中国话
具体的实现方法是,首先我们定义一个函数来获得标签的前缀(包括标签加上星号):
#获得标签的所有前缀
#
# @example
# get_prefixes('word')
# #=>['w', 'wo', 'wor', 'word*']
def get_prefixes(word)
Array.new(word.length) do |i|
if i == word.length - 1
"#{word}*"
else
word[0..i]
end
end
end
接着我们加载redis-rb,并建立到Redis的连接:
require 'redis'
#建立到默认地址和端口的Redis 的连接
redis=Redis.new
为了保证可以重复运行此程序,我们需要删除之前建立的键以免影响本次的结果:
redis.del('autocomplete')
下面是准备阶段,程序从words.txt文件读取标签列表,并获得每个标签的前缀加入到有序集合键中:
argv=[]
File.open('words.txt').each_line do |word|
get_prefixes(word.chomp).each do |prefix|
argv << [0, prefix]
end
end
redis.zadd('autocomplete', argv)
redis-rb的zadd函数支持两种方式的参数:当只加入一个元素时使用redis.zadd(key, score, member),当同时加入多个元素时使用redis.zadd(key, [[score1,member1], [score2, member2], …])上面的代码使用的是后一种方式。
最后一步我们通过循环来接收用户的输入并查询对应的标签:
while prefix=gets.chomp do
result=[]
if(rank = redis.zrank('autocomplete', prefix))
#存在以用户输入的内容为前缀的标签
redis.zrange('autocomplete', rank + 1, rank + 100).each do |words|
#获得该前缀后的100 个元素
if words[-1] == '*' && prefix == words[0..prefix.length - 1]
#如果以"*"结尾并以用户输入的内容为前缀则加入结果中
result << words[0..-2]
end
end
end
#打印结果
puts result
end
Redis官方推荐的Python客户端是redis-py① 。
注释:①见https://github.com/andymccurdy/redis-py。
推荐使用pip install redis安装最新版本的redis-py,也可以使用easy_install:easy_ install redis。
首先需要引入redis-py:
import redis
下面的代码将创建一个默认连接到地址127.0.0.1,端口6379的Redis连接:
r=redis.StrictRedis()
也可以显式地指定需要连接的地址:
r=redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
使用起来很容易,这里以SET和GET命令作为示例:
r.set('foo', 'bar') # True
r.get('foo') # 'bar'
1.HMSET/HGETALL
HMSET支持将字典作为参数存储,同时HGETALL的返回值也是一个字典,搭配使用十分方便:
r.hmset('dict', {'name': 'Bob'})
people=r.hgetall('dict')
print people # {'name': 'Bob'}
2.事务和管道
redis-py的事务使用方式如下:
pipe=r.pipeline()
pipe.set('foo', 'bar')
pipe.get('foo')
result=pipe.execute()
print result # [True, 'bar']
管道的使用方式和事务相同,只不过需要在创建时加上参数transaction=False:
pipe=r.pipeline(transaction=False)
事务和管道还支持链式调用:
result=r.pipeline().set('foo', 'bar').get('foo').execute()
# [True, 'bar']
一般的社交网站上都可以看到用户在线的好友列表,如图5-4所示。在Redis中可以很容易地实现这个功能。
图5-4 某网站上用户的在线好友列表
在线好友其实就是全站在线用户的集合和某个用户所有好友的集合取交集的结果。如果现在我们的网站就是使用集合类型键来存储用户的好友ID的,那么只需要一个存储在线用户列表的集合即可。如何判定一个用户是否在线呢?通常的方法是每当用户发送HTTP请求时都记录下请求发生的时间,所有指定时间内发送过请求的用户就算作在线用户。这段时间根据场景不同取值也不同,以10分钟为例:某个用户发送了一个HTTP请求,9分钟后系统仍然认为他是在线的,但到了第11分钟就不算作他在线了。
在Redis中我们可以每隔10分钟就使用一个键来存储该10分钟内发送过请求的用户ID列表。如12点20分到12点29分的用户ID存储在active.users:2 中,12点30分到12点39分的用户ID存储在active.users:3中,以此类推(注意每次调用SADD命令增加用户ID时需要同时设置键的生存时间在50分钟内以防止命名沖突)。这样需要获得当前在线用户只需要读取当前分钟数对应的键即可。不过这种方案会造成较大的误差,比如某个用户在29分访问了一个页面,他的ID被记录在active.users:2键中,而在30分时系统会读取active.users:3键来获取在线用户列表,即该用户的在线状态只持续了1分钟而不是预想的10分钟。
这时就需要粒度更小的记录方案来解决这个问题。我们可以将原先每10分钟记录一个键改为每1分钟记录一个键,即在12点29分访问的用户的ID将会被记录在active.users:29中。而判断一个用户是否在最近10分钟在线只需要判断其在最近的10个集合键中是否出现过至少一次即可,这一过程可以通过SUNION命令实现。
下面介绍使用Python来实现这一过程。我们这里使用了web.py框架,web.py是一个易于使用的Python网站开发框架,可以通过sudo pip install web.py来安装它。
代码如下:
#-*- coding: utf-8 -*-
import web
import time
import redis
r=redis.StrictRedis()"
"" 配置路由规则
'/': 模拟用户的访问
'/online':查看在线用户
"""
urls=(
'/', 'visit',
'/online', 'online'
)
app=web.application(urls, globals())
"""返回当前时间对应的键名
如28分对应的键名是active.users:28
"""
def time_to_key(current_time):
return 'active.users:' + time.strftime('%M', time.localtime(current_time))
""" 返回最近10分钟的键名
结果是列表类型
"""
def keys_in_last_10_minutes():
now=time.time()
result=[]
for i in range(10):
result.append(time_to_key(now - i * 60))
return result
class visit:
""" 模拟用户访问
将用户的User agent作为用户的ID加入到当前时间对应的键中
"""
def GET(self):
user_id=web.ctx.env['HTTP_USER_AGENT']
current_key=time_to_key(time.time())
pipe = r.pipeline()
pipe.sadd(current_key, user_id)
#设置键的生存时间为10分钟
pipe.expire(current_key, 10 * 60)
pipe.execute()
return 'User:\t' + user_id + '\r\nKey:\t' + current_key
class online:
""" 查看当前在线的用户列表
"""
def GET(self):
online_users=r.sunion(keys_in_last_10_minutes())
result=''
for user in online_users:
result += 'User agent:' + user + '\r\n'
return result
if __name__ == "__main__":
app.run()
在代码中我们建立了两个页面。首先我们打开http://127.0.0.1:8080,该页面对应visit类,每次访问该页面都会将用户的浏览器User agent存储在记录当前分钟在线用户的键中,并将User agent和键名显示出来,如图5-5所示。
图5-5 使用Safari访问http://127.0.0.1:8080
从键名可知该次访问是在某时26分钟的时候发生的。然后使用另一个浏览器打开该页面,如图5-6所示。
图5-6 使用Firefox访问http://127.0.0.1:8080
该次访问发生在29分钟。最后我们在37分钟时访问http://127.0.0.1:8080/online来查看当前在线用户列表,如图5-7所示。
图5-7 查看在线用户结果
结果与预期一样,在线列表中只有在29分钟访问的用户。
另一种方法:有序集合
有时网站本来就要记录全站用户的最后访问时间(如图5-8所示),这时就可以直接利用此数据获得最后一次访问发生在10分钟内的用户列表(即在线用户)。
图5-8 Stack Overflow 网站的个人资料页面记录了用户上次访问的时间
我们使用一个有序集合来记录用户的最后访问时间,元素值为用户的ID,分数为最后一次访问的UNIX时间。要获得最近10分钟访问过的用户列表可以使用ZRANGEBYSCORE命令:
ten_minutes_ago=time.time() - 10 * 60
online_users=r.zrangebyscore('last.seen', ten_minutes_ago, '+inf')
那么如何获取在线的好友列表呢(与上一个例子一样,此时依然使用集合类型存储用户的好友列表)?最直接的方法就是将上面存储在线用户列表的online_users变量存入Redis的一个集合类型的键中然后和用户的好友列表取交集。然而这种方法需要在服务端和客户端之间传输数据,如果在线用户多的话会有较大的网络开销,而且这种方法也不能通过Redis的事务功能实现原子操作。为了解决这些问题,我们希望实现一个方法将ZRANGEBYSCORE命令的结果直接存入一个新键中而不返回到客户端。思路如下:有序集合只有ZINTERSTORE和ZUNIONSTORE两个命令支持直接将运算结果存入键中,然而这两个命令都不能实现我们要的操作。所以只能换种思路:既然没办法直接把有序集合中某一分数段的元素存入新键中,那何不干脆复制一个新建,并使用ZREMRANGEBYSCORE命令将我们不需要的分数段的元素删除?
有了这一思路后下面的实现方法就很简单了,步骤如下。
(1)复制一个last.seen键的副本temp.last.seen,方法为ZUNIONSTORE temp.last.seen 1 last.seen。在这里我们巧妙地借助了ZUNIONSTORE命令实现了对有序集合类型键的复制过程,即参加求并集操作的元素只有一个,结果自然就是它本身。
(2)将不在线的用户(即1O分钟以前的用户)删除。方法为ZREMRANGEBYSCOREtemp.last.seen 0 10分钟前的UNIX时间。
(3)现在temp.last.seen键中存储的就是当前的在线用户了。我们将其和用户的好友列表做交集:ZINTERSTORE online.friends 2 temp.last.seen user:42:friends。这里我们以ID为42的用户举例,user:42:friends是存储其好友的集合类型键① 。
注释:①ZINTERSTORE命令的参数除了有序集合类型外还可以是集合类型,此时的集合类型会被作为分数为1的有序集合类型处理。
(4)使用ZRANGE命令获取online.friends键的值。
(5)收尾工作,删除temp.last.seen和online.friends键。因为temp.last.seen键可以被所有用户共用,所以可以根据情况将其缓存一段时间,在下次需要生成时先判断是否有该键,如果有则直接使用。
以上5步需要使用事务或脚本实现以保证每个步骤的原子性。
有的时候我们会使用有序集合键来存储用户的好友列表以记录成为好友的时间,此时第3步依然奏效。
虽然以上的步骤有些复杂,但是实现起来并不难,有兴趣的读者可以自己完成。
Redis官方推荐的Node.js客户端是node_redis① 。
注释:①见https://github.com/mranney/node_redis。
使用npm install redis命令安装最新版本的node_redis,目前版本是0.8.2。
首先加载node_redis模块:
var redis = require('redis');
下面的代码将创建一个默认连接到地址127.0.0.1,端口6379的Redis连接:
var client = redis.createClient();
也可以显式地指定需要连接的地址:
var client = redis.createClient('6379', '127.0.0.1')
由于Node.js的异步特性,在处理返回值的时候与其他客户端差别较大。还是以GET/SET命令为例:
client.set('foo', 'bar', function () {
//此时SET命令执行完并返回结果,
//因为这里并不关心SET命令的结果,所以我们省略了回调函数的形参。
client.get('foo', function (error, fooValue) {
//error 参数存储了命令执行时返回的错误信息,如果没有错误则返回null。
//回调函数的第二个参数存储的是命令执行的结果
console.log(fooValue); // 'bar'
});
});
使用node_redis执行命令时需要传入回调函数(callback function)来获得返回值,当命令执行完返回结果后node_redis会调用该函数,并将命令的错误信息作为第一个参数、返回值作为第二个参数传递给该函数。关于Node.js的异步模型的介绍超出了本书的范围,有兴趣的读者可以访问Node.js的官网① 了解更多信息。
注释:①见http://nodejs.org。
Node.js的异步模型使得通过node_redis调用Redis命令的表现与Redis的底层管道协议十分相似:调用命令函数时(如client.set())并不会等待Redis返回命令执行结果,而是直接继续执行下一条语句,所以在Node.js中通过异步模型就能实现与管道类似的效果(也因此 node_redis没有提供管道相关的命令)。上面的例子中我们并不需要SET命令的返回值,只要保证SET命令在GET命令前发出即可,所以完全不用等待SET命令返回结果后再执行GET命令。因此上面的代码可以改写成:
//不需要返回值时可以省略回调函数
client.set('foo', 'bar');
client.get('foo', function (error, fooValue) {
console.log(fooValue); // 'bar'
});
不过由于SET和GET并未真正使用Redis的管道协议发送,所以当有多个客户端同时向Redis发送命令时,上例中的两个命令之间可能会被插入其他命令,换句话说,GET命令得到的值未必是“bar”。
虽然Node.js的异步特性给我们带来了相对更高的性能,然而另一方面使用Redis实现某个功能时我们经常需要读写若干个键,而且很多情况下都会依赖之前命令的返回结果。这时就会出现嵌套多重回调函数的情况,影响代码可读性。就像这样:
client.get('people:2:home', function (error, home){
client.hget('locations', home, function (error, address) {
client.exists('address:' + address, function (errror, addressExists) {
if (addressExists) {
console.log('地址存在。');
} else {
client.exists('backup.address:' + address, function (error,
backupAddress Exists) {
if (backupAddressExists) {
console.log('备用地址存在。');
} else {
console.log('地址不存在。');
}
});
}
});
})
});
上面的代码并不是极端的情况,相反在实际开发中经常会遇到这种多层嵌套。为了减少嵌套,可以考虑使用Async② 、Step③ 等第三方模块。如上面的代码可以稍微修改后使用Async重写为:
注释:②见https://github.com/caolan/async。
注释:③见https://github.com/creationix/step。
async.waterfall([
function (callback) {
client.get('people:2:home', callback);
},
function (home, callback) {
client.hget('locations', home, callback);
},
function (address, callback) {
async.parallel([
function (callback) {
client.exists('address:' + address, callback);
},
function (callback) {
client.exists('backup.address:' + address, callback);
},
], function (err, results) {
if (results[0]) {
console.log('地址存在。');
}else if (results[1]) {
console.log('备用地址存在。');
} else {
console.log('地址不存在。');
}
});
}
]);
1.HMSET/HGETALL
node_redis同样支持在HMSET命令中使用对象作参数(对象的属性值只能是字符串),相应的HGETALL命令会返回一个对象。
2.事务
事务的用法如下:
var multi=client.multi();
multi.set('foo', 'bar');
multi.sadd('set', 'a');
mulit.exec(function (err, replies) {
//replies 是一个数组,依次存放事务队列中命令的结果
console.log(replies);
});
或者使用链式调用:
client.multi()
.set('foo', 'bar')
.sadd('set', 'a')
.exec(function (err, replies) {
console.log(replies);
});
3.“发布/订阅”模式
Node.js使用事件的方式实现“发布/订阅”模式。现在创建两个连接分别充当发布者和订阅者:
var pub=redis.createClient();
var sub=redis.createClient();
然后让sub订阅chat频道:
sub.subscribe('chat');
定义当接收到消息时要执行的回调函数:
sub.on('message', function (channel, message) {
console.log('收到' + channel + '频道的消息:' + message);
});
在sub订阅成功后,我们让pub向chat频道发送一个问候信息:
sub.on('subscribe', function (channel, count) {
pub.publish('chat', 'hi!');
})
运行后可以看到打印的结果:
node testpubsub.js
收到chat 频道的消息:'hi!'
补充知识 在node_redis中建立连接的过程同样是异步的,即执行client=redis. createClient()后并未立即建立连接。在连接建立完成前执行的命令会被加入到离线任务队列中,当连接建立成功后node_redis会按照加入的顺序依次执行离线任务队列中的命令。
很多场合下网站都需要根据访客的IP地址判断访客所在地。假设我们有一个地名和IP地址段的对应表① :
注释:①该表只用于演示用途,其中的数据并不准确。
上海: 202.127.0.0~202.127.4.255
北京: 122.200.64.0~122.207.255.255
如果用户的IP地址为122.202.2.0,我们就能根据这个表知道他的地址位于北京。Redis可以使用一个有序集合类型的键来存储这个表。
首先将表中的IP 地址转换成十进制数字:
上海: 3397320704~3397321983
北京: 2059943936~2060451839
然后使用有序集合类型记录这个表。方式为每个地点存储两条数据:一条的元素值是地点名,分数是该地点对应的最大IP地址。另一条是“*”加上地点名,分数是该地点对应的最小IP地址,如图5-9所示。
图5-9 使用有序集合键存储地点和相应IP范围的存储结构
在查找某个IP地址属于哪个地点时先将该IP地址转换成十进制数字,然后在有序集合中找到大于该数字的最小的一个元素,如果该元素不是以“*”开头则表示找到了,如果是则表示数据库中并未记录该IP地址对应的地名。
如我们想找到“122.202.2.0”的所在地,首先将其转换成数字“2060059136”,然后在有序集合中找到第一个大于它的分数为“2060451839”,对应的元素值为“北京”,不是以“*”开头,所以该地址的所在地是北京。
下面介绍使用Node.js实现这一过程。首先将表转换成CSV格式并存为ip.csv:
上海,202.127.0.0,202.127.4.255
北京,122.200.64.0,122.207.255.255
而后使用node-csv-parser模块① 加载该csv文件:
注释:①见https://github.com/wdavidw/node-csv-parser。安装方法为npm install csv。
var fs=require('fs');
var csv=require('csv');
csv().from.stream(fs.createReadStream('ip.csv'))
.on('record', importIP);
读取每行数据时node-csv-parser模块都会调用importIP回调函数。该函数实现如下:
var redis=require('redis');
var client=redis.createClient();
//将IP地址数据加入Redis
//输入格式:"['上海', '202.127.0.0', '202.127.4.255']"
function importIP (data) {
var location = data[0];var minIP=convertIPtoNumber(data[1]);
var maxIP=convertIPtoNumber(data[2]);
//将数据加入到有序集合中,键名为'ip'
client.zadd('ip', minIP, '*' + location, maxIP, location);
}
其中convertIPtoNumber函数用来将IP地址转换成十进制数字,
//将IP地址转换成十进制数字
//convertIPtoNumber('127.0.0.1') => 2130706433
function convertIPtoNumber(ip) {
var result='';
ip.split('.').forEach(function (item) {
item=~~item;
item=item.toString(2);
item=pad(item, 8);
result += item;
});
return parseInt(result, 2);
}
pad函数用于将二进制数补全为8位:
//在字符串前补'0'。
//pad('11', 3)=>'011'
function pad(num, n) {
var len=num.length;
while(len<n) {
num='0'+num;
len++;
}
return num;
}
至此数据准备工作完成了,现在我们提供一个接口来供用户查询:
var readline=require('readline');
var rl=readline.createInterface({
input: process.stdin,
output: process.stdout
});
rl.setPrompt('IP> ');
rl.prompt();
rl.on('line', function (line) {
ip=convertIPtoNumber(line);
client.zrangebyscore('ip', ip, '+inf', 'LIMIT', '0', '1', function (err,result) {
if (!Array.isArray(result) || result.length === 0) {
//该IP地址超出了数据库记录的最大IP地址
console.log('No data.');
} else {
var location=result[0];
if (location[0]==='*') {
//该IP地址不属于任何一个IP地址段
console.log('No data.');
} else {
console.log(location);
}
}
rl.prompt();
});
});
运行后的结果如下:
node ip_search.js
IP>127.0.0.1
No data.
IP>122.202.23.34
北京
IP>202.127.3.3
上海
上面的代码的实际查找范围是一个半开半闭区间。如果想实现闭区间查找,读者可以在比对“*”时同时比较元素的分数和查找的IP地址是否相同。