Stream:append-only 日志与消费组
用 Stream 做事件日志、轻量级 Kafka 替代和带确认机制的工作队列
Stream 是一个只追加(append-only)的日志结构,每条记录有唯一递增 ID。相比 List 和 Pub/Sub,它多了三件大杀器:消息持久化、按 ID 回溯历史、以及带「确认」(ack)的消费组——足以当作轻量级的 Kafka 用。
写入与读取
127.0.0.1:6379> XADD events * type login user u1
"1734000000000-0"
127.0.0.1:6379> XADD events * type click user u1
"1734000000001-0"
127.0.0.1:6379> XLEN events
(integer) 2
127.0.0.1:6379> XRANGE events - +
1) 1) "1734000000000-0"
2) 1) "type"
2) "login"
3) "user"
4) "u1"
2) 1) "1734000000001-0"
2) ...*让 Valkey 自动生成 ID(毫秒时间戳-序号),保证全局递增。XRANGE events - +中-和+代表最小和最大 ID,即「全部」。
| 命令 | 作用 |
|---|---|
XADD k * f v... | 追加一条记录 |
XLEN k | 记录条数 |
XRANGE k - + | 正序按 ID 区间读 |
XREVRANGE k + - | 倒序读 |
XREAD COUNT n STREAMS k id | 从某 ID 之后读取 |
XINFO STREAM k | 查看流的元信息 |
Valkey 9.1 优化了 XRANGE 的实现,区间读取相比 9.0 大约快 30%,回溯大段历史更划算。
实时跟读:XREAD
XREAD 配合特殊 ID $(表示「只要此刻之后的新消息」)+ BLOCK 可阻塞等待新数据,做实时订阅:
127.0.0.1:6379> XREAD BLOCK 0 STREAMS events $
(阻塞,直到有新消息 XADD 进来才返回)消费组:可扩展 + 不丢消息
这是 Stream 最强的部分。消费组让多个消费者分摊同一个流的消息(每条只被组内一个消费者处理),并且每条消息要被显式 XACK 确认,否则一直停留在「待处理」(pending)列表里,可被重新投递。
# 1. 建组,从头开始消费(0 表示从最早,$ 表示只看新消息)
127.0.0.1:6379> XGROUP CREATE events g1 0
OK
# 2. 消费者 c1 领一条消息(> 表示「还没分给任何人的新消息」)
127.0.0.1:6379> XREADGROUP GROUP g1 c1 COUNT 1 STREAMS events >
1) 1) "events"
2) 1) 1) "1734000000000-0"
2) 1) "type"
2) "login"
# 3. 处理完,确认
127.0.0.1:6379> XACK events g1 1734000000000-0
(integer) 1| 命令 | 作用 |
|---|---|
XGROUP CREATE k g id | 创建消费组 |
XREADGROUP GROUP g c ... STREAMS k > | 组内消费者领取新消息 |
XACK k g id | 确认已处理 |
XPENDING k g | 查看未确认的消息 |
XCLAIM k g c minIdle id | 把卡住的消息转给别的消费者 |
XAUTOCLAIM k g c minIdle 0 | 批量自动认领超时消息 |
故障恢复:认领卡住的消息
某消费者拿了消息却崩溃,消息会卡在 pending 里。用 XPENDING 查谁卡住了,再用 XAUTOCLAIM 把「闲置超过一定时间」的消息批量转给健康的消费者:
127.0.0.1:6379> XPENDING events g1
1) (integer) 1
2) "1734000000000-0"
3) "1734000000000-0"
4) 1) 1) "c1"
2) "1"
127.0.0.1:6379> XAUTOCLAIM events g1 c2 30000 030000 表示认领闲置超过 30 秒的消息,交给消费者 c2,实现自动故障转移。
控制内存:XTRIM
只追加的流会无限增长,必须裁剪。两种策略:
# 只保留最近约 1000 条(~ 是近似裁剪,更快)
127.0.0.1:6379> XTRIM events MAXLEN ~ 1000
(integer) 0
# 删除某 ID 之前的所有消息
127.0.0.1:6379> XTRIM events MINID 1734000000000
(integer) 0也可以在 XADD 时直接带 MAXLEN ~ 1000,写入即裁剪。
什么时候用 Stream? 需要持久化、历史回溯、多消费者分摊、消费确认时用 Stream。只要「即发即弃」的实时广播,用更轻的 Pub/Sub。
下一篇
Stream 是持久化的消息日志。下面看它的「即时广播」表亲——Pub/Sub。
继续阅读 → Pub/Sub:实时广播与分片订阅