Skip to Content
集合

实现 Redis 集合结构

本文进度对应的代码仓库:实现集合结构

在前两章节中,我们实现了 ListHash 数据结构,接下来我们将实现 Set 数据结构。Set 是一个无序的集合,集合中的元素是唯一的。

一个 Set 结构如下:

[ "a", "b", "c" ]

在 Redis 中,Set 结构的实现基于两种数据结构,和 Hash 类似,Set 对于纯整数集合且元素数量较少时,使用 intset 存储,当集合包含非整数元素或元素数量超过阈值时,使用 hashtable 存储。我们在实现时也会使用这两种数据结构。

具体实现

db 操作函数

先到 database/db.go 中添加两个方法:

database/db.go
// getAsSet returns a set.Set from database func getAsSet(db *DB, key string) (set.Set, reply.ErrorReply) { entity, exists := db.GetEntity(key) if !exists { return nil, nil } setObj, ok := entity.Data.(set.Set) if !ok { return nil, reply.MakeWrongTypeErrReply() } return setObj, nil } // getOrInitSet returns a set.Set for the given key // creates a new one if it doesn't exist func getOrInitSet(db *DB, key string) (set.Set, bool, reply.ErrorReply) { setObj, errReply := getAsSet(db, key) if errReply != nil { return nil, false, errReply } isNew := false if setObj == nil { setObj = set.NewHashSet() isNew = true } return setObj, isNew, nil }

这两个方法是用于获取 Set 对象的,getAsSet 方法用于从数据库中获取 Set 对象,如果不存在则返回 nil;getOrInitSet 方法用于获取或初始化 Set 对象,如果不存在则创建一个新的 Set 对象。

这里我们需要有一个 Set 接口,定义 Set 的基本操作。以及 NewHashSet 方法用于创建一个新的 Set 对象。

为什么这里要采用接口呢?因为 Go 语言倡导面向接口编程,我们先写出接口然后对其进行实现,如果后续我们需要使用更高性能的实现,不用修改上层对接口的调用,只需要实现接口即可。

我们在 datastruct 下创建目录 set,并在其中创建 set.go 文件,定义 Set 接口:

datastruct/set/set.go
package set // Set represents a Redis set type Set interface { }

然后在同级目录创建 hash_set.go 文件,定义 HashSet 结构体:

datastruct/set/hash_set.go
type HashSet struct { dict map[string]struct{} intset *IntSet isIntset bool }

HashSet 结构体包含一个 dict 字典,用于存储集合中的元素;一个 intset 整数集合,用于存储整数集合;一个 isIntset 布尔值,用于标识当前集合是否为整数集合(用于切换底层实现)。

然后创建一个 NewHashSet 方法用于创建一个新的 HashSet 对象:

datastruct/set/hash_set.go
// NewHashSet creates a new HashSet func NewHashSet() *HashSet { return &HashSet{ dict: make(map[string]struct{}), intset: NewIntSet(), isIntset: true, // Default to intset } }

NewHashSet 方法返回一个新的 HashSet 对象,默认使用整数集合。

在同级目录下创建 int_set.go 文件,定义 IntSet 结构体:

datastruct/set/int_set.go
// Dymamic encoding for intset // According to the size of the intset, we can use different encoding // types to save space. const ( INTSET_ENC_INT16 = 2 INTSET_ENC_INT32 = 4 INTSET_ENC_INT64 = 8 ) type IntSet struct { encoding uint32 length uint32 contents []byte } // NewIntSet creates a new IntSet with the given encoding func NewIntSet() *IntSet { return &IntSet{ encoding: INTSET_ENC_INT16, // Default encoding is 16-bit integer length: 0, contents: make([]byte, 0), } }

IntSet 结构体包含一个 encoding 字段,用于存储当前集合的编码类型;一个 length 字段,用于存储当前集合的长度;一个 contents 字段,用于存储当前集合的内容。

IntSet 使用动态编码的方式来存储整数集合,根据集合中整数的范围选择不同的编码类型,以节省内存空间。具体来说,IntSet 支持以下三种编码形式:

  • INTSET_ENC_INT16 (2 字节): 使用 16 位有符号整数存储数据,适用于所有整数范围在 [-32,768, 32,767] 之间的集合。
  • INTSET_ENC_INT32 (4 字节): 使用 32 位有符号整数存储数据,适用于所有整数范围在 [-2,147,483,648, 2,147,483,647] 之间的集合。
  • INTSET_ENC_INT64 (8 字节): 使用 64 位有符号整数存储数据,适用于所有整数范围在 [-9,223,372,036,854,775,808, 9,223,372,036,854,775,807] 之间的集合。

为什么使用动态编码?

动态编码的主要目的是 节省内存。在 Redis 中,内存的高效利用至关重要,尤其是在存储大量小型数据集合时。通过根据集合中整数的范围选择最小的编码类型,可以显著减少内存占用。例如:

  • 如果集合中的整数都在 [-32,768, 32,767] 范围内,使用 INTSET_ENC_INT16 即可满足需求,无需使用更大的编码类型。这样在内存中它们的排列更加紧凑。
  • 当集合中出现超出当前编码范围的整数时,IntSet 会自动升级到更高的编码类型(例如从 INTSET_ENC_INT16 升级到 INTSET_ENC_INT32),以确保能够存储更大的整数。

这里我们需要先讲明白在 IntSet 中的 contents 字段是如何存储数据的。

什么是小端字节序(Little Endian)

字节序(Byte Order) 指的是多字节数据(比如 16位、32位、64位整数)在内存中按字节存储的顺序

小端字节序(Little Endian)的规则是:

  • 低位字节(Least Significant Byte, LSB)放在内存的低地址
  • 高位字节(Most Significant Byte, MSB)放在内存的高地址

比如一个32位整数 0x12345678(十六进制表示),在小端存储下,内存中的字节顺序是:

地址(低 → 高)数据
addr0x78
addr+10x56
addr+20x34
addr+30x12

直觉理解:数字”拆碎”后,把最小的碎片(低位)放前面。

那么 contents 中元素排布是咋样的?

假设我们建了一个 IntSet,初始是 INTSET_ENC_INT16 编码(每个元素2字节), 我们插入下面的数:

  • 插入 100
  • 插入 300
  • 插入 500

每个数 2字节,小端编码存储。它们的二进制分别是:

数值16位二进制小端存储的两个字节
1000000 0110 01000x64 0x00
3000001 0010 11000x2C 0x01
5000001 1111 01000xF4 0x01

于是 contents 这个 []byte 数组的内容依次是:

下标字节内容说明
00x64100的低字节
10x00100的高字节
20x2C300的低字节
30x01300的高字节
40xF4500的低字节
50x01500的高字节

整个 contents 看起来就是:

[0x64 0x00 0x2C 0x01 0xF4 0x01]

注意:

  • 每两个字节是一组元素(因为 encoding 是 2)。
  • 元素按照升序排列(100 < 300 < 500)。

如果我们升级到 INTSET_ENC_INT32 编码(每个元素4字节),那么 contents 中的内容就会变成:

下标字节内容说明
00x64100的字节 1
10x00100的字节 2
20x00100的字节 3
30x00100的字节 4
40x2C300的字节 1
50x01300的字节 2
60x00300的字节 3
70x00300的字节 4
80xF4500的字节 1
90x01500的字节 2
100x00500的字节 3
110x00500的字节 4

整个 contents 看起来就是:

[0x64 0x00 0x00 0x00 0x2C 0x01 0x00 0x00 0xF4 0x01 0x00 0x00]

然后我们回到 database/set.go 中。

实现 SADD

我们先实现 SADD 命令,SADD 命令用于向集合中添加元素。

命令语法

SADD 命令的语法如下:

SADD key member [member ...]
  • key 是集合的键。也就是集合的名称。
  • member 是要添加的元素,可以是一个或多个元素。

例如:

SADD myset a b c // 返回值:3

上面的命令会向集合 myset 中添加元素 abc

如果集合中已经存在某个元素,则不会重复添加。例如:

SADD myset a b c // 返回值 3 SADD myset a b c // 返回值 0

