Skip to Content
实现内存数据库

Redis 内存数据库

本文进度对应的代码仓库:Redis 内存数据库

在前面的章节中,我们先是实现了一个简单的 TCP 服务器,然后又实现了一个简单的 RESP 解析器。 在这一章中,我们将实现一个 Redis 内存数据库。

创建接口

首先我们要创建一个 Dict 接口,表示一个字典。Redis 中的 Dict 结构提供了对键值对的基本操作(如获取、添加、删除等)。以及一些高级功能,如迭代器、随机返回键等。

使用接口来抽象化 Dict 的实现,可以让我们在后续的实现中更容易地替换底层的数据结构。而不用影响到上层的逻辑。

例如下面我们将使用 sync.Map 来实现 Dict。但是未来我们可能会使用其他的数据结构来实现 Dict,比如 hashmapskiplist 等等。使用接口可以让我们在不修改上层逻辑的情况下,轻松地替换底层的数据结构。

首先在根目录下创建一个 datastruct 目录,专门用来存放数据结构的实现。然后在 datastruct 目录下创建一个 dict 目录,专门用来存放 Dict 的实现。最后在 dict 目录下创建一个 dict.go 文件,专门用来存放 Dict 的接口定义。

datastruct/dict/dict.go
package dict type Consumer func(key string, val interface{}) bool // function type for iterating over key-value pairs type Dict interface { Get(key string) (val interface{}, exists bool) // get value by key, return the value and a boolean indicating if the key exists Len() int // get the number of key-value pairs Put(key string, val interface{}) (result int) // put key-value pair, if exists, modify the value, return 0, if doesn't exist, add it, return 1 PutIfAbsent(key string, val interface{}) (result int) // put key-value pair if absent, return 0, if exists, return 1 PutIfExists(key string, val interface{}) (result int) // put key-value pair if exists, return 0, if absent, return 1 Remove(key string) (result int) // remove key-value pair, return the count of pairs ForEach(consumer Consumer) // iterate over all key-value pairs Keys() []string // get all keys RandomKeys(n int) []string // get n random keys RandomDistinctKeys(n int) []string // get n distinct random keys Clear() // clear all key-value pairs }

在这里我们定义了 Redis 中数据结构常用的一些方法:

  • Get(key string) (val interface{}, exists bool):根据键获取值,如果键存在则返回值和 true,否则返回 nil 和 false。
  • Len() int:获取键值对的数量。
  • Put(key string, val interface{}) (result int):添加键值对,如果键已经存在,则覆盖原有的值,返回 0;如果键不存在,则添加键值对,返回 1。也就是说如果键已经存在,则不添加键值对但是进行值的修改。
  • PutIfAbsent(key string, val interface{}) (result int):添加键值对,如果键已经存在,则不覆盖原有的值,返回 0;如果键不存在,则添加键值对,返回 1。
  • PutIfExists(key string, val interface{}) (result int):添加键值对,如果键不存在,则不添加键值对。也就是说仅仅在键存在时才添加键值对。
  • Remove(key string) (result int):删除键值对,返回键值对的数量。
  • ForEach(consumer Consumer):遍历所有的键值对,consumer 是一个函数类型,用来处理每一个键值对。它允许用户在遍历时自定义处理逻辑。

例如,我们可以在遍历时打印每一个键值对:

func printKeyValue(key string, val interface{}) { fmt.Printf("key: %s, value: %v\n", key, val) } // 使用 ForEach 方法遍历所有的键值对 dict.ForEach(printKeyValue)
  • Keys() []string:获取所有的键,返回一个字符串切片。
  • RandomKeys(n int) []string:获取 n 个随机键,返回一个字符串切片。
  • RandomDistinctKeys(n int) []string:获取 n 个不重复的随机键,返回一个字符串切片。
  • Clear():清空所有的键值对。这是一个包内方法,不可被外部调用。

接下来我们就需要去分别完成 Dict 接口的具体实现。

实现 SyncDict

datastruct/dict 目录下创建一个 sync_dict.go 文件,用于实现一个线程安全的 Dict。我们使用 Go 的内置 sync.Map 来实现这个 Dict。sync.Map 是一个并发安全的 map,它提供了原子操作的方法,可以在多个 goroutine 中安全地使用。

💡

sync.Map 是 Go 语言标准库中的一个并发安全的 map 实现。sync.Map 主要用于在高并发场景下共享数据,比如缓存、频繁读写的共享状态等。

Go 原生的 map 不是并发安全的,如果多个 goroutine 同时读写同一个 map,会导致 fatal error: concurrent map read and map write 的运行时错误。

所以我们需要使用 sync.Map 来实现一个线程安全的 Dict。

为什么 Redis 中需要一个线程安全的 Dict 呢?因为 Redis 是一个单线程的数据库,它使用事件驱动模型来处理请求。虽然 Redis 是单线程的,但是它的底层数据结构是可以被多个线程同时访问的。所以我们需要一个线程安全的 Dict 来保证数据的一致性和安全性。

sync.Map 的实现原理是使用了读写锁(sync.RWMutex)来保护 map 的读写操作。它的读操作是并发安全的,而写操作是互斥的。这就保证了在高并发场景下,读操作不会阻塞其他的读操作,而写操作会阻塞其他的读写操作。

加下来我们开始对 Dict 接口进行实现。首先实现一个 MakeSyncDict 函数,用于创建一个新的 SyncDict 实例。这个函数会返回一个实现了 Dict 接口的 SyncDict 实例。

datastruct/dict/sync_dict.go
// MakeSyncDict creates a new SyncDict instance func MakeSyncDict() *SyncDict { return &SyncDict{} }

Get

然后实现 Get 方法,用于根据键获取值。如果键存在,则返回值和 true,否则返回 nil 和 false。

