Redis享知享学:单机数据库实现:blocking_keys和ready_keys属性

本篇研究blocking_keys和ready_keys属性。

blocking_keys的作用是什么?Redis源码中为什么拥有两个ready_keys同名属性?解除客户端阻塞时采用了什么策略?这篇文章将对此进行探究。

blocking_keys

在对Redis列表的操作上,有三个命令会造成客户端的阻塞,分别是BLPOP、BRPOP以及BRPOPLPUSH,数据库需要对这些功能的实现提供支持。

Redis数据库提供了blocking_keys属性,它仍然是一个字典,键是造成客户端阻塞的键,值是一个链表形式的list,用什么方法来表示客户端呢?看看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/*
* 根据给定数量的 key ,对给定客户端进行阻塞
*
* 参数:
* keys 多个 key
* numkeys key 的数量
* timeout 阻塞的最长时限
* target 在解除阻塞时,将结果保存到这个 key 对象,而不是返回给客户端
* 只用于 BRPOPLPUSH 命令
*
* T = O(N)
*/
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
dictEntry *de;
list *l;
int j;
// 设置阻塞状态的超时和目标选项
c->bpop.timeout = timeout;
c->bpop.target = target;
if (target != NULL) incrRefCount(target);
// 将所有 key 加入到 client.bpop.keys 字典里,O(N)
for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dict ignore it. */
// 记录阻塞 key 到客户端, O(1)
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
// 将被阻塞的客户端添加到 db->blocking_keys 字典的链表中
// O(1)
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
// 这个 key 第一次被阻塞,创建一个链表
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
redisAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
// 已经有其他客户端被这个 key 阻塞
l = dictGetVal(de);
}
// 加入链表
listAddNodeTail(l,c);
}
/* Mark the client as a blocked client */
// 将客户端的状态设置为阻塞
c->flags |= REDIS_BLOCKED;
// 为服务器的阻塞客户端数量增一
server.bpop_blocked_clients++;
}

Redis声明了一个结构体redisClient,用它来代表客户端。blocking_keys的主要作用就是在出现阻塞情形时,添加(key, 被该键阻塞的客户端列表)这样的键值对,被相同key阻塞的客户端,会按链表的形式维护在该key对应的值里,该值即为被该键阻塞的客户端列表。

可以看出,客户端阻塞就是简单设置标志位flags(后续Redis版本会将最后几个方法一起封装成一个方法blockClient),毕竟Redis是单线程,不可能还会将当前线程挂起。

blockForKeys方法会在两个地方调用,一个是blockingPopGenericCommand(),它是BLPOP/BRPOP 的底层实现;一个是brpoplpushCommand(),是BRPOPLPUSH的实现。

ready_keys

LPUSH、RPUSH以及LINSERT命令可以停止客户端的阻塞。停止阻塞的功能需要借助ready_keys属性实现。

首先必须清楚的是,服务器和数据库具有相同名称的属性ready_keys,后续将对它们进行解析。

解除阻塞的三种方式:

  • 被动脱离:有其它客户端为阻塞的键推入新元素;
  • 主动脱离:到达设定的阻塞时间;
  • 强制脱离:强制中断连接,服务器停机;

现在看看这三种的第一种:

前面提到的LPUSH、RPUSH以及LINSERT命令底层都是pushGenericCommand函数实现的。

解除客户端阻塞之准备

现在假设为某个key推入了一个值,pushGenericCommand函数首先在数据库查找这个key对应的value对象,并判断是否为空:

1
2
3
4
5
// 查找key指向的对象,O(1)
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
// 如果列表为空,那么可能正在有客户端等待这个列表
int may_have_waiting_clients = (lobj == NULL);

然后还要判断该value对象的类型是不是list:

1
2
3
4
if (lobj && lobj->type != REDIS_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}

如果连列表都不是那就没有继续下去的意义。

准备工作做足之后,如果列表为空,很容易想到应该判断该key是否在blocking_keys,实际上,Redis也是这么做的:

1
if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);