SADD 命令的返回值为添加成功的元素数量。

编写处理函数

编写处理函数:

database/set.go
// execSAdd implements SADD key member [member...] // Add one or more members to a set func execSAdd(db *DB, args [][]byte) resp.Reply { key := string(args[0]) members := args[1:] // Get or create set setObj, isNew, errReply := getOrInitSet(db, key) if errReply != nil { return errReply } // Add all members count := 0 for _, member := range members { count += setObj.Add(string(member)) } // Store back to database if it's a new set or any members were added if isNew || count > 0 { db.PutEntity(key, &database.DataEntity{ Data: setObj, }) // Add to AOF db.addAof(utils.ToCmdLineWithName("SADD", args...)) } return reply.MakeIntReply(int64(count)) }

execSAdd 函数用于处理 SADD 命令,首先获取集合对象,如果不存在则创建一个新的集合对象。然后遍历所有成员,调用 Add 方法添加成员到集合中。最后将集合对象存储到数据库中,并返回添加成功的元素数量。

同时需要处理 AOF 日志,使用 db.addAof 方法将命令添加到 AOF 日志中。

这里表示我们的 Set 接口需要一个 Add 方法,用于添加元素到集合中。

实现 Add 方法

我们在 set.go 中定义 Add 方法:

datastruct/set/set.go
// Set represents a Redis set type Set interface { Add(member string) int // Add a member to the set, return 1 if added, 0 if already exists }

然后我们来到 hash_set.go 中实现 Add 方法:

实现的时候,我们要判断当前使用的数据结构,如果是整数集合,则调用 intsetAdd 方法添加元素;如果是哈希表,则直接添加到 dict 中。

如果当前集合是整数集合,但要添加的元素不是整数,则需要将集合转换为哈希表。

datastruct/set/hash_set.go
// 定义一个常量,表示 intset 的最大元素数量 // 当元素数量超过这个值时,转换为哈希表 const ( SET_MAX_INTSET_ENTRIES = 512 // Maximum number of entries in intset ) // Add adds an integer to the set func (set *HashSet) Add(member string) int { if set.isIntset { if val, err := strconv.ParseInt(member, 10, 64); err == nil { if ok := set.intset.Add(val); ok { if set.intset.Len() > SET_MAX_INTSET_ENTRIES { set.convertToHashTable() } return 1 } return 0 } else { // The input is not a valid integer, so we need to convert to hash table // to store non-integer values set.convertToHashTable() } } if _, exists := set.dict[member]; exists { return 0 // Already exists } set.dict[member] = struct{}{} return 1 // Added successfully }

intset 的 Add 实现

然后我们跑到 int_set.go 中实现 Add 方法和 Len 方法:

datastruct/set/int_set.go
// Len returns the number of elements in the set func (is *IntSet) Len() int { return int(is.length) } // Add adds an integer to the set func (is *IntSet) Add(value int64) bool { // Check if we need to upgrade encoding var requiredEncoding uint32 if value < math.MinInt16 || value > math.MaxInt16 { if value < math.MinInt32 || value > math.MaxInt32 { requiredEncoding = INTSET_ENC_INT64 } else { requiredEncoding = INTSET_ENC_INT32 } } else { requiredEncoding = INTSET_ENC_INT16 } // Upgrade encoding if necessary if requiredEncoding > is.encoding { is.upgradeEncoding(requiredEncoding) } // Check if value already exists pos := is.findPosition(value) if pos >= 0 { return false // Value already exists } // Insert value at the correct position pos = -pos - 1 // Convert to insertion position is.insertAt(pos, value) return true }

实现 Add 方法主要需要考虑下面几个步骤:

  • 检查当前集合的编码类型,如果要添加的元素超出当前编码范围,则需要升级编码类型。
  • 检查要添加的元素是否已经存在,如果存在则返回 false。
  • 如果不存在,则在正确的位置插入元素,并返回 true。

辅助函数:升级编码类型

这里我们创建一个 upgradeEncoding 方法用于升级编码类型:

datastruct/set/int_set.go
// upgradeEncoding upgrades the encoding of the IntSet if necessary func (is *IntSet) upgradeEncoding(newEncoding uint32) { if newEncoding <= is.encoding { return } // Save old values oldValues := is.ToSlice() // Reset and use new encoding is.encoding = newEncoding is.length = 0 is.contents = make([]byte, 0, len(oldValues)*int(newEncoding)) // Re-add all values with new encoding for _, v := range oldValues { is.Add(v) } } // ToSlice returns all elements as a slice // 将当前集合中的所有元素保存到一个切片中 func (is *IntSet) ToSlice() []int64 { result := make([]int64, is.length) for i := uint32(0); i < is.length; i++ { result[i] = is.getValueAt(i) } return result }

这个方法的实现思路还是很简单的,先将当前集合中的所有元素保存到一个切片中,然后重置集合的编码类型和长度,最后将所有元素重新添加到集合中。

辅助函数:查找元素位置

然后创建 findPosition 方法用于查找元素的位置:

这个方法我们使用二分查找来查找元素的位置,返回值有两种情况:

  • 如果找到元素,则返回元素的索引。
  • 如果没有找到元素,则返回一个负数,表示元素应该插入的位置。
datastruct/set/int_set.go
// findPosition finds the position of the value in the set // Returns the index of the value if found, or the index where it should be inserted func (is *IntSet) findPosition(value int64) int { // Binary search to find position low, high := 0, int(is.length)-1 for low <= high { mid := (low + high) / 2 midVal := is.getValueAt(uint32(mid)) if midVal < value { low = mid + 1 } else if midVal > value { high = mid - 1 } else { return mid // Found } } return -(low + 1) // Not found, return insertion point }

辅助函数:获取元素值

然后创建 getValueAt 方法用于获取指定索引的值:

datastruct/set/int_set.go
// getValueAt retrieves the value at the given index func (is *IntSet) getValueAt(index uint32) int64 { if index >= is.length { panic(fmt.Sprintf("Index out of bounds: %d", index)) } offset := index * is.encoding switch is.encoding { case INTSET_ENC_INT16: return int64(int16(binary.LittleEndian.Uint16(is.contents[offset:]))) case INTSET_ENC_INT32: return int64(int32(binary.LittleEndian.Uint32(is.contents[offset:]))) case INTSET_ENC_INT64: return int64(binary.LittleEndian.Uint64(is.contents[offset:])) } panic("Invalid encoding") }

getValueAt(index uint32) 的目的是:根据索引 index,从 contents 里解码出第 index 个元素的实际数值(int64)

流程分解:

  1. 计算偏移量

    offset := index * is.encoding

    每个元素的大小是 encoding 个字节,所以位置在 index * encoding

  2. 根据 encoding 读取数据

    • encoding == 2,读2字节(Uint16),然后强制转成 int64
    • encoding == 4,读4字节(Uint32),再转成 int64
    • encoding == 8,直接读8字节(Uint64)。

    用的是 binary.LittleEndian.UintXX() 这种API,从字节流中按小端格式取数。

  3. 返回转换好的 int64

举例:

假设 encoding = 2,我们调用 getValueAt(1),想拿第2个数(300):

  • offset = 1 * 2 = 2
  • contents[2] = 0x2Ccontents[3] = 0x01
  • 小端组合 = 0x012C => 十进制是 300
  • 转成 int64(300),返回。

辅助函数:插入元素

然后实现 insertAt 方法用于在指定位置插入元素:

与前面的函数相反,insertAt 是往 IntSet 的 contents 中第 pos 个位置插入一个新的元素 value。

由于 IntSet 中的元素要求是升序排列的,所以插入时不仅要放进去,还要把原有的数据往后挪动,空出正确的位置。


insertAt 的完整流程讲解

  1. 计算扩容

插入新元素,需要比原来多 encoding 字节空间(每个元素的字节数是固定的)。

oldLen := len(is.contents) newLen := oldLen + int(is.encoding)

oldLen 是插入前的字节长度。
newLen 是插入后应该有的字节长度。

  1. 确保容量足够