datastruct/dict/sync_dict.go
// Get returns the value associated with the given key and a boolean indicating if the key exists func (dict *SyncDict) Get(key string) (val interface{}, exists bool) { // Get the value by key if value, ok := dict.m.Load(key); ok { return value, true } return nil, false }

这个方法是在对 Dict 接口中的 Get 方法进行实现,而在 Go 中,鸭子类型告诉我们,只要一个类型实现了某个接口的方法,那么这个类型就可以被视为这个接口的实现。

由于我们直接使用了 sync.Map ,所以获取值方法实际上就是从 sync.Map 中获取值。我们使用 Load 方法来获取值。Load 方法会返回一个值和一个布尔值,表示键是否存在。

Len

接下来实现 Len 方法,用于获取键值对的数量。但是 sync.Map 并没有提供直接获取长度的方法。我们需要使用 Range 方法来遍历所有的键值对,然后统计数量。

datastruct/dict/sync_dict.go
// Len returns the number of key-value pairs in the dictionary func (dict *SyncDict) Len() int { count := 0 // Iterate over all key-value pairs and count them dict.m.Range(func(key, value interface{}) bool { count++ return true }) return count }

在这里我们使用 Range 方法来遍历所有的键值对。Range 方法会调用一个函数,传入每一个键值对。我们在这个函数中统计数量。

Put

接下来实现 Put 方法,用于添加键值对。如果键已经存在,则覆盖原有的值,返回 0;如果键不存在,则添加键值对,返回 1。也就是说如果键已经存在,则不添加键值对但是进行值的修改。

datastruct/dict/sync_dict.go
// Put adds a key-value pair to the dictionary, if the key already exists, return 1 else 0 func (dict *SyncDict) Put(key string, val interface{}) (result int) { _, exists := dict.m.Load(key) // Store the key-value pair dict.m.Store(key, val) // Return the count of pairs if exists { return 0 } return 1 }

在这里我们使用 Store 方法来添加键值对。Store 方法会覆盖原有的值。我们只需要判断键是否存在,然后返回 0 或 1 即可。

💡

为啥这里要返回 0 或 1 呢?

这种设计遵循了 Redis 的命令返回值约定。返回值告诉调用者该操作是更新了一个已存在的键(返回0)还是创建了一个新键(返回1)。这些返回值常用于统计操作影响的键数量,例如在批量操作中跟踪修改的总数。调用者可以根据返回值执行不同的后续操作。

PutIfAbsent 和 PutIfExists

接下来实现 PutIfAbsentPutIfExists 方法。PutIfAbsent 方法用于仅在键不存在时添加键值对,PutIfExists 方法用于仅在键存在时添加键值对。我们可以使用 Load 方法来判断键是否存在。然后使用 Store 方法来添加键值对。

datastruct/dict/sync_dict.go
// PutIfAbsent adds a key-value pair to the dictionary if the key does not exist, return 1 if it exists, else 0 func (dict *SyncDict) PutIfAbsent(key string, val interface{}) (result int) { _, exists := dict.m.Load(key) if exists { return 0 } // Store the key-value pair dict.m.Store(key, val) return 1 } // PutIfExists adds a key-value pair to the dictionary if the key exists, return 1 if it does not exist, else 0 func (dict *SyncDict) PutIfExists(key string, val interface{}) (result int) { _, exists := dict.m.Load(key) if !exists { return 0 } // Store the key-value pair dict.m.Store(key, val) return 1 }

Remove

接下来实现 Remove 方法,用于删除键值对。我们可以使用 Load 方法来判断键是否存在。然后使用 Delete 方法来删除键值对。

datastruct/dict/sync_dict.go
// Remove removes a key-value pair from the dictionary, return the count of pairs were removed func (dict *SyncDict) Remove(key string) (result int) { _, exists := dict.m.Load(key) if !exists { return 0 } // Delete the key-value pair dict.m.Delete(key) return 1 }

ForEach

接下来实现 ForEach 方法,用于遍历所有的键值对。我们可以使用 sync.MapRange 方法来遍历所有的键值对。我们在遍历时调用传入的函数来处理每一个键值对。

datastruct/dict/sync_dict.go
// ForEach iterates over all key-value pairs in the dictionary and applies the consumer function to each pair func (dict *SyncDict) ForEach(consumer Consumer) { // Iterate over all key-value pairs and apply the consumer function dict.m.Range(func(key, value interface{}) bool { consumer(key.(string), value) // Always return true to continue iteration return true }) }

Keys

接下来实现 Keys 方法,用于获取所有的键。我们可以使用 Range 方法来遍历所有的键值对,然后将键添加到一个切片中,最后返回这个切片。

datastruct/dict/sync_dict.go
// Keys returns a slice of all keys in the dictionary func (dict *SyncDict) Keys() []string { keys := make([]string, dict.Len()) // Iterate over all key-value pairs and collect the keys dict.m.Range(func(key, value interface{}) bool { keys = append(keys, key.(string)) return true }) return keys }

RandomKeys

实现 RandomKeys 方法,用于获取 n 个随机键,返回一个字符串切片。

我们可以利用 sync.MapRange 方法不保证迭代顺序的特性来实现这个方法。我们进行 n 次选择,每次随机选择一个键。由于 Range 方法不保证迭代顺序,所以我们可以在每次迭代时随机选择一个键。

但是这样可能会导致重复的键被返回,不过这个接口不做唯一性的保证,所以我们可以直接返回结果。

datastruct/dict/sync_dict.go
// RandomKeys returns a slice of n random keys from the dictionary // Due to m.Range doesn't guarantee the order of iteration, we can use this feature to get random keys // Note: This method may not be truly random, but it will give different keys each time // Duplicate keys may be returned func (dict *SyncDict) RandomKeys(n int) []string { keys := make([]string, dict.Len()) for i := 0; i < n; i++ { // Randomly select a key from the dictionary dict.m.Range(func(key, value interface{}) bool { keys = append(keys, key.(string)) return false }) } return keys }

