365Tools
    发布时间:2024-03-02 14:30:01
Redis Stream 是 Redis 5.0 版本引入的一种新数据类型,同时它也是 Redis 中最为复杂的数据结构,本节主要对 Stream 做相关介绍。
| 命令 | 说明 | 
|---|---|
| XADD | 添加消息到末尾。 | 
| XTRIM | 对 Stream 流进行修剪,限制长度。 | 
| XDEL | 删除指定的消息。 | 
| XLEN | 获取流包含的元素数量,即消息长度。 | 
| XRANGE | 获取消息列表,会自动过滤已经删除的消息。 | 
| XREVRANGE | 反向获取消息列表,ID 从大到小。 | 
| XREAD | 以阻塞或非阻塞方式获取消息列表。 | 
| XGROUP CREATE | 创建消费者组。 | 
| XREADGROUP GROUP | 读取消费者组中的消息。 | 
| XACK | 将消息标记为"已处理"。 | 
| XGROUP SETID | 为消费者组设置新的最后递送消息ID。 | 
| XGROUP DELCONSUMER | 删除消费者。 | 
| XGROUP DESTROY | 删除消费者组。 | 
| XPENDING | 显示待处理消息的相关信息。 | 
| XCLAIM | 转移消息的归属权。 | 
| XINFO | 查看 Stream 流、消费者和消费者组的相关信息。 | 
| XINFO GROUPS | 查看消费者组的信息。 | 
| XINFO STREAM | 查看 Stream 流信息。 | 
| XINFO CONSUMERS key group | 查看组内消费者流信息。 | 
#添加一个消息, * 表示以时间戳自动创建id
127.0.0.1:6379> XADD mystream * username www.biancheng.net age 10 www.365tools.cn age 9
"1610619132674-1"
#自定义id等于001,注意id只增不减
127.0.0.1:6379> XADD mystream1 001 name zhangsan addr hebei
"1-0"
127.0.0.1:6379> XADD mystream1 002 name lisi addr hunan
"2-0"
#如果插入重复的id号会报错
127.0.0.1:6379> XADD mystream1 001 name wangwu addr fujian
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
127.0.0.1:6379> XADD mystream1 003 name wangwu addr fujian
"3-0"
#删除id=001的数据
127.0.0.1:6379> XDEL mystream1 001
(integer) 1
#查看stream队列包含的消息数量,也就消息长度
127.0.0.1:6379> XLEN mystream1
(integer) 2
#获取消息列表,-表示最小,+表示最大
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1610619132674-0"
   2) 1) "username"
      2) "www.biancheng.net"
      3) "age"
      4) "10"
2) 1) "1610619178028-0"
   2) 1) "username"
      2) "www.365tools.cn"
      3) "age"
      4) "9"
#获取消息列表
127.0.0.1:6379> XRANGE mystream1 - 003
1) 1) "2-0"
   2) 1) "name"
      2) "lisi"
      3) "addr"
      4) "hunan"
2) 1) "3-0"
   2) 1) "name"
      2) "wangwu"
      3) "addr"
      4) "fujian"
#使用count指定返回数据的数量
127.0.0.1:6379> XRANGE mystream1 - 003 count 1
1) 1) "2-0"
   2) 1) "name"
      2) "lisi"
      3) "addr"
      4) "hunan"
#删除整个Stream
127.0.0.1:6379> DEL mystream
#使用xread读取消息
127.0.0.1:6379> XREAD count 2 STREAMS mystream1 2-0
1) 1) "mystream1"
   2) 1) 1) "3-0"
         2) 1) "name"
            2) "wangwu"
            3) "addr"
            4) "fujian"
XADD key ID field value [field value ...]
参数说明如下:*表示由 redis 生成,可以自定义,但是要自己保证递增性;XADD mystream * username www.biancheng.net age 10
XADD mystream1 001 name zhangsan addr hebei
XGROUP CREATE指令创建消费组(Consumer Group),在创建时,需要传递起始消息的 ID 用来初始化 last_delivered_id 变量。语法格式如下:XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]参数说明如下:
#创建消费组,并传递消息起始id 0-0
127.0.0.1:6379> XGROUP CREATE mystream1 ms1 0-0
OK
#从尾部开始消费信息,只接受新消息
127.0.0.1:6379> XGROUP CREATE mystream1 ms3 $
OK
#xinfo查看队列信息
127.0.0.1:6379> XINFO stream mystream1
#队列中消息的长度
1) "length"
2) (integer) 2
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
#有几个消费组
7) "groups"
8) (integer) 2
9) "last-generated-id"
10) "3-0"
11) "first-entry" #第一个消息
12) 1) "2-0"
    2) 1) "name"
       2) "lisi"
       3) "addr"
       4) "hunan"
13) "last-entry" #最后一个消息
14) 1) "3-0"
    2) 1) "name"
       2) "wangwu"
       3) "addr"
       4) "fujian"
#查看消费组信息
127.0.0.1:6379> XINFO GROUPS mystream1
1) 1) "name"
   2) "ms1"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"
2) 1) "name"
   2) "ms3"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "3-0"
XREADGROUP命令使消费组消费信息,它和XREAD命令一样,都可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PLE(正在处理的消息)结构里,客户端处理完毕后使用 XACK 命令通知 Redis 服务器,本条消息已经处理完毕,该消息的 ID 就会从 PEL 中移除。示意图如下:
XREADGROUP命令的语法格式如下所示:XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
参数说明如下:
#消费组中消费者读取消息,> 表示每当消费一个信息,消费组游标就前移一位
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 >
1) 1) "mystream1"
   2) 1) 1) "2-0"
         2) 1) "name"
            2) "lisi"
            3) "addr"
            4) "hunan"
#再使用ms1-c1读取一条消息
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 >
1) 1) "mystream1"
   2) 1) 1) "3-0"
         2) 1) "name"
            2) "wangwu"
            3) "addr"
            4) "fujian"
#BLOCK 1000表示等待1秒,如果没有任何消息到来,则返回nill,此时移动到了末尾
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 BLOCK 1000 STREAMS mystream1 >
(nil)
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 1
1) 1) "mystream1"
   2) 1) 1) "2-0"
         2) 1) "name"
            2) "lisi"
            3) "addr"
            4) "hunan"
#超出了消息id的范围
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 3
1) 1) "mystream1"
   2) (empty list or set)
#添加新的消息 ID为004
127.0.0.1:6379> XADD mystream1 004 name zhangwu age 24
#使用另外一个消费组读取消息
127.0.0.1:6379> XREADGROUP GROUP ms3 c2 COUNT 2 STREAMS mystream1 >
1) 1) "mystream1"
   2) 1) 1) "4-0"
         2) 1) "name"
            2) "zhangwu"
            3) "age"
            4) "21"
#xack将id=002消息标记为已经处理
127.0.0.1:6379> XACK mystream1 ms1 002
(integer) 1
注意:>表示每当消费者读取一条消息时,last_delivered_id 变量就会前移一位。