如果当前 contents 的容量(cap(is.contents))小于 newLen,就需要新开一个更大的数组,复制原数据进去。

if cap(is.contents) < newLen { newContents := make([]byte, newLen, newLen*2) copy(newContents, is.contents) is.contents = newContents } else { is.contents = is.contents[:newLen] }
  • 如果容量不够就新建更大的切片(一般是 newLen 的两倍大小)。
  • 如果容量够就直接拉长切片(调整切片长度)。
  1. 挪动后面的元素

在插入位置 pos 后的所有元素,需要整体往后移动一块(大小为 encoding 字节)。

offset := pos * int(is.encoding) if pos < int(is.length) { copy(is.contents[offset+int(is.encoding):], is.contents[offset:oldLen]) }

举个例子:原本是 [元素0][元素1][元素2]
插入到位置1后,应该变成 [元素0][新元素][元素1][元素2]

所以 [元素1][元素2] 这段内容往后移动一格。

  1. 写入新元素

根据当前 encoding(2字节/4字节/8字节),把 value 写到 offset 对应的位置。

switch is.encoding { case INTSET_ENC_INT16: binary.LittleEndian.PutUint16(is.contents[offset:], uint16(value)) case INTSET_ENC_INT32: binary.LittleEndian.PutUint32(is.contents[offset:], uint32(value)) case INTSET_ENC_INT64: binary.LittleEndian.PutUint64(is.contents[offset:], uint64(value)) }

这里使用 binary.LittleEndian 保证按照小端字节序写入。

  1. 更新元素数量

插入成功后,元素数量 length 要加一。

is.length++

简单例子理解 insertAt

假设我们有:

  • encoding = 2(即每个元素2字节)
  • contents = [0x01 0x00 0x03 0x00],存的是 1 和 3
  • length = 2

现在插入数字2,插入到位置 pos = 1(即在 1 和 3 中间)。

执行步骤:

  1. 原来的 contents 长度是 4,新长度变成 6。
  2. 容量够的话直接扩展 contents。
  3. 将 0x03 0x00 这2字节的数据挪动到后面。
  4. 在 offset=2(pos * 2)的位置插入2(小端编码是 0x02 0x00)。
  5. 新的 contents 变成:
[0x01 0x00 0x02 0x00 0x03 0x00]
  1. 更新长度 length = 3。

最终效果是集合元素按升序:[1, 2, 3]。

代码如下:

datastruct/set/int_set.go
// insertAt inserts a value at the specified position func (is *IntSet) insertAt(pos int, value int64) { // Expand contents oldLen := len(is.contents) newLen := oldLen + int(is.encoding) if cap(is.contents) < newLen { newContents := make([]byte, newLen, newLen*2) copy(newContents, is.contents) is.contents = newContents } else { is.contents = is.contents[:newLen] } // Shift elements to make space offset := pos * int(is.encoding) if pos < int(is.length) { // Move all the elements after pos backward copy(is.contents[offset+int(is.encoding):], is.contents[offset:oldLen]) } // Insert new element switch is.encoding { case INTSET_ENC_INT16: binary.LittleEndian.PutUint16(is.contents[offset:], uint16(value)) case INTSET_ENC_INT32: binary.LittleEndian.PutUint32(is.contents[offset:], uint32(value)) case INTSET_ENC_INT64: binary.LittleEndian.PutUint64(is.contents[offset:], uint64(value)) } is.length++ }

现在我们的 intset 的 Add 方法就实现完成了。我们回到 hash_set.go 中,继续实现 convertToHashTable 方法。

实现 convertToHashTable 方法

convertToHashTable 方法用于将当前的整数集合转换为哈希表。转换时,我们需要将整数集合中的所有元素添加到哈希表中,然后将 isIntset 标志设置为 false,表示当前集合已经转换为哈希表。

datastruct/set/hash_set.go
// convertToHashTable converts the intset to a hash table func (set *HashSet) convertToHashTable() { if !set.isIntset { return // Already a hash table } // Copy elements from intset to hash table set.intset.ForEach(func(value int64) bool { set.dict[strconv.FormatInt(value, 10)] = struct{}{} return true }) set.isIntset = false }

到目前为止,我们的 SADD 命令已经实现完成了。

实现 SCARD

SCARD 命令用于获取集合的元素数量。

SCARD 命令的语法如下:

SCARD key
  • key 是集合的键。
  • 返回值是集合的元素数量。

例如:

SCARD myset // 返回值:3

编写处理函数:

database/set.go
// execSCard implements SCARD key // Get the number of members in a set func execSCard(db *DB, args [][]byte) resp.Reply { key := string(args[0]) // Get set setObj, errReply := getAsSet(db, key) if errReply != nil { return errReply } if setObj == nil { return reply.MakeIntReply(0) } return reply.MakeIntReply(int64(setObj.Len())) }

这个函数比较简单,我们只需要获取集合对象,然后调用 Len 方法获取集合的元素数量即可。

实现 SISMEMBER

SISMEMBER 命令用于判断一个元素是否在集合中。

命令语法

SISMEMBER 命令的语法如下:

SISMEMBER key member
  • key 是集合的键。
  • member 是要判断的元素。

例如:

SISMEMBER myset a // 返回值:1

上面的命令会判断元素 a 是否在集合 myset 中,如果在则返回 1,否则返回 0。

编写处理函数

编写处理函数:

database/set.go
// execSIsMember implements SISMEMBER key member // Determine if a given value is a member of a set func execSIsMember(db *DB, args [][]byte) resp.Reply { key := string(args[0]) member := string(args[1]) // Get set setObj, errReply := getAsSet(db, key) if errReply != nil { return errReply } if setObj == nil { return reply.MakeIntReply(0) } if setObj.Contains(member) { return reply.MakeIntReply(1) } return reply.MakeIntReply(0) }

execSIsMember 函数用于处理 SISMEMBER 命令,首先获取集合对象,然后调用 Contains 方法判断元素是否在集合中。

实现 Contains 方法

我们在 set.go 中定义 Contains 方法:

datastruct/set/set.go
// Set represents a Redis set type Set interface { Add(member string) int // Add a member to the set, return 1 if added, 0 if already exists Contains(member string) bool // Check if a member exists in the set }

然后来到 hash_set.go 中实现 Contains 方法:

datastruct/set/hash_set.go
// Contains checks if the set contains the given value func (set *HashSet) Contains(member string) bool { if set.isIntset { if val, err := strconv.ParseInt(member, 10, 64); err == nil { return set.intset.Contains(val) } return false // Not an integer } _, exists := set.dict[member] return exists }

这里实现思路是:

尝试将 member 转换为整数,如果转换成功,则调用 intsetContains 方法判断元素是否在集合中;如果转换失败,则说明当前集合是哈希表,直接判断 dict 中是否存在该元素即可。

实现 intsetContains 方法

这个方法的实现思路比较简单,我们只需要调用 findPosition 方法判断元素是否在集合中即可:

datastruct/set/int_set.go
// Contains checks if the set contains the given value func (is *IntSet) Contains(value int64) bool { pos := is.findPosition(value) return pos >= 0 // Value found }

到目前为止,我们的 SISMEMBER 命令已经实现完成了。

实现 SMEMBERS

SMEMBERS 命令用于获取集合中的所有元素。

命令语法

SMEMBERS 命令的语法如下:

SMEMBERS key
  • key 是集合的键。
  • 返回值是集合中的所有元素。

例如:

SMEMBERS myset // 返回值:["a", "b", "c"]

上面的命令会获取集合 myset 中的所有元素。

编写处理函数

database/set.go
// execSMembers implements SMEMBERS key // Get all the members in a set func execSMembers(db *DB, args [][]byte) resp.Reply { key := string(args[0]) // Get set setObj, errReply := getAsSet(db, key) if errReply != nil { return errReply } if setObj == nil { return reply.MakeMultiBulkReply([][]byte{}) } // Convert members to [][]byte members := setObj.Members() result := make([][]byte, len(members)) for i, member := range members { result[i] = []byte(member) } return reply.MakeMultiBulkReply(result) }