RandomDistinctKeys

在这里我们使用 Range 方法来遍历所有的键值对。设置一个计数器 i 来统计已经收集到的键的数量。当收集到 n 个键时,停止迭代。我们使用一个切片来存储收集到的键。

datastruct/dict/sync_dict.go
// RandomDistinctKeys returns a slice of n distinct random keys from the dictionary func (dict *SyncDict) RandomDistinctKeys(n int) []string { result := make([]string, dict.Len()) i := 0 // Iterate over all key-value pairs and collect the keys dict.m.Range(func(key, value interface{}) bool { result[i] = key.(string) i++ return i != n }) return result }

Clear

在这里我们使用 *dict = *MakeSyncDict() 来清空所有的键值对。MakeSyncDict 函数会返回一个新的 SyncDict 实例。我们将当前的 dict 指向这个新的实例,从而实现清空所有的键值对。

datastruct/dict/sync_dict.go
// Clear clears all key-value pairs in the dictionary func (dict *SyncDict) Clear() { *dict = *MakeSyncDict() }

旧的去哪里了呢?在 Go 中,垃圾回收器会自动回收不再使用的内存。所以我们不需要手动去清理旧的实例。只要没有其他的引用指向这个实例,它就会被自动回收。

DB

创建 DB 结构体

刚刚我们实现的这个 SyncDict 是在 Redis 最底层工作的数据结构。接下来我们需要实现一个 DB 结构体,用于表示 Redis 数据库。这个结构体会包含一个 SyncDict 实例,用于存储数据。

我们在之前创建的 database 目录下创建一个 db.go 文件,用于存放 DB 结构体的实现。

database/db.go
type DB struct { index int data dict.Dict } // MakeDB creates a new DB instance func MakeDB() *DB { return &DB{ index: 0, data: dict.MakeSyncDict(), } }

在这里我们定义了一个 DB 结构体,它包含一个 index 字段和一个 data 字段。index 字段用于表示数据库的索引,data 字段用于存储数据。我们使用 MakeDB 函数来创建一个新的 DB 实例。

ExecFunc 和 CmdLine

接下来,我们需要实现一个 ExecFunc 函数类型,用于表示一个执行函数。这个函数类型接收一个 DB 实例和一个字节切片作为参数,返回一个 resp.Reply。所有的 Redis 命令(如 PING、SET、GET 等)都要用这个函数类型来实现。

这样有助于所有命令处理函数共享相同的签名,便于上层调用代码统一处理各种命令。

database/db.go
// ExecFunc is a function type that takes a DB instance and a slice of byte slices as arguments and returns a resp.Reply // All redis commands like PING, SET, GET, etc. are implemented as functions of this type type ExecFunc func(db *DB, args [][]byte) resp.Reply

然后我们需要实现一个 CmdLine 类型,用于表示命令行参数。这个类型是一个字节切片的切片,用于表示命令行参数。我们在这里使用 CmdLine 来表示命令行参数,而不是直接使用 [][]byte,是为了更好地表达语义。

database/db.go
// CmdLine is a type alias for a slice of byte slices // It is used to represent the command line arguments passed to the ExecFunc type CmdLine = [][]byte

为什么这里使用 [][]byte 而不是 string 呢?因为 Redis 的命令行参数是二进制安全的。也就是说,命令行参数可以包含任意的字节序列,包括空字节和非 UTF-8 编码的字节序列。[]byte 可以安全地存储任何二进制数据,包括包含零值(null字符)的数据。而 string 在某些场景下可能会被截断或错误地解析二进制数据。

接下来我们应当实现一个 Exec 方法,用于执行命令。这个方法接收一个 CmdLine 和一个 ExecFunc,返回一个 resp.Reply

这样的话我们需要在 Exec 方法中分辨命令类型,例如 PING、SET、GET 等等。但是这样的设计模式会导致代码中出现大量的 if-else 语句,导致代码的可读性和可维护性下降。

使用命令模式

所以我们可以考虑使用命令模式,在 database 目录下创建一个 command.go 文件,用于存放命令的实现。我们将所有的命令都放在这个文件中。这样可以避免在 db.go 中出现大量的 if-else 语句,从而提高代码的可读性和可维护性。

创建一个中央注册表(cmdTable),用于存储所有支持的命令及其实现。command 结构包含:

  • exec:命令的实际执行函数(类型为 ExecFunc)
  • arity:命令所需的参数数量(用于验证命令参数是否正确)

创建一个 RegisterCommand 函数用于注册新命令,将命令名转为小写(使命令大小写不敏感)并存入表中。

这是经典的命令模式(Command Pattern)的实现,为整个 Redis 克隆提供了良好的可维护性和可扩展性。

database/command.go
package database import "strings" // cmdTable is a map that associates command names (as strings) with their corresponding command structures var cmdTable = make(map[string]*command) type command struct { exec ExecFunc // function to execute the command arity int // number of arguments required for the command } // RegisterCommand registers a command with the command table func RegisterCommand(name string, exec ExecFunc, arity int) { name = strings.ToLower(name) cmdTable[name] = &command{ exec: exec, arity: arity, } }
💡

命令模式

命令模式(Command Pattern)是一种行为型设计模式,它的核心思想是:“将请求封装成对象,使你可以用不同的请求对客户进行参数化。”

也就是说,你可以把对某个操作的调用、参数、接收者(执行者)等打包成一个对象,然后延迟执行、存储、排队、撤销等——就像在做命令行操作一样。

想象你在点外卖:

  • 你(客户端)把要点的东西告诉服务员(调用者)

  • 服务员把你的订单写在纸上(封装成命令对象)

  • 后厨(接收者)根据订单来做菜(执行命令)

