Comunidade Valkey

Stream:append-only 日志与消费组

用 Stream 做事件日志、轻量级 Kafka 替代和带确认机制的工作队列

Stream 是一个只追加(append-only)的日志结构,每条记录有唯一递增 ID。相比 List 和 Pub/Sub,它多了三件大杀器:消息持久化、按 ID 回溯历史、以及带「确认」(ack)的消费组——足以当作轻量级的 Kafka 用。

写入与读取

127.0.0.1:6379> XADD events * type login user u1
"1734000000000-0"
127.0.0.1:6379> XADD events * type click user u1
"1734000000001-0"
127.0.0.1:6379> XLEN events
(integer) 2
127.0.0.1:6379> XRANGE events - +
1) 1) "1734000000000-0"
   2) 1) "type"
      2) "login"
      3) "user"
      4) "u1"
2) 1) "1734000000001-0"
   2) ...
  • * 让 Valkey 自动生成 ID(毫秒时间戳-序号),保证全局递增。
  • XRANGE events - +-+ 代表最小和最大 ID,即「全部」。
命令作用
XADD k * f v...追加一条记录
XLEN k记录条数
XRANGE k - +正序按 ID 区间读
XREVRANGE k + -倒序读
XREAD COUNT n STREAMS k id从某 ID 之后读取
XINFO STREAM k查看流的元信息

Valkey 9.1 优化了 XRANGE 的实现,区间读取相比 9.0 大约快 30%,回溯大段历史更划算。

实时跟读:XREAD

XREAD 配合特殊 ID $(表示「只要此刻之后的新消息」)+ BLOCK 可阻塞等待新数据,做实时订阅:

127.0.0.1:6379> XREAD BLOCK 0 STREAMS events $
(阻塞,直到有新消息 XADD 进来才返回)

消费组:可扩展 + 不丢消息

这是 Stream 最强的部分。消费组让多个消费者分摊同一个流的消息(每条只被组内一个消费者处理),并且每条消息要被显式 XACK 确认,否则一直停留在「待处理」(pending)列表里,可被重新投递。

# 1. 建组,从头开始消费(0 表示从最早,$ 表示只看新消息)
127.0.0.1:6379> XGROUP CREATE events g1 0
OK

# 2. 消费者 c1 领一条消息(> 表示「还没分给任何人的新消息」)
127.0.0.1:6379> XREADGROUP GROUP g1 c1 COUNT 1 STREAMS events >
1) 1) "events"
   2) 1) 1) "1734000000000-0"
         2) 1) "type"
            2) "login"

# 3. 处理完,确认
127.0.0.1:6379> XACK events g1 1734000000000-0
(integer) 1
命令作用
XGROUP CREATE k g id创建消费组
XREADGROUP GROUP g c ... STREAMS k >组内消费者领取新消息
XACK k g id确认已处理
XPENDING k g查看未确认的消息
XCLAIM k g c minIdle id把卡住的消息转给别的消费者
XAUTOCLAIM k g c minIdle 0批量自动认领超时消息

故障恢复:认领卡住的消息

某消费者拿了消息却崩溃,消息会卡在 pending 里。用 XPENDING 查谁卡住了,再用 XAUTOCLAIM 把「闲置超过一定时间」的消息批量转给健康的消费者:

127.0.0.1:6379> XPENDING events g1
1) (integer) 1
2) "1734000000000-0"
3) "1734000000000-0"
4) 1) 1) "c1"
      2) "1"
127.0.0.1:6379> XAUTOCLAIM events g1 c2 30000 0

30000 表示认领闲置超过 30 秒的消息,交给消费者 c2,实现自动故障转移。

控制内存:XTRIM

只追加的流会无限增长,必须裁剪。两种策略:

# 只保留最近约 1000 条(~ 是近似裁剪,更快)
127.0.0.1:6379> XTRIM events MAXLEN ~ 1000
(integer) 0
# 删除某 ID 之前的所有消息
127.0.0.1:6379> XTRIM events MINID 1734000000000
(integer) 0

也可以在 XADD 时直接带 MAXLEN ~ 1000,写入即裁剪。

什么时候用 Stream? 需要持久化、历史回溯、多消费者分摊、消费确认时用 Stream。只要「即发即弃」的实时广播,用更轻的 Pub/Sub。


下一篇

Stream 是持久化的消息日志。下面看它的「即时广播」表亲——Pub/Sub。

继续阅读 → Pub/Sub:实时广播与分片订阅

On this page