execSMembers 函数用于处理 SMEMBERS 命令,首先获取集合对象,然后调用 Members 方法获取集合中的所有元素。最后将所有元素转换为 [][]byte 类型的切片,并返回。

实现 Members 方法

我们在 set.go 中定义 Members 方法:

datastruct/set/set.go
type Set interface { Add(member string) int // Add a member to the set, return 1 if added, 0 if already exists Contains(member string) bool // Check if a member exists in the set Members() []string // Get all members in the set }

然后来到 hash_set.go 中实现 Members 方法:

datastruct/set/hash_set.go
// Members returns all members of the set func (set *HashSet) Members() []string { if set.isIntset { members := make([]string, 0, set.intset.Len()) set.intset.ForEach(func(value int64) bool { members = append(members, strconv.FormatInt(value, 10)) return true }) return members } members := make([]string, 0, len(set.dict)) for member := range set.dict { members = append(members, member) } return members }

Members 方法用于获取集合中的所有元素,如果当前集合是整数集合,则调用 intsetForEach 方法遍历所有元素将其添加到 members 切片中; 如果当前集合是哈希表,则直接遍历 dict 中的所有元素,将其添加到 members 切片中。

最后返回 members 切片。

实现 intsetForEach 方法

ForEach 方法用于遍历整数集合中的所有元素,并对每个元素执行指定的操作。

datastruct/set/int_set.go
// ForEach iterates over all elements in the set func (is *IntSet) ForEach(consumer func(value int64) bool) { for i := uint32(0); i < is.length; i++ { if !consumer(is.getValueAt(i)) { break } } }

ForEach 方法接受一个函数 consumer 作为参数,遍历整数集合中的所有元素,并对每个元素执行 consumer 函数。如果 consumer 函数返回 false,则停止遍历。

到这里,我们的 SMEMBERS 命令已经实现完成了。

实现 SREM

SREM 命令用于从集合中删除一个或多个元素。

命令语法

SREM 命令的语法如下:

SREM key member [member ...]
  • key 是集合的键。
  • member 是要删除的元素,可以是一个或多个元素。
  • 返回值是删除成功的元素数量。

例如:

SREM myset a b c // 返回值:3

上面的命令会从集合 myset 中删除元素 abc

如果集合中不存在某个元素,则不会删除。例如:

SREM myset a b c // 返回值 3 SREM myset a b c // 返回值 0

SREM 命令的返回值为删除成功的元素数量。

编写处理函数

编写处理函数:

database/set.go
// execSRem implements SREM key member [member...] // Remove one or more members from a set func execSRem(db *DB, args [][]byte) resp.Reply { key := string(args[0]) members := args[1:] // Get set setObj, errReply := getAsSet(db, key) if errReply != nil { return errReply } if setObj == nil { return reply.MakeIntReply(0) } // Remove all members count := 0 for _, member := range members { count += setObj.Remove(string(member)) } // If any members were removed if count > 0 { // Check if set is now empty if setObj.Len() == 0 { db.Remove(key) } else { // Store updated set db.PutEntity(key, &database.DataEntity{ Data: setObj, }) } // Add to AOF db.addAof(utils.ToCmdLineWithName("SREM", args...)) } return reply.MakeIntReply(int64(count)) }

execSRem 函数用于处理 SREM 命令,首先获取集合对象,然后遍历所有成员,调用 Remove 方法删除成员。最后将集合对象存储到数据库中,并返回删除成功的元素数量。 同时需要处理 AOF 日志,使用 db.addAof 方法将命令添加到 AOF 日志中。

这里表示我们的 Set 接口需要一个 Remove 方法,用于删除元素。

实现 Remove 方法

我们在 set.go 中定义 Remove 方法:

datastruct/set/set.go
// Set represents a Redis set type Set interface { Add(member string) int // Add a member to the set, return 1 if added, 0 if already exists Contains(member string) bool // Check if a member exists in the set Members() []string // Get all members in the set Remove(member string) int // Remove a member from the set, return 1 if removed, 0 if not found }

然后来到 hash_set.go 中实现 Remove 方法:

datastruct/set/hash_set.go
// Remove removes an integer from the set func (set *HashSet) Remove(member string) int { if set.isIntset { // If the input is an integer, we can remove it from the intset if val, err := strconv.ParseInt(member, 10, 64); err == nil { if ok := set.intset.Remove(val); ok { return 1 } return 0 } return 0 // Not an integer, cannot remove from intset } if _, exists := set.dict[member]; !exists { return 0 // Not found } delete(set.dict, member) return 1 // Removed successfully }

Remove 方法用于删除元素,如果当前集合是整数集合,则调用 intsetRemove 方法删除元素;如果当前集合是哈希表,则直接从 dict 中删除元素。

实现 intsetRemove 方法

Remove 方法用于删除整数集合中的元素,首先调用 findPosition 方法查找元素的位置,如果找到则将其删除并返回 1,否则返回 0。

datastruct/set/int_set.go
// Remove removes a value from the set func (is *IntSet) Remove(value int64) bool { pos := is.findPosition(value) if pos < 0 { return false // Value not found } is.removeAt(pos) return true }

实现 removeAt 方法

removeAt 方法用于在指定位置删除元素,删除时需要将后面的元素往前移动一位,并更新集合的长度。

datastruct/set/int_set.go
// removeAt removes the value at the specified position func (is *IntSet) removeAt(pos int) { if pos < 0 || pos >= int(is.length) { return } offset := pos * int(is.encoding) endOffset := int(is.length) * int(is.encoding) // Move elements after pos forward copy(is.contents[offset:], is.contents[offset+int(is.encoding):endOffset]) // Shrink contents is.contents = is.contents[:endOffset-int(is.encoding)] is.length-- }

removeAt 方法的实现思路比较简单,首先计算出要删除的元素的位置,然后将后面的元素往前移动一位,最后更新集合的长度。

到目前为止,我们的 SREM 命令已经实现完成了。

实现 SPOP

SPOP 命令用于随机删除集合中的一个元素,并返回该元素。

命令语法

SPOP 命令的语法如下:

SPOP key
  • key 是集合的键。
  • 返回值是随机删除的元素。

例如:

SPOP myset // 返回值:a

上面的命令会随机删除集合 myset 中的一个元素,并返回该元素。

如果集合中没有元素,则返回 nil。

编写处理函数

编写处理函数:

database/set.go
// execSPop implements SPOP key [count] // Remove and return one or multiple random members from a set func execSPop(db *DB, args [][]byte) resp.Reply { key := string(args[0]) // Determine count count := 1 if len(args) >= 2 { var err error count, err = strToInt(string(args[1])) if err != nil || count < 0 { return reply.MakeStandardErrorReply("ERR value is out of range, must be positive") } } // Get set setObj, errReply := getAsSet(db, key) if errReply != nil { return errReply } if setObj == nil || setObj.Len() == 0 { return reply.MakeNullBulkReply() } // If count is 0, return empty array if count == 0 { return reply.MakeMultiBulkReply([][]byte{}) } // Cap count to set size if count > setObj.Len() { count = setObj.Len() } // Get random members members := setObj.RandomDistinctMembers(count) // Remove members for _, member := range members { setObj.Remove(member) } // Store updated set or remove if empty if setObj.Len() == 0 { db.Remove(key) } else { db.PutEntity(key, &database.DataEntity{ Data: setObj, }) } // Add to AOF cmdArgs := make([][]byte, 2) cmdArgs[0] = []byte(key) cmdArgs[1] = []byte(intToStr(count)) db.addAof(utils.ToCmdLineWithName("SPOP", cmdArgs...)) // If only popping one member, return it as a bulk string if count == 1 { return reply.MakeBulkReply([]byte(members[0])) } // Otherwise return array of members result := make([][]byte, len(members)) for i, member := range members { result[i] = []byte(member) } return reply.MakeMultiBulkReply(result) }

execSPop 函数用于处理 SPOP 命令,首先获取集合对象,然后判断要删除的元素数量 count。如果 count 大于集合的大小,则将其限制为集合的大小。接着调用 RandomDistinctMembers 方法获取随机的元素,并遍历所有元素,调用 Remove 方法删除元素。最后将集合对象存储到数据库中,并返回删除的元素。 同时需要处理 AOF 日志,使用 db.addAof 方法将命令添加到 AOF 日志中。

这里表示我们的 Set 接口需要一个 RandomDistinctMembers 方法,用于获取随机的元素。

实现 RandomDistinctMembers 方法

我们在 set.go 中定义 RandomDistinctMembers 方法:

datastruct/set/set.go
type Set interface { // ... other methods RandomDistinctMembers(count int) []string // Get random distinct members from the set }

然后来到 hash_set.go 中实现 RandomDistinctMembers 方法:

datastruct/set/hash_set.go
// RandomDistinctMembers returns distinct random members from the set func (set *HashSet) RandomDistinctMembers(count int) []string { size := set.Len() if count <= 0 || size == 0 { return []string{} } if count >= size { return set.Members() // Return all members if count is greater than or equal to size } members := set.Members() r := rand.New(rand.NewSource(time.Now().UnixNano())) r.Shuffle(len(members), func(i, j int) { members[i], members[j] = members[j], members[i] }) return members[:count] // Return the first 'count' members after shuffling }

RandomDistinctMembers 方法用于获取随机的元素,首先判断要删除的元素数量 count 是否大于集合的大小,如果大于,则返回所有元素。

否则,使用 rand.Shuffle 方法对集合中的元素进行随机打乱,然后返回前 count 个元素。

到这里,我们的 SPOP 命令已经实现完成了。

实现 SRANDMEMBER

SRANDMEMBER 命令用于随机获取集合中的一个或多个元素,但不删除它们。

命令语法

SRANDMEMBER 命令的语法如下:

SRANDMEMBER key [count]
  • key 是集合的键。
  • count 是要获取的元素数量,可以是正数或负数。
    • 如果 count 是正数,则返回随机的 count 个元素。
    • 如果 count 是负数,则返回随机的 abs(count) 个元素,但不删除它们。
    • 如果 count 是 0,则返回空数组。
  • 如果集合中没有元素,则返回 nil。

例如:

SRANDMEMBER myset // 返回值:a SRANDMEMBER myset 2 // 返回值:["a", "b"] SRANDMEMBER myset -2 // 返回值:["a", "b"] SRANDMEMBER myset 0 // 返回值:[]

上面的命令会随机获取集合 myset 中的一个或多个元素,但不删除它们。

如果集合中没有元素,则返回 nil。

编写处理函数

编写处理函数:

database/set.go
// execSRandMember implements SRANDMEMBER key [count] // Get one or multiple random members from a set func execSRandMember(db *DB, args [][]byte) resp.Reply { key := string(args[0]) // Get set setObj, errReply := getAsSet(db, key) if errReply != nil { return errReply } if setObj == nil || setObj.Len() == 0 { return reply.MakeNullBulkReply() } // Determine count count := 1 withReplacement := false if len(args) >= 2 { var err error count, err = strToInt(string(args[1])) if err != nil { return reply.MakeStandardErrorReply("ERR value is not an integer") } // Negative count means return with replacement (can have duplicates) if count < 0 { withReplacement = true count = -count } } // Get random members var members []string if withReplacement { members = setObj.RandomMembers(count) } else { members = setObj.RandomDistinctMembers(count) } // If only returning one member, return it as a bulk string if len(args) == 1 || (count == 1 && len(members) > 0) { return reply.MakeBulkReply([]byte(members[0])) } // Otherwise return array of members result := make([][]byte, len(members)) for i, member := range members { result[i] = []byte(member) } return reply.MakeMultiBulkReply(result) }

execSRandMember 函数用于处理 SRANDMEMBER 命令,首先获取集合对象,然后判断要获取的元素数量 count。如果 count 大于集合的大小,则将其限制为集合的大小。接着调用 RandomDistinctMembers 方法获取随机的元素,并返回。 如果 count 是负数,则调用 RandomMembers 方法获取随机的元素,并返回。

如果 count 是 0,则返回空数组。

这里表示我们的 Set 接口需要一个 RandomMembers 方法,用于获取随机的元素。

实现 RandomMembers 方法

我们在 set.go 中定义 RandomMembers 方法:

datastruct/set/set.go
type Set interface { // ... other methods RandomMembers(count int) []string // Get random members from the set (with replacement) }

然后来到 hash_set.go 中实现 RandomMembers 方法:

datastruct/set/hash_set.go
// RandomMembers returns random members from the set func (set *HashSet) RandomMembers(count int) []string { size := set.Len() if count <= 0 || size == 0 { return []string{} } res := make([]string, count) members := set.Members() r := rand.New(rand.NewSource(time.Now().UnixNano())) for i := 0; i < count; i++ { res[i] = members[r.Intn(size)] // Use r.Intn(size) to get a random index } return res }

RandomMembers 方法用于获取随机的元素,首先判断要获取的元素数量 count 是否大于集合的大小,如果大于,则返回空数组。 否则,使用 rand.Intn(size) 方法获取随机的索引,并返回随机的元素。 到这里,我们的 SRANDMEMBER 命令已经实现完成了。

到这里,基本的集合操作命令已经实现完成了,接下来我们可以实现一些集合的高级操作命令,比如 SINTERSUNIONSDIFF 等等。

但是我们先把这些命令使用 init 方法注册。

database/set.go
func init() { RegisterCommand("SADD", execSAdd, -3) RegisterCommand("SCARD", execSCard, 2) RegisterCommand("SISMEMBER", execSIsMember, 3) RegisterCommand("SMEMBERS", execSMembers, 2) RegisterCommand("SREM", execSRem, -3) RegisterCommand("SPOP", execSPop, -2) RegisterCommand("SRANDMEMBER", execSRandMember, -2) }

到这里 Set 接口也完成了,后续的操作可以在这个接口的基础上进行实现。

datastruct/set/set.go
type Set interface { Add(member string) int // Add a member to the set, return 1 if added, 0 if already exists Remove(member string) int // Remove a member from the set, return 1 if removed, 0 if not exists Contains(member string) bool // Check if the set contains a member Members() []string // Get all members of the set Len() int // Get the number of members in the set ForEach(consumer func(member string) bool) // Iterate over all members RandomMembers(count int) []string // Get random members from the set RandomDistinctMembers(count int) []string // Get distinct random members }

实现集合的高级操作

在 Redis 中,集合的高级操作主要包括:

  • 交集(SINTER
  • 并集(SUNION
  • 差集(SDIFF
  • 交集存储(SINTERSTORE
  • 并集存储(SUNIONSTORE
  • 差集存储(SDIFFSTORE

集合高级操作命令的详细说明

若要参考代码,可以查看 database/set.go 文件。

交集(SINTER

作用: 返回多个集合的交集,即同时存在于所有集合中的元素。

实现思路:

  1. 获取第一个集合作为基准集合。
  2. 遍历其余集合,将不在当前集合中的元素从结果集中移除。
  3. 如果某个集合为空,则交集结果直接为空。
  4. 最终返回结果集合中的所有元素。

示例:

SADD set1 a b c SADD set2 b c d SADD set3 c d e SINTER set1 set2 set3 # 返回值: ["c"]

计算过程:

  1. 基准集合为 set1,初始结果为 ["a", "b", "c"]
  2. set2 交集后,结果变为 ["b", "c"]
  3. set3 交集后,结果变为 ["c"]

并集(SUNION

作用: 返回多个集合的并集,即所有集合中出现的元素。

实现思路:

  1. 创建一个空的结果集合。
  2. 遍历所有集合,将每个集合中的元素添加到结果集合中。
  3. 返回结果集合中的所有元素。

怎么去重?我们可以直接使用 Set 接口的 Add 方法来添加元素,Add 方法会自动去重(存在的不会添加)。

示例:

SADD set1 a b c SADD set2 b c d SADD set3 c d e SUNION set1 set2 set3 # 返回值: ["a", "b", "c", "d", "e"]

计算过程:

  1. set1 的元素加入结果集合,结果为 ["a", "b", "c"]
  2. set2 的元素加入结果集合,结果为 ["a", "b", "c", "d"]
  3. set3 的元素加入结果集合,结果为 ["a", "b", "c", "d", "e"]

差集(SDIFF

作用: 返回第一个集合与后续集合的差集,即在第一个集合中存在但不在其他集合中的元素。

实现思路:

  1. 获取第一个集合作为基准集合。
  2. 遍历其余集合,将出现在这些集合中的元素从基准集合中移除。
  3. 返回剩余的基准集合中的元素。

示例:

SADD set1 a b c SADD set2 b c d SADD set3 c d e SDIFF set1 set2 set3 # 返回值: ["a"]

计算过程:

  1. 基准集合为 set1,初始结果为 ["a", "b", "c"]
  2. 从结果中移除 set2 中的元素,结果变为 ["a"]
  3. 从结果中移除 set3 中的元素,结果仍为 ["a"]

交集存储(SINTERSTORE

作用: 计算多个集合的交集,并将结果存储到一个新的集合中。

实现思路:

  1. 调用 SINTER 命令计算交集。
  2. 将交集结果存储到目标集合中。
  3. 返回目标集合的元素数量。

示例:

SADD set1 a b c SADD set2 b c d SADD set3 c d e SINTERSTORE result set1 set2 set3 # 返回值: 1 SMEMBERS result # 返回值: ["c"]

并集存储(SUNIONSTORE

作用: 计算多个集合的并集,并将结果存储到一个新的集合中。

实现思路:

  1. 调用 SUNION 命令计算并集。
  2. 将并集结果存储到目标集合中。
  3. 返回目标集合的元素数量。

示例:

SADD set1 a b c SADD set2 b c d SADD set3 c d e SUNIONSTORE result set1 set2 set3 # 返回值: 5 SMEMBERS result # 返回值: ["a", "b", "c", "d", "e"]

差集存储(SDIFFSTORE

作用: 计算多个集合的差集,并将结果存储到一个新的集合中。

实现思路:

  1. 调用 SDIFF 命令计算差集。
  2. 将差集结果存储到目标集合中。
  3. 返回目标集合的元素数量。

示例:

SADD set1 a b c SADD set2 b c d SADD set3 c d e SDIFFSTORE result set1 set2 set3 # 返回值: 1 SMEMBERS result # 返回值: ["a"]

这些命令的实现充分利用了 Set 接口的基本操作(如 AddRemoveContains 等),并通过遍历和操作集合中的元素实现了复杂的集合运算逻辑。

然后把它们都注册到 init 方法中:

database/set.go
RegisterCommand("SUNION", execSUnion, -2) RegisterCommand("SUNIONSTORE", execSUnionStore, -3) RegisterCommand("SINTER", execSInter, -2) RegisterCommand("SINTERSTORE", execSInterStore, -3) RegisterCommand("SDIFF", execSDiff, -2) RegisterCommand("SDIFFSTORE", execSDiffStore, -3)

集群适配

正如之前实现的结构一样,对于 Set 接口的实现,我们可以直接在 cluster/router.go 中进行适配。

这里分为两种情况,由于我们是用的一致性哈希算法:

  • 对于只有一个 Key 的操作,直接路由到对应的节点即可。
  • 对于多个 Key 的操作,我们需要增加额外的处理。

单 Key 操作

对于只有一个 Key 的操作,直接使用 defaultFunc 方法路由到对应的节点即可:

cluster/router.go
func makeRouter() map[string]CmdFunc { // ... // Set operations routerMap["sadd"] = defaultFunc // sadd key member [member ...] routerMap["scard"] = defaultFunc // scard key routerMap["sismember"] = defaultFunc // sismember key member routerMap["smembers"] = defaultFunc // smembers key routerMap["srem"] = defaultFunc // srem key member [member ...] routerMap["spop"] = defaultFunc // spop key [count] routerMap["srandmember"] = defaultFunc // srandmember key [count] return routerMap }

多 Key 操作

但是对于多个 Key 的操作,我们需要增加额外的处理。

因为使用一致性哈希算法,一条指令如 SINTER,接受的多个 Key 可能分布在不同的节点上。

这几个我们分别创建一个路由函数:

cluster/router.go
func makeRouter() map[string]CmdFunc { // ... // Set operations - multi-key commands (need special handling) routerMap["sunion"] = setUnionFunc // sunion key [key ...] routerMap["sunionstore"] = setUnionStoreFunc // sunionstore destination key [key ...] routerMap["sinter"] = setIntersectFunc // sinter key [key ...] routerMap["sinterstore"] = setIntersectStoreFunc // sinterstore destination key [key ...] routerMap["sdiff"] = setDiffFunc // sdiff key [key ...] routerMap["sdiffstore"] = setDiffStoreFunc // sdiffstore destination key [key ...] return routerMap }

实现 setUnionFunc

对于 SUNION 命令,我们需要遍历所有的 Key,获取每个 Key 对应的节点,然后执行 SMEMBERS 命令获取每个 Key 的所有成员,最后将所有成员合并到一起。

cluster/router.go
// setUnionFunc handles SUNION command in cluster mode func setUnionFunc(cluster *ClusterDatabase, conn resp.Connection, args [][]byte) resp.Reply { if len(args) < 2 { return reply.MakeArgNumErrReply("sunion") } // Create a set to hold the union result result := set.NewHashSet() // Process each key individually for i := 1; i < len(args); i++ { key := string(args[i]) peer := cluster.peerPicker.PickNode(key) // Create SMEMBERS command for this key smembersArgs := make([][]byte, 2) smembersArgs[0] = []byte("SMEMBERS") smembersArgs[1] = args[i] // Execute SMEMBERS on the appropriate node nodeReply := cluster.relayExec(peer, conn, smembersArgs) // Process the reply if mbReply, ok := nodeReply.(*reply.MultiBulkReply); ok { // Add each member to our result set for _, member := range mbReply.Args { result.Add(string(member)) } } else if reply.IsErrReply(nodeReply) { return nodeReply // Forward any errors } } // Convert the result set to [][]byte format for the response members := result.Members() resultBytes := make([][]byte, len(members)) for i, member := range members { resultBytes[i] = []byte(member) } return reply.MakeMultiBulkReply(resultBytes) }

实现 setUnionStoreFunc

这个命令可以调用 setUnionFunc 来实现,唯一的区别是我们需要将结果存储到一个新的集合中。

cluster/router.go
/** * setUnionStoreFunc handles SUNIONSTORE command in cluster mode * First gets the union using setUnionFunc, then stores it in the destination key */ func setUnionStoreFunc(cluster *ClusterDatabase, conn resp.Connection, args [][]byte) resp.Reply { if len(args) < 3 { return reply.MakeArgNumErrReply("sunionstore") } // Get the destination key and its node destKey := string(args[1]) destPeer := cluster.peerPicker.PickNode(destKey) // Get the union of source sets sourceArgs := make([][]byte, len(args)-1) sourceArgs[0] = []byte("SUNION") copy(sourceArgs[1:], args[2:]) // Use the above SUNION function to get the union unionReply := setUnionFunc(cluster, conn, sourceArgs) if mbReply, ok := unionReply.(*reply.MultiBulkReply); ok { // First delete the destination key (if exists) delArgs := make([][]byte, 2) delArgs[0] = []byte("DEL") delArgs[1] = args[1] cluster.relayExec(destPeer, conn, delArgs) if len(mbReply.Args) > 0 { // Create a new set on the destination node storeArgs := make([][]byte, len(mbReply.Args)+2) storeArgs[0] = []byte("SADD") storeArgs[1] = args[1] copy(storeArgs[2:], mbReply.Args) reply := cluster.relayExec(destPeer, conn, storeArgs) return reply } // If the union is empty, return 0 return reply.MakeIntReply(0) } // Return error return unionReply }

实现 setIntersectFunc

对于 SINTER 命令,我们需要遍历所有的 Key,获取每个 Key 对应的节点,然后执行 SMEMBERS 命令获取每个 Key 的所有成员,最后将所有成员交集到一起。

实现交集的时候,我们可以使用第一个集合为基准,然后迭代后面的集合,移除不存在基准集合中的元素。

cluster/router.go
/** * setIntersectFunc handles SINTER command in cluster mode * Processes each key individually and computes the intersection */ func setIntersectFunc(cluster *ClusterDatabase, conn resp.Connection, args [][]byte) resp.Reply { if len(args) < 2 { return reply.MakeArgNumErrReply("sinter") } // If there's only one key, just return its members if len(args) == 2 { key := string(args[1]) peer := cluster.peerPicker.PickNode(key) // Create SMEMBERS command for this key smembersArgs := make([][]byte, 2) smembersArgs[0] = []byte("SMEMBERS") smembersArgs[1] = args[1] return cluster.relayExec(peer, conn, smembersArgs) } // Store the set members from each key var allSets []map[string]bool // Process each key separately for i := 1; i < len(args); i++ { key := string(args[i]) peer := cluster.peerPicker.PickNode(key) // Create SMEMBERS command for this key smembersArgs := make([][]byte, 2) smembersArgs[0] = []byte("SMEMBERS") smembersArgs[1] = args[i] // Execute SMEMBERS command on the appropriate node nodeReply := cluster.relayExec(peer, conn, smembersArgs) if mbReply, ok := nodeReply.(*reply.MultiBulkReply); ok { // Convert response to a set for intersection memberSet := make(map[string]bool) for _, member := range mbReply.Args { memberSet[string(member)] = true } // If any set is empty, the intersection is empty if len(memberSet) == 0 { return reply.MakeMultiBulkReply([][]byte{}) } allSets = append(allSets, memberSet) } else if reply.IsErrReply(nodeReply) { return nodeReply } } // If no sets were obtained, return empty result if len(allSets) == 0 { return reply.MakeMultiBulkReply([][]byte{}) } // Calculate intersection result := make(map[string]bool) // Initialize result with all elements from the first set for member := range allSets[0] { result[member] = true } // Intersect with subsequent sets for i := 1; i < len(allSets); i++ { nextSet := allSets[i] for member := range result { if !nextSet[member] { delete(result, member) } } // If intersection is empty, return early if len(result) == 0 { break } } // Convert result to response format members := make([][]byte, 0, len(result)) for member := range result { members = append(members, []byte(member)) } return reply.MakeMultiBulkReply(members) }

实现 setIntersectStoreFunc

也是一样的,我们可以调用 setIntersectFunc 来实现,唯一的区别是我们需要将结果存储到一个新的集合中。

cluster/router.go
/** * setIntersectStoreFunc handles SINTERSTORE command in cluster mode * First gets the intersection using setIntersectFunc, then stores it in the destination key */ func setIntersectStoreFunc(cluster *ClusterDatabase, conn resp.Connection, args [][]byte) resp.Reply { if len(args) < 3 { return reply.MakeArgNumErrReply("sinterstore") } // Get the destination key and its node destKey := string(args[1]) destPeer := cluster.peerPicker.PickNode(destKey) // Get the intersection of source sets sourceArgs := make([][]byte, len(args)-1) sourceArgs[0] = []byte("SINTER") copy(sourceArgs[1:], args[2:]) // Use the setIntersectFunc to get the intersection intersectReply := setIntersectFunc(cluster, conn, sourceArgs) if mbReply, ok := intersectReply.(*reply.MultiBulkReply); ok { // First delete the destination key (if exists) delArgs := make([][]byte, 2) delArgs[0] = []byte("DEL") delArgs[1] = args[1] cluster.relayExec(destPeer, conn, delArgs) if len(mbReply.Args) > 0 { // Create a new set on the destination node storeArgs := make([][]byte, len(mbReply.Args)+2) storeArgs[0] = []byte("SADD") storeArgs[1] = args[1] copy(storeArgs[2:], mbReply.Args) rep := cluster.relayExec(destPeer, conn, storeArgs) // For SINTERSTORE, we need to return the cardinality of the result if intReply, ok := rep.(*reply.IntReply); ok { return reply.MakeIntReply(intReply.Code) } return rep } // If the intersection is empty, return 0 return reply.MakeIntReply(0) } // Return error if we couldn't get the intersection return intersectReply }

实现 setDiffFunc

对于 SDIFF 命令,我们需要遍历所有的 Key,获取每个 Key 对应的节点,然后执行 SMEMBERS 命令获取每个 Key 的所有成员,最后将所有成员差集到一起。

实现差集的时候,我们可以使用第一个集合为基准,然后迭代后面的集合,移除存在基准集合中的元素。

cluster/router.go
/** * setDiffFunc handles SDIFF command in cluster mode * Gets all members from the first set, then removes any members found in other sets */ func setDiffFunc(cluster *ClusterDatabase, conn resp.Connection, args [][]byte) resp.Reply { if len(args) < 2 { return reply.MakeArgNumErrReply("sdiff") } // Get the first set (base set) firstKey := string(args[1]) firstPeer := cluster.peerPicker.PickNode(firstKey) // Create SMEMBERS command for the first key smembersArgs := make([][]byte, 2) smembersArgs[0] = []byte("SMEMBERS") smembersArgs[1] = args[1] firstSetReply := cluster.relayExec(firstPeer, conn, smembersArgs) if !reply.IsMultiBulkReply(firstSetReply) { if reply.IsErrReply(firstSetReply) { return firstSetReply } return reply.MakeMultiBulkReply([][]byte{}) } // Add the members of the first set to the result set firstSetMembers := firstSetReply.(*reply.MultiBulkReply) result := make(map[string]bool) for _, member := range firstSetMembers.Args { result[string(member)] = true } // If there is only one set, just return all its members if len(args) == 2 { return firstSetReply } // Remove members of other sets from the result set for i := 2; i < len(args); i++ { key := string(args[i]) peer := cluster.peerPicker.PickNode(key) // Create SMEMBERS command for this key smembersArgs := make([][]byte, 2) smembersArgs[0] = []byte("SMEMBERS") smembersArgs[1] = args[i] nodeReply := cluster.relayExec(peer, conn, smembersArgs) if mbReply, ok := nodeReply.(*reply.MultiBulkReply); ok { // Remove members of this set from the result set for _, member := range mbReply.Args { delete(result, string(member)) } } else if reply.IsErrReply(nodeReply) { return nodeReply } // If the difference is already empty, return early if len(result) == 0 { break } } // Convert result to response format members := make([][]byte, 0, len(result)) for member := range result { members = append(members, []byte(member)) } return reply.MakeMultiBulkReply(members) }

实现 setDiffStoreFunc

对于 SDIFFSTORE 命令,我们可以调用 setDiffFunc 来实现,唯一的区别是我们需要将结果存储到一个新的集合中。

cluster/router.go
/** * setDiffStoreFunc handles SDIFFSTORE command in cluster mode * First gets the difference using setDiffFunc, then stores it in the destination key */ func setDiffStoreFunc(cluster *ClusterDatabase, conn resp.Connection, args [][]byte) resp.Reply { if len(args) < 3 { return reply.MakeArgNumErrReply("sdiffstore") } // Get the destination key and its node destKey := string(args[1]) destPeer := cluster.peerPicker.PickNode(destKey) // Get the difference of source sets sourceArgs := make([][]byte, len(args)-1) sourceArgs[0] = []byte("SDIFF") copy(sourceArgs[1:], args[2:]) // Use the setDiffFunc to get the difference diffReply := setDiffFunc(cluster, conn, sourceArgs) if mbReply, ok := diffReply.(*reply.MultiBulkReply); ok { // First delete the destination key (if exists) delArgs := make([][]byte, 2) delArgs[0] = []byte("DEL") delArgs[1] = args[1] cluster.relayExec(destPeer, conn, delArgs) if len(mbReply.Args) > 0 { // Create a new set on the destination node storeArgs := make([][]byte, len(mbReply.Args)+2) storeArgs[0] = []byte("SADD") storeArgs[1] = args[1] copy(storeArgs[2:], mbReply.Args) rep := cluster.relayExec(destPeer, conn, storeArgs) // For SDIFFSTORE, we need to return the cardinality of the result if intReply, ok := rep.(*reply.IntReply); ok { return reply.MakeIntReply(intReply.Code) } return rep } // If the difference is empty, return 0 return reply.MakeIntReply(0) } // Return error if we couldn't get the difference return diffReply }

到这里我们就实现完了 set 结构的所有操作命令。

接下来我们就要开始测试了。

测试

在 Standalone 模式下测试

使用 redis-cli -p 6380 连接到 Redis 实例。

redis-cli -p 6380

然后执行以下测试:

(base) orangejuice@My-Mac redigo % redis-cli -p 6380 127.0.0.1:6380> SADD myset apple (integer) 1 127.0.0.1:6380> SADD myset banana orange (integer) 2 127.0.0.1:6380> SADD myset apple (integer) 0 127.0.0.1:6380> SCARD myset (integer) 3 127.0.0.1:6380> SISMEMBER myset apple (integer) 1 127.0.0.1:6380> SISMEMBER myset grape (integer) 0 127.0.0.1:6380> SMEMBERS myset 1) "apple" 2) "banana" 3) "orange" 127.0.0.1:6380> SADD myset grape watermelon kiwi (integer) 3 127.0.0.1:6380> SREM myset banana (integer) 1 127.0.0.1:6380> SREM myset nonexistent (integer) 0 127.0.0.1:6380> SMEMBERS myset 1) "apple" 2) "orange" 3) "grape" 4) "watermelon" 5) "kiwi" 127.0.0.1:6380> SRANDMEMBER myset "apple" 127.0.0.1:6380> SRANDMEMBER myset 2 1) "kiwi" 2) "apple" 127.0.0.1:6380> SRANDMEMBER myset -3 1) "watermelon" 2) "grape" 3) "orange" 127.0.0.1:6380> SRANDMEMBER myset -3 1) "watermelon" 2) "orange" 3) "orange" 127.0.0.1:6380> SPOP myset "orange" 127.0.0.1:6380> SPOP myset 2 1) "kiwi" 2) "grape" 127.0.0.1:6380> SADD set1 a b c d (integer) 4 127.0.0.1:6380> SADD set2 c d e f (integer) 4 127.0.0.1:6380> SINTER set1 set2 1) "d" 2) "c" 127.0.0.1:6380> SUNION set1 set2 1) "b" 2) "c" 3) "d" 4) "a" 5) "e" 6) "f" 127.0.0.1:6380> SDIFF set1 set2 1) "a" 2) "b" 127.0.0.1:6380> SDIFF set2 set1 1) "e" 2) "f" 127.0.0.1:6380> SINTERSTORE result set1 set2 (integer) 2 127.0.0.1:6380> SMEMBERS result 1) "d" 2) "c" 127.0.0.1:6380> SUNIONSTORE result2 set1 set2 (integer) 6 127.0.0.1:6380> SMEMBERS result2 1) "a" 2) "b" 3) "c" 4) "f" 5) "e" 6) "d" 127.0.0.1:6380> SDIFFSTORE result3 set1 set2 (integer) 2 127.0.0.1:6380> SMEMBERS result3 1) "a" 2) "b" 127.0.0.1:6380> SADD intset 1 2 3 4 5 (integer) 5 127.0.0.1:6380> SADD intset "string" (integer) 1 127.0.0.1:6380> SMEMBERS intset 1) "3" 2) "4" 3) "5" 4) "string" 5) "1" 6) "2"