命令对象起到中间层的作用,把“发出命令的人”与“执行命令的人”解耦了。

一个例子:

type Command interface { Execute() } type Light struct{} func (l *Light) On() { fmt.Println("Light is ON") } func (l *Light) Off() { fmt.Println("Light is OFF") } type LightOnCommand struct { light *Light } func (c *LightOnCommand) Execute() { c.light.On() } type LightOffCommand struct { light *Light } func (c *LightOffCommand) Execute() { c.light.Off() } type RemoteControl struct { command Command } func (r *RemoteControl) SetCommand(command Command) { r.command = command } func (r *RemoteControl) PressButton() { r.command.Execute() } func main() { light := &Light{} lightOn := &LightOnCommand{light} lightOff := &LightOffCommand{light} remote := &RemoteControl{} remote.SetCommand(lightOn) remote.PressButton() // Output: Light is ON remote.SetCommand(lightOff) remote.PressButton() // Output: Light is OFF }

这个例子中,Command 接口定义了一个 Execute 方法,Light 类表示一个灯,LightOnCommandLightOffCommand 分别表示打开和关闭灯的命令。RemoteControl 类用于设置命令并执行它们。 这样,我们就可以通过命令对象来控制灯的开关,而不需要直接操作灯的对象。命令模式使得请求的发送者和接收者解耦,提高了代码的可扩展性和可维护性。

命令调用函数

上面的 cmdTable 有待补充,我们先我们回到 db.go 文件中,实现一个 Exec 方法,用于执行命令。主要作用是处理和执行客户端发来的命令。

  • 根据命令名称从 cmdTable 中获取对应的命令结构体
  • 检查命令参数的数量是否符合要求
  • 调用命令的执行函数,并返回执行结果

这里我们需要把 cmdLine 的第一个元素转换为小写字母,以确保命令名称的可识别性。然后我们从 cmdTable 中获取对应的命令结构体。如果命令不存在,则返回一个错误回复。

有的命令的参数是可变的,我们将可变参数的命令的 arity 设置为负数。比如 DEL 命令可以删除多个键,所以它的 arity 是 -1。我们在这里使用 ValidateArity 函数来验证命令参数的数量是否符合要求。

  • 对于像 SET key value 这样需要固定 2 个参数的命令,arity 为正数 2
  • 对于像 RPUSH key value1 [value2 value3 ...] 这样至少需要 2 个参数但可接受更多参数的命令,arity 为负数 -2
database/db.go
// Exec executes a command on the DB instance // It takes a connection and a command line as arguments // It returns a resp.Reply which is the response to the command func (db *DB) Exec(c resp.Connection, cmdLine CmdLine) resp.Reply { // The first element of cmdLine is the command name, like "PING", "SET", etc. // Convert it to lowercase to ensure case-insensitivity cmdName := strings.ToLower(string(cmdLine[0])) // Get the command from the command table using the command name // If the command is not found, return an error reply cmd, ok := cmdTable[cmdName] if !ok { return reply.MakeStandardErrorReply("ERR unknown command '" + cmdName + "'") } // Validate the number of arguments passed to the command if !ValidateArity(cmd.arity, cmdLine) { return reply.MakeArgNumErrReply(cmdName) } // Execute the command and return the response return cmd.exec(db, cmdLine[1:]) } // ValidateArity checks if the number of arguments passed to a command is valid func ValidateArity(arity int, args [][]byte) bool { // Check if the number of arguments is less than the required arity if arity >= 0 { return len(args) == arity } else { // If the arity is negative, it means the command takes a variable number of arguments // Check if the number of arguments is within the valid range return len(args) >= -arity } }

实现 PING 命令

接下来我们就开始实现 Redis 的命令。我们先从最简单的 PING 命令开始实现。PING 命令用于测试服务器是否在线。它的返回值是 PONG。

database 目录下创建一个 ping.go 文件,用于存放 PING 命令的实现。

我们在前面的章节中已经实现了一个 MakePongReply 函数,用于返回 PONG 回复。我们在这里直接使用这个函数来实现 PING 命令。

database/ping.go
package database import ( "redigo/interface/resp" "redigo/resp/reply" ) func Ping(db *DB, args [][]byte) resp.Reply { return reply.MakePongReply() }

然后我们可以在当前文件中使用 init 函数来注册 PING 命令。init 函数是 Go 语言中的一个特殊函数,它会在包被导入时自动执行。我们可以在这个函数中注册 PING 命令。

database/ping.go
// Register the PING command to the command table func init() { // Register the PING command with the command table RegisterCommand("ping", Ping, 1) }

数据库层面的操作

在开始实现之前,我们定义一些数据库层面的操作。

它们主要是对我们之前定义的底层 dict 的封装。

包含:

  • GetEntity:获取数据实体
  • PutEntity:存储数据实体
  • PutIfExists:如果存在则存储数据实体
  • PutIfAbsent:如果不存在则存储数据实体
  • Remove:删除数据实体
  • Removes:删除多个数据实体
  • Flush:清空数据库
database/db.go
// GetEntity returns DataEntity bind to the given key func (db *DB) GetEntity(key string) (*database.DataEntity, bool) { raw, ok := db.data.Get(key) if !ok { return nil, false } entity, _ := raw.(*database.DataEntity) return entity, true } // PutEntity stores the given DataEntity in the database func (db *DB) PutEntity(key string, entity *database.DataEntity) int { return db.data.Put(key, entity) } // PutIfExists edit the given DataEntity in the database func (db *DB) PutIfExists(key string, entity *database.DataEntity) int { return db.data.PutIfExists(key, entity) } // PutIfAbsent stores the given DataEntity in the database if it doesn't already exist func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int { return db.data.PutIfAbsent(key, entity) } // Remove deletes the DataEntity associated with the given key from the database func (db *DB) Remove(key string) int { return db.data.Remove(key) } // Removes deletes the DataEntity associated with the given keys from the database func (db *DB) Removes(keys ...string) int { deleted := 0 for _, key := range keys { _, ok := db.data.Get(key) if ok { db.data.Remove(key) deleted++ } } return deleted } // Flush clears the database by removing all DataEntity objects func (db *DB) Flush() { db.data.Clear() }

这里我们得把之前的 clear 方法改为 Clear,便于包外访问:

datastruct/dict/sync_dict.go
// Clear clears all key-value pairs in the dictionary func (dict *SyncDict) Clear() { *dict = *MakeSyncDict() }

datastruct/dict/dict.go 中也要把接口中的 clear 方法改为 Clear,便于包外访问:

datastruct/dict/dict.go
type Dict interface { // ... other methods Clear() // clear all key-value pairs }

实现 KEYS 命令集

接下来我们实现 KEYS 命令集。KEYS 命令用于查找所有符合给定模式的键。它的返回值是一个字符串切片,表示所有符合条件的键。

在 Redis 中,Keys 有下面的常见的一些指令:

  • DEL key1 key2 ...:删除一个或多个键
  • EXISTS key1 key2 ...:检查一个或多个键是否存在
  • FLUSHDB:删除当前数据库中的所有键
  • TYPE key:返回键的类型
  • RENAME key1 key2:重命名键
  • RENAMENX key1 key2:重命名键,如果新键已经存在,则不执行重命名

在这里我们先就实现这几个命令。我们在 database 目录下创建一个 keys.go 文件,用于存放 KEYS 命令的实现。

DEL

在 Redis 中,DEL 命令用于删除一个或多个键。它的返回值是被删除的键的数量。

示例:

假设我们有下面的键值对:

SET key1 value1 SET key2 value2 SET key3 value3

如果我们执行 DEL key1 key2 命令,则返回值为 2,因为我们删除了两个键。

我们在 keys.go 文件中实现 Del 函数,实现思路很简单,我们遍历所有的键,然后调用 db.Remove 方法来删除键。最后返回被删除的键的数量。

database/keys.go
// Handle the DEL command // It deletes the specified keys from the database func execDel(db *DB, args [][]byte) resp.Reply { keys := make([]string, len(args)) for i, arg := range args { keys[i] = string(arg) } deleted := db.Removes(keys...) return reply.MakeIntReply(int64(deleted)) }

然后我们在 keys.go 文件中使用 init 函数来注册 DEL 命令。DEL 接受的是可变参数,所以它的 arity 是 -2。

database/keys.go
func init() { RegisterCommand("DEL", execDel, -2) }

EXISTS

在 Redis 中,EXISTS 命令用于检查一个或多个键是否存在。它的返回值是一个整数,表示存在的键的数量。

使用示例:

假设还是上面的三个键值对,我们执行 EXISTS key1 key2 命令,则返回值为 2,因为 key1key2 都存在。

我们在 keys.go 文件中实现 Exists 函数,遍历所有的键,然后调用 db.GetEntity 方法来检查键是否存在。最后返回存在的键的数量。

database/keys.go
func execExists(db *DB, args [][]byte) resp.Reply { result := int64(0) for _, arg := range args { key := string(arg) if _, ok := db.GetEntity(key); ok { result++ } } return reply.MakeIntReply(result) }

然后我们在 keys.go 文件中使用 init 函数来注册 EXISTS 命令。EXISTS 接受的是可变参数,所以它的 arity 是 -2。

database/keys.go
func init() { // ... RegisterCommand("EXISTS", execExists, -2) }

FLUSHDB

在 Redis 中,FLUSHDB 命令用于删除当前数据库中的所有键。它的返回值是 OK。

这个命令不要求额外参数,所以它的 arity 是 1(即自身命令)。

database/keys.go
func execFlushDB(db *DB, args [][]byte) resp.Reply { db.Flush() return reply.MakeOKReply() } func init() { // ... RegisterCommand("FLUSHDB", execFlushDB, -1) }

TYPE

在 Redis 中,TYPE 命令用于返回键的类型。它的返回值是一个字符串,表示键的类型。

但是我们的项目到目前我们只去实现 String 类型,因此在这里我们只用返回 string 即可。

database/keys.go
// Handle the TYPE command // It returns the type of the specified key func execType(db *DB, args [][]byte) resp.Reply { key := string(args[0]) if entity, ok := db.GetEntity(key); ok { switch entity.Data.(type) { // If the entity is []byte, return the type as "string" case []byte: return reply.MakeBulkReply([]byte("string")) } // TODO: Add more types as needed } else { return reply.MakeStatusReply("none") } return reply.MakeUnknownReply() } func init() { // ... RegisterCommand("TYPE", execType, 2) }

RENAME

在 Redis 中,RENAME 命令用于重命名键。它的返回值是 OK。

RENAME key1 key2

这会将 key1 重命名为 key2。如果 key2 已经存在,则会覆盖它

database/keys.go
// Handle the RENAME command. // It renames a key in the database. // RENAME key newkey func execRename(db *DB, args [][]byte) resp.Reply { src := string(args[0]) dst := string(args[1]) entity, ok := db.GetEntity(src) if !ok { return reply.MakeStandardErrorReply("ERR no such key") } db.PutEntity(dst, entity) db.Remove(src) return reply.MakeOKReply() } func init() { // ... RegisterCommand("RENAME", execRename, 3) }

RENAMENX

上面实现的 RENAME 命令,假设在重命名时,key2 已经存在,则会覆盖 Key2 原有的值。

假如希望在重命名时,如果 key2 已经存在,则不执行重命名操作。我们可以使用 RENAMENX 命令。

其中 NX 表示 not exist,即如果不存在则执行重命名操作。

RENAMENX key1 key2

要注意的是,这个命令的返回值是 1 或 0。1 表示重命名成功,0 表示重命名失败。这是与 RENAME 命令有区别的。

实现:

database/keys.go
// Handle the RENAMENX command. // It renames a key in the database only if the new key does not exist. // RENAME key newkey func execRenameNX(db *DB, args [][]byte) resp.Reply { src := string(args[0]) dst := string(args[1]) entity, ok := db.GetEntity(src) if !ok { return reply.MakeStandardErrorReply("ERR no such key") } if _, ok := db.GetEntity(dst); ok { return reply.MakeIntReply(0) } db.PutEntity(dst, entity) db.Remove(src) return reply.MakeIntReply(1) } func init() { // ... RegisterCommand("RENAMENX", execRenameNX, 3) }

KEYS

在 Redis 中,KEYS 命令用于查找所有符合给定模式的键。它的返回值是一个字符串切片,表示所有符合条件的键。

使用示例:

KEYS *

表示返回所有的键。

或者使用通配符,例如 KEYS user:* 来查找所有以 user: 开头的键。

所以这里我们需要实现一个识别通配符的函数。

Redis 支持哪些通配符?在 Redis 中,KEYS pattern 命令支持一套通配符语法:

  • *:匹配任意长度的任意字符(包括空字符)
  • ?:匹配任意单个字符
  • [abc]:匹配集合中的任意一个字符
  • [a-z]:匹配范围内的任意一个字符
  • [^abc]:不匹配集合中的任意一个字符
  • 可以通过 \ 进行字符转义

举例:

  • user:* 匹配所有以 user: 开头的 key
  • session:?? 匹配如 session:ab, session:xy
  • file[0-9].log 匹配如 file1.log, file9.log
  • file[^0-9].log 表示不匹配任何以数字结尾的文件名,例如 fileA.log, fileB.log

为了匹配字符串,我们需要实现一个通配符匹配算法。我们可以使用动态规划来实现这个算法。

首先我们以 user:ab 为例,判断字符串是否符合给定的通配符模式。

第一步:CompilePattern("user:??") 即将通配符编译成一个模式对象。这个模式对象包含了所有的通配符信息。

这个通配符会被编译为下表:

索引类型内容
0normalu
1normals
2normale
3normalr
4normal:
5any?
6any?

第二步:IsMatch("user:ab") 即判断字符串是否符合给定的通配符模式。我们使用动态规划来实现这个算法。

定义一个布尔二维表:table[i][j] 表示当前的遍历到的数据 s[:i]pattern[:j] 是否匹配。

一格格开始填表:

i(字符串索引)j(模式索引)当前字符模式字符是否匹配原因
00uu字符相同
11ss字符相同
22ee字符相同
33rr字符相同
44::字符相同
55a?匹配任意单个字符
66b?匹配任意单个字符

最后值都是 true,所以返回 true

这个不用太纠结,直接使用即可。完整的代码,我们在 lib 根文件夹下创建 wildcard 包,然后在 wildcard 包下创建一个 wildcard.go 文件,用于存放通配符的实现。

lib/wildcard/wildcard.go
package wildcard const ( normal = iota all // * any // ? setSymbol // [] rangSymbol // [a-b] negSymbol // [^a] ) type item struct { character byte set map[byte]bool typeCode int } func (i *item) contains(c byte) bool { if i.typeCode == setSymbol { _, ok := i.set[c] return ok } else if i.typeCode == rangSymbol { if _, ok := i.set[c]; ok { return true } var ( min uint8 = 255 max uint8 = 0 ) for k := range i.set { if min > k { min = k } if max < k { max = k } } return c >= min && c <= max } else { _, ok := i.set[c] return !ok } } // Pattern represents a wildcard pattern type Pattern struct { items []*item } // CompilePattern convert wildcard string to Pattern func CompilePattern(src string) *Pattern { items := make([]*item, 0) escape := false inSet := false var set map[byte]bool for _, v := range src { c := byte(v) if escape { items = append(items, &item{typeCode: normal, character: c}) escape = false } else if c == '*' { items = append(items, &item{typeCode: all}) } else if c == '?' { items = append(items, &item{typeCode: any}) } else if c == '\\' { escape = true } else if c == '[' { if !inSet { inSet = true set = make(map[byte]bool) } else { set[c] = true } } else if c == ']' { if inSet { inSet = false typeCode := setSymbol if _, ok := set['-']; ok { typeCode = rangSymbol delete(set, '-') } if _, ok := set['^']; ok { typeCode = negSymbol delete(set, '^') } items = append(items, &item{typeCode: typeCode, set: set}) } else { items = append(items, &item{typeCode: normal, character: c}) } } else { if inSet { set[c] = true } else { items = append(items, &item{typeCode: normal, character: c}) } } } return &Pattern{ items: items, } } // IsMatch returns whether the given string matches pattern func (p *Pattern) IsMatch(s string) bool { if len(p.items) == 0 { return len(s) == 0 } m := len(s) n := len(p.items) table := make([][]bool, m+1) for i := 0; i < m+1; i++ { table[i] = make([]bool, n+1) } table[0][0] = true for j := 1; j < n+1; j++ { table[0][j] = table[0][j-1] && p.items[j-1].typeCode == all } for i := 1; i < m+1; i++ { for j := 1; j < n+1; j++ { if p.items[j-1].typeCode == all { table[i][j] = table[i-1][j] || table[i][j-1] } else { table[i][j] = table[i-1][j-1] && (p.items[j-1].typeCode == any || (p.items[j-1].typeCode == normal && uint8(s[i-1]) == p.items[j-1].character) || (p.items[j-1].typeCode >= setSymbol && p.items[j-1].contains(s[i-1]))) } } } return table[m][n] }

让我们回到 keys.go 文件中,来实现 KEYS 命令。

database/keys.go
// Handle the KEYS command. // It returns all keys in the database that match the specified pattern. func execKeys(db *DB, args [][]byte) resp.Reply { pattern := wildcard.CompilePattern(string(args[0])) result := make([][]byte, 0) // Store all matching keys db.data.ForEach(func(key string, val interface{}) bool { if pattern.IsMatch(key) { result = append(result, []byte(key)) } return true }) return reply.MakeMultiBulkReply(result) } func init() { // ... RegisterCommand("KEYS", execKeys, 2) }

重要勘误

在之前我们声明 ForEach 的时候,对于接受的条件函数 Consumer,我们忘记了添加返回值。我们需要在 datastruct/dict/dict.go 中把 Consumer 的返回值改为 bool,表示是否继续遍历。

datastruct/dict/dict.go
type Consumer func(key string, val interface{}) bool // function type for iterating over key-value pairs

实现 Strings 命令集

刚刚我们实现完了 KEYS 命令集,接下来我们实现 Strings 命令集。Strings 命令用于操作字符串类型的键值对。

我们这里挑选一些比较经典的命令来实现:

  • SET key value:设置键值对
  • GET key:获取键值对
  • SETNX key value:设置键值对,如果键不存在
  • GETSET key value:设置键值对,并返回旧值
  • STRLEN key:获取字符串的长度

database 目录下创建一个 strings.go 文件,用于存放 Strings 命令的实现。

GET

在 Redis 中,GET 命令用于获取键值对。它的返回值是一个字符串,表示键的值。

database/strings.go
// execGet retrieves the value associated with the specified key from the database. func execGet(db *DB, args [][]byte) resp.Reply { key := string(args[0]) if entity, ok := db.GetEntity(key); ok { // TODO: If we have multiple types, we need to check the conversion if it's not []byte return reply.MakeBulkReply(entity.Data.([]byte)) } return reply.MakeNullBulkReply() } func init() { // ... RegisterCommand("GET", execGet, 2) }

需要注意的是,由于我们只实现了 String 类型,所以我们在这里直接返回 entity.Data.([]byte) 即可。如果我们有多个类型,我们需要在这里判断类型转换是否成功。 如果转换失败,我们需要返回一个错误回复。

SET

在 Redis 中,SET 命令用于设置键值对。它的返回值是 OK。

database/strings.go
// execSet stores the specified key-value pair in the database. func execSet(db *DB, args [][]byte) resp.Reply { key := string(args[0]) value := args[1] entity := &database.DataEntity{ Data: value, } db.PutEntity(key, entity) return reply.MakeOKReply() } func init() { // ... RegisterCommand("SET", execSet, 3) }

SETNX

在 Redis 中,SETNX 命令用于设置键值对,如果键不存在。它的返回值是 1 或 0。1 表示设置成功,0 表示设置失败。

database/strings.go
// execSetNX stores the specified key-value pair in the database only if the key does not already exist. // If the key already exists, it does not modify the value and returns 0. // If the key does not exist, it sets the value and returns 1. // SETNX key value func execSetNX(db *DB, args [][]byte) resp.Reply { key := string(args[0]) value := args[1] entity := &database.DataEntity{ Data: value, } result := db.PutIfAbsent(key, entity) return reply.MakeIntReply(int64(result)) } func init() { // ... RegisterCommand("SETNX", execSetNX, 3) }

GETSET

在 Redis 中,GETSET 命令用于设置键值对,并返回旧值。它的返回值是一个字符串,表示旧值。

database/strings.go
// execGetSet stores the specified key-value pair in the database and returns the old value associated with the key. func execGetSet(db *DB, args [][]byte) resp.Reply { key := string(args[0]) value := args[1] entity, ok := db.GetEntity(key) db.PutEntity(key, &database.DataEntity{ Data: value, }) if !ok { return reply.MakeNullBulkReply() } return reply.MakeBulkReply(entity.Data.([]byte)) } func init() { // ... RegisterCommand("GETSET", execGetSet, 3) }

STRLEN

在 Redis 中,STRLEN 命令用于获取字符串的长度。它的返回值是一个整数,表示字符串的长度。

STRLEN key
database/strings.go
// execStrLen retrieves the length of the value associated with the specified key. func execStrLen(db *DB, args [][]byte) resp.Reply { key := string(args[0]) entity, ok := db.GetEntity(key) if !ok { return reply.MakeNullBulkReply() } return reply.MakeIntReply(int64(len(entity.Data.([]byte)))) } func init() { // ... RegisterCommand("STRLEN", execStrLen, 2) }

到这里我们就实现完了 Strings 命令集。我们可以在 main.go 中测试一下。

实现核心 database

让我们回到 main.go 文件中。

main.go
func main() { // ... err := tcp.ListenAndServeWithSignal( &tcp.Config{ Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port), }, handler.MakeHandler()) if err != nil { logger.Error(err) } }

我们创建了一个 handler 用于处理 TCP 连接:

handler/handler.go
// MakeHandler creates a RespHandler instance func MakeHandler() *RespHandler { var db databaseface.Database db = database.NewEchoDatabase() return &RespHandler{ db: db, } }

而在这里,我们仍旧使用的是之前创建的回显服务器,并没有真正使用到数据库业务。

现在我们需要去实现核心的 database 业务逻辑。

database 目录下创建一个 database.go 文件,用于存放数据库的实现。

每个数据库都由多个 DB 实例组成。我们在这里创建一个 Database 结构体,用于存放所有的 DB 实例。

database/database.go
package database type Database struct { dbSet []*DB }

接下来我们需要实现 database 的三个接口:

interface/database/database.go
type Database interface { Exec(client resp.Connection, args [][]byte) resp.Reply AfterClientClose(c resp.Connection) Close() }

它们分别是:

  • Exec:执行命令
  • AfterClientClose:客户端关闭后执行
  • Close:关闭数据库

我们向 redis.conf 中添加一个 databases 配置项,用于指定数据库的数量。默认值为 16。

redis.conf
databases 16

然后创建 MakeDatabase 函数,用于创建数据库实例。这里我们需要根据配置文件中的 databases 配置项来创建数据库实例。

database/database.go
// NewDatabase creates a new Database instance func NewDatabase() *Database { database := &Database{} if config.Properties.Databases == 0 { config.Properties.Databases = 16 } database.dbSet = make([]*DB, config.Properties.Databases) for i := range database.dbSet { db := MakeDB() db.index = i database.dbSet[i] = db } return database }

由于一个数据库实例是多个 DB 实例的集合,所以我们需要给用户实现一个选择数据库的功能。在之前实现的我们的 RESP 协议连接中:

interface/resp/conn.go
type Connection interface { Write([]byte) error // Write data to the connection GetDBIndex() int // Get database index SelectDB(int) // Select database }

我们创建了一个 SelectDB 方法,用于选择数据库。然后我们在 RESP 中实现了它:

resp/connnection/connection.go
// SelectDB selects a database func (c *Connection) SelectDB(dbNum int) { c.selectedDB = dbNum }

接下来我们就可以使用这个方法来实现数据库的选择功能。我们需要支持用户输入 SELECT <dbNum> 命令来选择数据库。

database.go 文件实现:

database/database.go
// execSelect sets the current database for the client connection. // select x func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply { dbIndex, err := strconv.Atoi(string(args[0])) if err != nil { return reply.MakeStandardErrorReply("ERR invalid DB index") } if dbIndex < 0 || dbIndex >= len(database.dbSet) { return reply.MakeStandardErrorReply("ERR DB index out of range") } c.SelectDB(dbIndex) return reply.MakeIntReply(int64(dbIndex)) }

这里有几个注意的点,一个是我们需要把 dbNum 转换为整数,另一个是我们需要判断 dbNum 是否在范围内。

接下来我们要实现 Exec 方法,这个方法是我们整个服务器的入口,所有的命令都会经过这个方法,然后执行对应的命令。

database/database.go
// Exec executes a command on the database func (d *Database) Exec(client resp.Connection, args [][]byte) resp.Reply { defer func() { if err := recover(); err != nil { logger.Error("Database Exec panic:" + err.(error).Error()) } }() cmdName := strings.ToLower(string(args[0])) if cmdName == "select" { if len(args) != 2 { return reply.MakeArgNumErrReply("select") } return execSelect(client, d, args[1:]) } // Get the current database index from the client connection db := d.dbSet[client.GetDBIndex()] return db.Exec(client, args) }

在这里我们需要注意的是,我们需要在 Exec 方法中判断命令是否是 SELECT 命令,如果是的话,我们就执行 execSelect 方法来选择数据库。

然后取出当前数据库的索引,执行对应的命令。

接下来我们要到 resp/handler/handler.go 文件中修改 MakeHandler 方法,来使用我们刚刚实现的 database。使用 database.NewDatabase() 来创建一个新的数据库实例。代替原先的回显服务器 database.NewEchoDatabase()

handler/handler.go
func MakeHandler() *RespHandler { db := database.NewDatabase() return &RespHandler{ db: db, } }

到目前为止,我们来回顾一下 Redigo 执行的流程。

  1. 客户端请求解析

    • 客户端通过 TCP 连接发送命令,服务器使用 RESP 协议解析器解析请求。
    • ParseStream 函数异步解析客户端的命令流,将其转换为 Payload 对象。
  2. 命令分发与验证

    • 解析后的命令通过 RespHandler 分发到数据库层。
    • 数据库层的 Exec 方法根据命令名称从 cmdTable 中查找对应的命令结构体。
    • 验证命令参数数量是否符合要求,若不符合则返回错误回复。
  3. 命令执行

    • 数据库层调用对应的命令执行函数(如 execGetexecSet 等)。
    • 执行函数通过 DB 结构体操作底层的 SyncDict 数据结构,完成数据的增删改查。
  4. 响应生成

    • 命令执行完成后,返回一个 resp.Reply 对象。
    • 服务器将 Reply 对象序列化为 RESP 格式并发送回客户端。
  5. 多数据库支持

    • 客户端可以通过 SELECT 命令切换数据库。
    • 每个客户端连接维护一个 selectedDB 字段,指向当前使用的数据库实例。

通过以上流程,Redigo 实现了一个简化版的 Redis 服务器,支持基本的键值操作和多数据库管理。

从下一章节开始,我们要实现持久化功能。

测试

启动服务器:

go run main.go

新开一个命令行,执行命令:

printf "*1\r\n\$4\r\nPING\r\n" | nc localhost 6379

返回:

+PONG

执行:

printf "*3\r\n\$3\r\nSET\r\n\$3\r\nkey\r\n\$5\r\nvalue\r\n" | nc localhost 6379

这是一条 SET 命令,设置键值对 keyvalue

返回:

+OK

然后我们试着获取一下:

printf "*2\r\n\$3\r\nGET\r\n\$3\r\nkey\r\n" | nc localhost 6379

返回:

$5 value

我们可以看到,返回的值是 value,说明我们成功地设置了键值对。

到目前为止,数据存在了内存中的 dict 中。我们可以使用 KEYS 命令来查看所有的键:

printf "*2\r\n\$4\r\nKEYS\r\n\$1\r\n*\r\n" | nc localhost 6379

返回:

*1 $3 key

提交到 GitHub

将代码提交到 GitHub:

git add . # 示例:可以自己写一下 git commit -m "feat: implement in-memory database - Create Dict interface and SyncDict implementation - Add database structure with multi-DB support - Implement command pattern for Redis commands - Add basic Redis commands (PING, SET, GET, DEL, etc.) - Implement pattern matching for KEYS command - Support string operations like GETSET and STRLEN" git push origin main
Last updated on