判断key是否在blocking_keys的方法就在这个signalListAsReady函数内:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void signalListAsReady(redisClient *c, robj *key) {
readyList *rl;
/* No clients blocking for this key? No need to queue it. */
// 没有客户端在等待这个 key ,直接返回
// O(1)
if (dictFind(c->db->blocking_keys,key) == NULL) return;
/* Key was already signaled? No need to queue it again. */
// key 已经位于就绪列表,直接返回
// O(1)
if (dictFind(c->db->ready_keys,key) != NULL) return;
/* Ok, we need to queue this key into server.ready_keys. */
// 添加包含 key 及其 db 信息的 readyList 结构到服务器端的就绪列表
// O(1)
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = c->db;
incrRefCount(key);
listAddNodeTail(server.ready_keys,rl);
/* We also add the key in the db->ready_keys dictionary in order
* to avoid adding it multiple times into a list with a simple O(1)
* check. */
// 同时将 key 添加到 db 的 ready_keys 字典中
// 提供 O(1) 复杂度来查询某个 key 是否已经就绪
incrRefCount(key);
redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
}

如果不在blocking_keys字典里,就直接返回;如果db.ready_keys不为空,也直接返回,这个属性是什么后面再介绍;如果程序都不满足之前两个“如果”的条件,则继续向下执行,此时会创建一个readyList,它会作为表元素的value,然后以链表的方式将表元素连接到server.ready_keys列表的尾部,readyList结构长这样:

1
2
3
4
typedef struct readyList {
redisDb *db;
robj *key;
} readyList;

该结构体内db指向数据库,key指向数据库键空间的key。

在signalListAsReady方法的最后,redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);执行的同时,会尝试将key添加到db的ready_keys字典中。该字典添加的键值对为(key, NULL),显然不是为了存储,而是为了实现列表快速查询功能,当需要知道一个键代表的list是否已经存在时,可以借助dictAdd函数,而其内部会调用dictAddRaw方法,该方法内部又执行如下语句:

1
2
3
4
// 查找可容纳新元素的索引位置
// 如果元素已存在, index 为 -1
if ((index = _dictKeyIndex(d, key)) == -1)
return NULL;

db->ready_keys保证了不会有重复的key所指向的list添加在自身内部。而且前面也提到在signalListAsReady方法一开始会进行判断:如果db.ready_keys不为空,直接返回,所以db->ready_keys起到了一个快速查询防重的作用。

而server.ready_keys是一个链表结构,也可以把它看成一个“多任务队列”,在Java的并发库设计中,也常常使用这种方式。

pushGenericCommand()的主要作用:

  • 如果may_have_waiting_clients为空,执行signalListAsReady方法,创建readyList实例并作为表元素的value,将表元素连接在server.ready_keys最后;
  • 先将列表的元素推入key所指向的列表;

解除客户端阻塞之完成

解除客户端阻塞发生在下一步调用handleClientsBlockedOnLists函数时。解除阻塞的流程大致可以描述为:

  • 将server.ready_keys指向的地址赋给方法内的局部变量指针l,而server.ready_keys设置为一个新创建的列表;
  • 遍历l指向的list,每循环一次,弹出链表实现的list的列表头,并取出value的值readyList实例rl;
  • 通过rl,删除db->ready_keys中的key,即dictDelete(rl->db->ready_keys,rl->key);
  • 通过rl,获得key代表的list实例,即robj *o = lookupKeyWrite(rl->db,rl->key);
  • 在rl->db->blocking_keys查找键为rl->key的dictEntry实例de,通过它获取值,即为包含被阻塞客户端的链表clients;
  • 循环遍历链表clients,lients弹出一个表头客户端,在key代表的列表不为空的情况下,弹出一个元素(代码中用value变量指代),如果value不为空,取消表头客户端阻塞状态,把value作为造成该客户端阻塞的key值;
  • 如果value为空,则说明仍然后客户端被阻塞,break,跳出循环;

由此也可看出,在解除客户端阻塞的时候,实行的是先阻塞先服务的策略(FBFS),因为每次都是推出链表的表头。

参考

Redis 设计与实现:国内解析Redis的开源资料;