然后为了测试从 intset 自动切换为 hashset,我们可以添加一个命令来查询底层实现的类型:

SETTYPE key
database/set.go
// SetType represents the type of the set (intset or hashset) func execSetType(db *DB, args [][]byte) resp.Reply { key := string(args[0]) // Get set setObj, errReply := getAsSet(db, key) if errReply != nil { return errReply } if setObj == nil { return reply.MakeNullBulkReply() } // Determine set type if setObj.IsIntSet() { return reply.MakeStatusReply("intset") } return reply.MakeStatusReply("hashset") } func init() { RegisterCommand("SETTYPE", execSetType, 2) }
datastruct/set/set.go
type Set interface { // ... other methods IsIntSet() bool // Check if the set is an IntSet }
datastruct/set/hash_set.go
// IsIntSet checks if the set is an IntSet func (set *HashSet) IsIntSet() bool { return set.isIntset }

然后执行以下测试:

127.0.0.1:6380> SADD bigintset 1 2 3 4 5 (integer) 5 127.0.0.1:6380> SETTYPE bigintset intset

现在我们的 bigintset 使用的是 intset 结构。

然后我们创建一个 shell 脚本 test_set_datastruct.sh 来执行这些测试:

#!/bin/bash redis-cli -p 6380 DEL bigintset for i in {1..600}; do redis-cli -p 6380 SADD bigintset $i > /dev/null done redis-cli -p 6380 SCARD bigintset redis-cli -p 6380 SETTYPE bigintset
(base) orangejuice@Mac shell % bash test_set_datastruct.sh (integer) 1 (integer) 600 hashset

执行完毕我们看到 bigintset 的大小为 600,类型为 hashset

这说明我们在 intsethashset 之间的切换是成功的。

在 Cluster 模式下测试

我们修改 redis.conf 文件,打开集群模式。然后如前面集群那一章一样,开启两个 Redis 实例,分别监听在 6380 和 6391 端口。

然后创建两个终端分别连接到两个 Redis 实例:

redis-cli -p 6380
redis-cli -p 6391

然后我们主要测试 SUNIONSINTERSDIFFSUNIONSTORESINTERSTORESDIFFSTORE 命令。

6380
(base) orangejuice@Mac redigo % redis-cli -p 6380 127.0.0.1:6380> SADD set1 a b c d (integer) 0 127.0.0.1:6380> SADD set1 a b c d (integer) 4 127.0.0.1:6380> SINTER set1 set2 1) "c" 2) "d" 127.0.0.1:6380> SDIFF set2 set1 1) "e" 2) "f" 127.0.0.1:6380> SMEMBERS result 1) "c" 2) "d" 127.0.0.1:6380> SMEMBERS result2 1) "a" 2) "b" 3) "c" 4) "e" 5) "f" 6) "d" 127.0.0.1:6380> SADD set4 1 2 3 4 (integer) 4 127.0.0.1:6380> SMEMBERS set5 1) "1" 2) "2" 3) "5" 4) "6" 127.0.0.1:6380>
6391
(base) orangejuice@Mac shell % redis-cli -p 6391 127.0.0.1:6391> SADD set2 c d e f (integer) 4 127.0.0.1:6391> SUNION set1 set2 1) "c" 2) "e" 3) "f" 4) "d" 5) "a" 6) "b" 127.0.0.1:6391> SDIFF set1 set2 1) "a" 2) "b" 127.0.0.1:6391> SINTERSTORE result set1 set2 (integer) 2 127.0.0.1:6391> SUNIONSTORE result2 set1 set2 (integer) 6 127.0.0.1:6391> SDIFFSTORE result3 set1 set2 (integer) 2 127.0.0.1:6391> SMEMBERS result3 1) "b" 2) "a" 127.0.0.1:6391> SADD set5 1 2 5 6 (integer) 4 127.0.0.1:6391>

如上面的结果我们可以看到,我们的 set 在集群上也能正常运行了。

单元测试

我们在 datastruct/set 目录下创建一个 set_test.go 文件,编写单元测试。

单元测试有利于我们在开发过程中及时发现问题,避免在后期出现 bug。

可以使用 cd datastruct/set 命令进入到 set 目录下,然后使用 go test 命令运行测试。

cd datastruct/set go test -v

感兴趣可以看代码 这里

总结

到这里我们就完成了 set 结构的实现。

我们实现了 set 结构的基本操作,包括 SADDSCARDSISMEMBERSMEMBERSSREMSRANDMEMBERSPOPSUNIONSINTERSDIFFSUNIONSTORESINTERSTORESDIFFSTORE 命令。

我们还实现了 intsethashset 之间的切换。

我们还实现了集群模式下的路由和命令转发。

然后我们进行了较为完善的测试。

Last updated on