Redis 内存数据库
本文进度对应的代码仓库:Redis 内存数据库
在前面的章节中,我们先是实现了一个简单的 TCP 服务器,然后又实现了一个简单的 RESP 解析器。 在这一章中,我们将实现一个 Redis 内存数据库。
创建接口
首先我们要创建一个 Dict
接口,表示一个字典。Redis 中的 Dict 结构提供了对键值对的基本操作(如获取、添加、删除等)。以及一些高级功能,如迭代器、随机返回键等。
使用接口来抽象化 Dict 的实现,可以让我们在后续的实现中更容易地替换底层的数据结构。而不用影响到上层的逻辑。
例如下面我们将使用 sync.Map
来实现 Dict。但是未来我们可能会使用其他的数据结构来实现 Dict,比如 hashmap
、skiplist
等等。使用接口可以让我们在不修改上层逻辑的情况下,轻松地替换底层的数据结构。
首先在根目录下创建一个 datastruct
目录,专门用来存放数据结构的实现。然后在 datastruct
目录下创建一个 dict
目录,专门用来存放 Dict 的实现。最后在 dict
目录下创建一个 dict.go
文件,专门用来存放 Dict 的接口定义。
package dict
type Consumer func(key string, val interface{}) bool // function type for iterating over key-value pairs
type Dict interface {
Get(key string) (val interface{}, exists bool) // get value by key, return the value and a boolean indicating if the key exists
Len() int // get the number of key-value pairs
Put(key string, val interface{}) (result int) // put key-value pair, if exists, modify the value, return 0, if doesn't exist, add it, return 1
PutIfAbsent(key string, val interface{}) (result int) // put key-value pair if absent, return 0, if exists, return 1
PutIfExists(key string, val interface{}) (result int) // put key-value pair if exists, return 0, if absent, return 1
Remove(key string) (result int) // remove key-value pair, return the count of pairs
ForEach(consumer Consumer) // iterate over all key-value pairs
Keys() []string // get all keys
RandomKeys(n int) []string // get n random keys
RandomDistinctKeys(n int) []string // get n distinct random keys
Clear() // clear all key-value pairs
}
在这里我们定义了 Redis 中数据结构常用的一些方法:
Get(key string) (val interface{}, exists bool)
:根据键获取值,如果键存在则返回值和 true,否则返回 nil 和 false。Len() int
:获取键值对的数量。Put(key string, val interface{}) (result int)
:添加键值对,如果键已经存在,则覆盖原有的值,返回 0;如果键不存在,则添加键值对,返回 1。也就是说如果键已经存在,则不添加键值对但是进行值的修改。PutIfAbsent(key string, val interface{}) (result int)
:添加键值对,如果键已经存在,则不覆盖原有的值,返回 0;如果键不存在,则添加键值对,返回 1。PutIfExists(key string, val interface{}) (result int)
:添加键值对,如果键不存在,则不添加键值对。也就是说仅仅在键存在时才添加键值对。Remove(key string) (result int)
:删除键值对,返回键值对的数量。ForEach(consumer Consumer)
:遍历所有的键值对,consumer 是一个函数类型,用来处理每一个键值对。它允许用户在遍历时自定义处理逻辑。
例如,我们可以在遍历时打印每一个键值对:
func printKeyValue(key string, val interface{}) {
fmt.Printf("key: %s, value: %v\n", key, val)
}
// 使用 ForEach 方法遍历所有的键值对
dict.ForEach(printKeyValue)
Keys() []string
:获取所有的键,返回一个字符串切片。RandomKeys(n int) []string
:获取 n 个随机键,返回一个字符串切片。RandomDistinctKeys(n int) []string
:获取 n 个不重复的随机键,返回一个字符串切片。Clear()
:清空所有的键值对。这是一个包内方法,不可被外部调用。
接下来我们就需要去分别完成 Dict 接口的具体实现。
实现 SyncDict
在 datastruct/dict
目录下创建一个 sync_dict.go
文件,用于实现一个线程安全的 Dict。我们使用 Go 的内置 sync.Map
来实现这个 Dict。sync.Map
是一个并发安全的 map,它提供了原子操作的方法,可以在多个 goroutine 中安全地使用。
sync.Map
是 Go 语言标准库中的一个并发安全的 map 实现。sync.Map
主要用于在高并发场景下共享数据,比如缓存、频繁读写的共享状态等。
Go 原生的 map
不是并发安全的,如果多个 goroutine
同时读写同一个 map,会导致 fatal error: concurrent map read and map write
的运行时错误。
所以我们需要使用 sync.Map
来实现一个线程安全的 Dict。
为什么 Redis 中需要一个线程安全的 Dict 呢?因为 Redis 是一个单线程的数据库,它使用事件驱动模型来处理请求。虽然 Redis 是单线程的,但是它的底层数据结构是可以被多个线程同时访问的。所以我们需要一个线程安全的 Dict 来保证数据的一致性和安全性。
sync.Map
的实现原理是使用了读写锁(sync.RWMutex
)来保护 map 的读写操作。它的读操作是并发安全的,而写操作是互斥的。这就保证了在高并发场景下,读操作不会阻塞其他的读操作,而写操作会阻塞其他的读写操作。
加下来我们开始对 Dict
接口进行实现。首先实现一个 MakeSyncDict
函数,用于创建一个新的 SyncDict
实例。这个函数会返回一个实现了 Dict
接口的 SyncDict
实例。
// MakeSyncDict creates a new SyncDict instance
func MakeSyncDict() *SyncDict {
return &SyncDict{}
}
Get
然后实现 Get
方法,用于根据键获取值。如果键存在,则返回值和 true,否则返回 nil 和 false。
// Get returns the value associated with the given key and a boolean indicating if the key exists
func (dict *SyncDict) Get(key string) (val interface{}, exists bool) {
// Get the value by key
if value, ok := dict.m.Load(key); ok {
return value, true
}
return nil, false
}
这个方法是在对 Dict
接口中的 Get 方法进行实现,而在 Go 中,鸭子类型告诉我们,只要一个类型实现了某个接口的方法,那么这个类型就可以被视为这个接口的实现。
由于我们直接使用了 sync.Map
,所以获取值方法实际上就是从 sync.Map
中获取值。我们使用 Load
方法来获取值。Load
方法会返回一个值和一个布尔值,表示键是否存在。
Len
接下来实现 Len
方法,用于获取键值对的数量。但是 sync.Map
并没有提供直接获取长度的方法。我们需要使用 Range
方法来遍历所有的键值对,然后统计数量。
// Len returns the number of key-value pairs in the dictionary
func (dict *SyncDict) Len() int {
count := 0
// Iterate over all key-value pairs and count them
dict.m.Range(func(key, value interface{}) bool {
count++
return true
})
return count
}
在这里我们使用 Range
方法来遍历所有的键值对。Range
方法会调用一个函数,传入每一个键值对。我们在这个函数中统计数量。
Put
接下来实现 Put
方法,用于添加键值对。如果键已经存在,则覆盖原有的值,返回 0;如果键不存在,则添加键值对,返回 1。也就是说如果键已经存在,则不添加键值对但是进行值的修改。
// Put adds a key-value pair to the dictionary, if the key already exists, return 1 else 0
func (dict *SyncDict) Put(key string, val interface{}) (result int) {
_, exists := dict.m.Load(key)
// Store the key-value pair
dict.m.Store(key, val)
// Return the count of pairs
if exists {
return 0
}
return 1
}
在这里我们使用 Store
方法来添加键值对。Store
方法会覆盖原有的值。我们只需要判断键是否存在,然后返回 0 或 1 即可。
为啥这里要返回 0 或 1 呢?
这种设计遵循了 Redis 的命令返回值约定。返回值告诉调用者该操作是更新了一个已存在的键(返回0)还是创建了一个新键(返回1)。这些返回值常用于统计操作影响的键数量,例如在批量操作中跟踪修改的总数。调用者可以根据返回值执行不同的后续操作。
PutIfAbsent 和 PutIfExists
接下来实现 PutIfAbsent
和 PutIfExists
方法。PutIfAbsent
方法用于仅在键不存在时添加键值对,PutIfExists
方法用于仅在键存在时添加键值对。我们可以使用 Load
方法来判断键是否存在。然后使用 Store
方法来添加键值对。
// PutIfAbsent adds a key-value pair to the dictionary if the key does not exist, return 1 if it exists, else 0
func (dict *SyncDict) PutIfAbsent(key string, val interface{}) (result int) {
_, exists := dict.m.Load(key)
if exists {
return 0
}
// Store the key-value pair
dict.m.Store(key, val)
return 1
}
// PutIfExists adds a key-value pair to the dictionary if the key exists, return 1 if it does not exist, else 0
func (dict *SyncDict) PutIfExists(key string, val interface{}) (result int) {
_, exists := dict.m.Load(key)
if !exists {
return 0
}
// Store the key-value pair
dict.m.Store(key, val)
return 1
}
Remove
接下来实现 Remove
方法,用于删除键值对。我们可以使用 Load
方法来判断键是否存在。然后使用 Delete
方法来删除键值对。
// Remove removes a key-value pair from the dictionary, return the count of pairs were removed
func (dict *SyncDict) Remove(key string) (result int) {
_, exists := dict.m.Load(key)
if !exists {
return 0
}
// Delete the key-value pair
dict.m.Delete(key)
return 1
}
ForEach
接下来实现 ForEach
方法,用于遍历所有的键值对。我们可以使用 sync.Map
的 Range
方法来遍历所有的键值对。我们在遍历时调用传入的函数来处理每一个键值对。
// ForEach iterates over all key-value pairs in the dictionary and applies the consumer function to each pair
func (dict *SyncDict) ForEach(consumer Consumer) {
// Iterate over all key-value pairs and apply the consumer function
dict.m.Range(func(key, value interface{}) bool {
consumer(key.(string), value)
// Always return true to continue iteration
return true
})
}
Keys
接下来实现 Keys
方法,用于获取所有的键。我们可以使用 Range
方法来遍历所有的键值对,然后将键添加到一个切片中,最后返回这个切片。
// Keys returns a slice of all keys in the dictionary
func (dict *SyncDict) Keys() []string {
keys := make([]string, dict.Len())
// Iterate over all key-value pairs and collect the keys
dict.m.Range(func(key, value interface{}) bool {
keys = append(keys, key.(string))
return true
})
return keys
}
RandomKeys
实现 RandomKeys
方法,用于获取 n 个随机键,返回一个字符串切片。
我们可以利用 sync.Map
的 Range
方法不保证迭代顺序的特性来实现这个方法。我们进行 n 次选择,每次随机选择一个键。由于 Range
方法不保证迭代顺序,所以我们可以在每次迭代时随机选择一个键。
但是这样可能会导致重复的键被返回,不过这个接口不做唯一性的保证,所以我们可以直接返回结果。
// RandomKeys returns a slice of n random keys from the dictionary
// Due to m.Range doesn't guarantee the order of iteration, we can use this feature to get random keys
// Note: This method may not be truly random, but it will give different keys each time
// Duplicate keys may be returned
func (dict *SyncDict) RandomKeys(n int) []string {
keys := make([]string, dict.Len())
for i := 0; i < n; i++ {
// Randomly select a key from the dictionary
dict.m.Range(func(key, value interface{}) bool {
keys = append(keys, key.(string))
return false
})
}
return keys
}
RandomDistinctKeys
在这里我们使用 Range
方法来遍历所有的键值对。设置一个计数器 i
来统计已经收集到的键的数量。当收集到 n 个键时,停止迭代。我们使用一个切片来存储收集到的键。
// RandomDistinctKeys returns a slice of n distinct random keys from the dictionary
func (dict *SyncDict) RandomDistinctKeys(n int) []string {
result := make([]string, dict.Len())
i := 0
// Iterate over all key-value pairs and collect the keys
dict.m.Range(func(key, value interface{}) bool {
result[i] = key.(string)
i++
return i != n
})
return result
}
Clear
在这里我们使用 *dict = *MakeSyncDict()
来清空所有的键值对。MakeSyncDict
函数会返回一个新的 SyncDict
实例。我们将当前的 dict
指向这个新的实例,从而实现清空所有的键值对。
// Clear clears all key-value pairs in the dictionary
func (dict *SyncDict) Clear() {
*dict = *MakeSyncDict()
}
旧的去哪里了呢?在 Go 中,垃圾回收器会自动回收不再使用的内存。所以我们不需要手动去清理旧的实例。只要没有其他的引用指向这个实例,它就会被自动回收。
DB
创建 DB 结构体
刚刚我们实现的这个 SyncDict
是在 Redis 最底层工作的数据结构。接下来我们需要实现一个 DB
结构体,用于表示 Redis 数据库。这个结构体会包含一个 SyncDict
实例,用于存储数据。
我们在之前创建的 database
目录下创建一个 db.go
文件,用于存放 DB
结构体的实现。
type DB struct {
index int
data dict.Dict
}
// MakeDB creates a new DB instance
func MakeDB() *DB {
return &DB{
index: 0,
data: dict.MakeSyncDict(),
}
}
在这里我们定义了一个 DB
结构体,它包含一个 index
字段和一个 data
字段。index
字段用于表示数据库的索引,data
字段用于存储数据。我们使用 MakeDB
函数来创建一个新的 DB
实例。
ExecFunc 和 CmdLine
接下来,我们需要实现一个 ExecFunc
函数类型,用于表示一个执行函数。这个函数类型接收一个 DB
实例和一个字节切片作为参数,返回一个 resp.Reply
。所有的 Redis 命令(如 PING、SET、GET 等)都要用这个函数类型来实现。
这样有助于所有命令处理函数共享相同的签名,便于上层调用代码统一处理各种命令。
// ExecFunc is a function type that takes a DB instance and a slice of byte slices as arguments and returns a resp.Reply
// All redis commands like PING, SET, GET, etc. are implemented as functions of this type
type ExecFunc func(db *DB, args [][]byte) resp.Reply
然后我们需要实现一个 CmdLine
类型,用于表示命令行参数。这个类型是一个字节切片的切片,用于表示命令行参数。我们在这里使用 CmdLine
来表示命令行参数,而不是直接使用 [][]byte
,是为了更好地表达语义。
// CmdLine is a type alias for a slice of byte slices
// It is used to represent the command line arguments passed to the ExecFunc
type CmdLine = [][]byte
为什么这里使用 [][]byte
而不是 string
呢?因为 Redis 的命令行参数是二进制安全的。也就是说,命令行参数可以包含任意的字节序列,包括空字节和非 UTF-8 编码的字节序列。[]byte 可以安全地存储任何二进制数据,包括包含零值(null字符)的数据。而 string 在某些场景下可能会被截断或错误地解析二进制数据。
接下来我们应当实现一个 Exec
方法,用于执行命令。这个方法接收一个 CmdLine
和一个 ExecFunc
,返回一个 resp.Reply
。
这样的话我们需要在 Exec
方法中分辨命令类型,例如 PING、SET、GET 等等。但是这样的设计模式会导致代码中出现大量的 if-else 语句,导致代码的可读性和可维护性下降。
使用命令模式
所以我们可以考虑使用命令模式,在 database
目录下创建一个 command.go
文件,用于存放命令的实现。我们将所有的命令都放在这个文件中。这样可以避免在 db.go
中出现大量的 if-else 语句,从而提高代码的可读性和可维护性。
创建一个中央注册表(cmdTable),用于存储所有支持的命令及其实现。command 结构包含:
exec
:命令的实际执行函数(类型为 ExecFunc)arity
:命令所需的参数数量(用于验证命令参数是否正确)
创建一个 RegisterCommand
函数用于注册新命令,将命令名转为小写(使命令大小写不敏感)并存入表中。
这是经典的命令模式(Command Pattern)的实现,为整个 Redis 克隆提供了良好的可维护性和可扩展性。
package database
import "strings"
// cmdTable is a map that associates command names (as strings) with their corresponding command structures
var cmdTable = make(map[string]*command)
type command struct {
exec ExecFunc // function to execute the command
arity int // number of arguments required for the command
}
// RegisterCommand registers a command with the command table
func RegisterCommand(name string, exec ExecFunc, arity int) {
name = strings.ToLower(name)
cmdTable[name] = &command{
exec: exec,
arity: arity,
}
}
命令模式
命令模式(Command Pattern)是一种行为型设计模式,它的核心思想是:“将请求封装成对象,使你可以用不同的请求对客户进行参数化。”
也就是说,你可以把对某个操作的调用、参数、接收者(执行者)等打包成一个对象,然后延迟执行、存储、排队、撤销等——就像在做命令行操作一样。
想象你在点外卖:
-
你(客户端)把要点的东西告诉服务员(调用者)
-
服务员把你的订单写在纸上(封装成命令对象)
-
后厨(接收者)根据订单来做菜(执行命令)
命令对象起到中间层的作用,把“发出命令的人”与“执行命令的人”解耦了。
一个例子:
type Command interface {
Execute()
}
type Light struct{}
func (l *Light) On() {
fmt.Println("Light is ON")
}
func (l *Light) Off() {
fmt.Println("Light is OFF")
}
type LightOnCommand struct {
light *Light
}
func (c *LightOnCommand) Execute() {
c.light.On()
}
type LightOffCommand struct {
light *Light
}
func (c *LightOffCommand) Execute() {
c.light.Off()
}
type RemoteControl struct {
command Command
}
func (r *RemoteControl) SetCommand(command Command) {
r.command = command
}
func (r *RemoteControl) PressButton() {
r.command.Execute()
}
func main() {
light := &Light{}
lightOn := &LightOnCommand{light}
lightOff := &LightOffCommand{light}
remote := &RemoteControl{}
remote.SetCommand(lightOn)
remote.PressButton() // Output: Light is ON
remote.SetCommand(lightOff)
remote.PressButton() // Output: Light is OFF
}
这个例子中,Command
接口定义了一个 Execute
方法,Light
类表示一个灯,LightOnCommand
和 LightOffCommand
分别表示打开和关闭灯的命令。RemoteControl
类用于设置命令并执行它们。
这样,我们就可以通过命令对象来控制灯的开关,而不需要直接操作灯的对象。命令模式使得请求的发送者和接收者解耦,提高了代码的可扩展性和可维护性。
命令调用函数
上面的 cmdTable
有待补充,我们先我们回到 db.go
文件中,实现一个 Exec
方法,用于执行命令。主要作用是处理和执行客户端发来的命令。
- 根据命令名称从
cmdTable
中获取对应的命令结构体 - 检查命令参数的数量是否符合要求
- 调用命令的执行函数,并返回执行结果
这里我们需要把 cmdLine
的第一个元素转换为小写字母,以确保命令名称的可识别性。然后我们从 cmdTable
中获取对应的命令结构体。如果命令不存在,则返回一个错误回复。
有的命令的参数是可变的,我们将可变参数的命令的 arity 设置为负数。比如 DEL
命令可以删除多个键,所以它的 arity 是 -1。我们在这里使用 ValidateArity
函数来验证命令参数的数量是否符合要求。
- 对于像
SET key value
这样需要固定 2 个参数的命令,arity 为正数 2 - 对于像
RPUSH key value1 [value2 value3 ...]
这样至少需要 2 个参数但可接受更多参数的命令,arity 为负数 -2
// Exec executes a command on the DB instance
// It takes a connection and a command line as arguments
// It returns a resp.Reply which is the response to the command
func (db *DB) Exec(c resp.Connection, cmdLine CmdLine) resp.Reply {
// The first element of cmdLine is the command name, like "PING", "SET", etc.
// Convert it to lowercase to ensure case-insensitivity
cmdName := strings.ToLower(string(cmdLine[0]))
// Get the command from the command table using the command name
// If the command is not found, return an error reply
cmd, ok := cmdTable[cmdName]
if !ok {
return reply.MakeStandardErrorReply("ERR unknown command '" + cmdName + "'")
}
// Validate the number of arguments passed to the command
if !ValidateArity(cmd.arity, cmdLine) {
return reply.MakeArgNumErrReply(cmdName)
}
// Execute the command and return the response
return cmd.exec(db, cmdLine[1:])
}
// ValidateArity checks if the number of arguments passed to a command is valid
func ValidateArity(arity int, args [][]byte) bool {
// Check if the number of arguments is less than the required arity
if arity >= 0 {
return len(args) == arity
} else {
// If the arity is negative, it means the command takes a variable number of arguments
// Check if the number of arguments is within the valid range
return len(args) >= -arity
}
}
实现 PING 命令
接下来我们就开始实现 Redis 的命令。我们先从最简单的 PING 命令开始实现。PING 命令用于测试服务器是否在线。它的返回值是 PONG。
在 database
目录下创建一个 ping.go
文件,用于存放 PING 命令的实现。
我们在前面的章节中已经实现了一个 MakePongReply
函数,用于返回 PONG 回复。我们在这里直接使用这个函数来实现 PING 命令。
package database
import (
"redigo/interface/resp"
"redigo/resp/reply"
)
func Ping(db *DB, args [][]byte) resp.Reply {
return reply.MakePongReply()
}
然后我们可以在当前文件中使用 init
函数来注册 PING 命令。init
函数是 Go 语言中的一个特殊函数,它会在包被导入时自动执行。我们可以在这个函数中注册 PING 命令。
// Register the PING command to the command table
func init() {
// Register the PING command with the command table
RegisterCommand("ping", Ping, 1)
}
数据库层面的操作
在开始实现之前,我们定义一些数据库层面的操作。
它们主要是对我们之前定义的底层 dict
的封装。
包含:
GetEntity
:获取数据实体PutEntity
:存储数据实体PutIfExists
:如果存在则存储数据实体PutIfAbsent
:如果不存在则存储数据实体Remove
:删除数据实体Removes
:删除多个数据实体Flush
:清空数据库
// GetEntity returns DataEntity bind to the given key
func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {
raw, ok := db.data.Get(key)
if !ok {
return nil, false
}
entity, _ := raw.(*database.DataEntity)
return entity, true
}
// PutEntity stores the given DataEntity in the database
func (db *DB) PutEntity(key string, entity *database.DataEntity) int {
return db.data.Put(key, entity)
}
// PutIfExists edit the given DataEntity in the database
func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {
return db.data.PutIfExists(key, entity)
}
// PutIfAbsent stores the given DataEntity in the database if it doesn't already exist
func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {
return db.data.PutIfAbsent(key, entity)
}
// Remove deletes the DataEntity associated with the given key from the database
func (db *DB) Remove(key string) int {
return db.data.Remove(key)
}
// Removes deletes the DataEntity associated with the given keys from the database
func (db *DB) Removes(keys ...string) int {
deleted := 0
for _, key := range keys {
_, ok := db.data.Get(key)
if ok {
db.data.Remove(key)
deleted++
}
}
return deleted
}
// Flush clears the database by removing all DataEntity objects
func (db *DB) Flush() {
db.data.Clear()
}
这里我们得把之前的 clear
方法改为 Clear
,便于包外访问:
// Clear clears all key-value pairs in the dictionary
func (dict *SyncDict) Clear() {
*dict = *MakeSyncDict()
}
在 datastruct/dict/dict.go
中也要把接口中的 clear
方法改为 Clear
,便于包外访问:
type Dict interface {
// ... other methods
Clear() // clear all key-value pairs
}
实现 KEYS 命令集
接下来我们实现 KEYS
命令集。KEYS
命令用于查找所有符合给定模式的键。它的返回值是一个字符串切片,表示所有符合条件的键。
在 Redis 中,Keys 有下面的常见的一些指令:
DEL key1 key2 ...
:删除一个或多个键EXISTS key1 key2 ...
:检查一个或多个键是否存在FLUSHDB
:删除当前数据库中的所有键TYPE key
:返回键的类型RENAME key1 key2
:重命名键RENAMENX key1 key2
:重命名键,如果新键已经存在,则不执行重命名
在这里我们先就实现这几个命令。我们在 database
目录下创建一个 keys.go
文件,用于存放 KEYS
命令的实现。
DEL
在 Redis 中,DEL
命令用于删除一个或多个键。它的返回值是被删除的键的数量。
示例:
假设我们有下面的键值对:
SET key1 value1
SET key2 value2
SET key3 value3
如果我们执行 DEL key1 key2
命令,则返回值为 2,因为我们删除了两个键。
我们在 keys.go
文件中实现 Del
函数,实现思路很简单,我们遍历所有的键,然后调用 db.Remove
方法来删除键。最后返回被删除的键的数量。
// Handle the DEL command
// It deletes the specified keys from the database
func execDel(db *DB, args [][]byte) resp.Reply {
keys := make([]string, len(args))
for i, arg := range args {
keys[i] = string(arg)
}
deleted := db.Removes(keys...)
return reply.MakeIntReply(int64(deleted))
}
然后我们在 keys.go
文件中使用 init
函数来注册 DEL
命令。DEL
接受的是可变参数,所以它的 arity 是 -2。
func init() {
RegisterCommand("DEL", execDel, -2)
}
EXISTS
在 Redis 中,EXISTS
命令用于检查一个或多个键是否存在。它的返回值是一个整数,表示存在的键的数量。
使用示例:
假设还是上面的三个键值对,我们执行 EXISTS key1 key2
命令,则返回值为 2,因为 key1
和 key2
都存在。
我们在 keys.go
文件中实现 Exists
函数,遍历所有的键,然后调用 db.GetEntity
方法来检查键是否存在。最后返回存在的键的数量。
func execExists(db *DB, args [][]byte) resp.Reply {
result := int64(0)
for _, arg := range args {
key := string(arg)
if _, ok := db.GetEntity(key); ok {
result++
}
}
return reply.MakeIntReply(result)
}
然后我们在 keys.go
文件中使用 init
函数来注册 EXISTS
命令。EXISTS
接受的是可变参数,所以它的 arity 是 -2。
func init() {
// ...
RegisterCommand("EXISTS", execExists, -2)
}
FLUSHDB
在 Redis 中,FLUSHDB
命令用于删除当前数据库中的所有键。它的返回值是 OK。
这个命令不要求额外参数,所以它的 arity 是 1(即自身命令)。
func execFlushDB(db *DB, args [][]byte) resp.Reply {
db.Flush()
return reply.MakeOKReply()
}
func init() {
// ...
RegisterCommand("FLUSHDB", execFlushDB, -1)
}
TYPE
在 Redis 中,TYPE
命令用于返回键的类型。它的返回值是一个字符串,表示键的类型。
但是我们的项目到目前我们只去实现 String 类型,因此在这里我们只用返回 string
即可。
// Handle the TYPE command
// It returns the type of the specified key
func execType(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
if entity, ok := db.GetEntity(key); ok {
switch entity.Data.(type) {
// If the entity is []byte, return the type as "string"
case []byte:
return reply.MakeBulkReply([]byte("string"))
}
// TODO: Add more types as needed
} else {
return reply.MakeStatusReply("none")
}
return reply.MakeUnknownReply()
}
func init() {
// ...
RegisterCommand("TYPE", execType, 2)
}
RENAME
在 Redis 中,RENAME
命令用于重命名键。它的返回值是 OK。
RENAME key1 key2
这会将 key1
重命名为 key2
。如果 key2
已经存在,则会覆盖它。
// Handle the RENAME command.
// It renames a key in the database.
// RENAME key newkey
func execRename(db *DB, args [][]byte) resp.Reply {
src := string(args[0])
dst := string(args[1])
entity, ok := db.GetEntity(src)
if !ok {
return reply.MakeStandardErrorReply("ERR no such key")
}
db.PutEntity(dst, entity)
db.Remove(src)
return reply.MakeOKReply()
}
func init() {
// ...
RegisterCommand("RENAME", execRename, 3)
}
RENAMENX
上面实现的 RENAME
命令,假设在重命名时,key2
已经存在,则会覆盖 Key2
原有的值。
假如希望在重命名时,如果 key2
已经存在,则不执行重命名操作。我们可以使用 RENAMENX
命令。
其中 NX
表示 not exist
,即如果不存在则执行重命名操作。
RENAMENX key1 key2
要注意的是,这个命令的返回值是 1 或 0。1 表示重命名成功,0 表示重命名失败。这是与 RENAME
命令有区别的。
实现:
// Handle the RENAMENX command.
// It renames a key in the database only if the new key does not exist.
// RENAME key newkey
func execRenameNX(db *DB, args [][]byte) resp.Reply {
src := string(args[0])
dst := string(args[1])
entity, ok := db.GetEntity(src)
if !ok {
return reply.MakeStandardErrorReply("ERR no such key")
}
if _, ok := db.GetEntity(dst); ok {
return reply.MakeIntReply(0)
}
db.PutEntity(dst, entity)
db.Remove(src)
return reply.MakeIntReply(1)
}
func init() {
// ...
RegisterCommand("RENAMENX", execRenameNX, 3)
}
KEYS
在 Redis 中,KEYS
命令用于查找所有符合给定模式的键。它的返回值是一个字符串切片,表示所有符合条件的键。
使用示例:
KEYS *
表示返回所有的键。
或者使用通配符,例如 KEYS user:*
来查找所有以 user:
开头的键。
所以这里我们需要实现一个识别通配符的函数。
Redis 支持哪些通配符?在 Redis 中,KEYS pattern
命令支持一套通配符语法:
*
:匹配任意长度的任意字符(包括空字符)?
:匹配任意单个字符[abc]
:匹配集合中的任意一个字符[a-z]
:匹配范围内的任意一个字符[^abc]
:不匹配集合中的任意一个字符- 可以通过
\
进行字符转义
举例:
user:*
匹配所有以user:
开头的 keysession:??
匹配如session:ab
,session:xy
file[0-9].log
匹配如file1.log
,file9.log
file[^0-9].log
表示不匹配任何以数字结尾的文件名,例如fileA.log
,fileB.log
为了匹配字符串,我们需要实现一个通配符匹配算法。我们可以使用动态规划来实现这个算法。
首先我们以 user:ab
为例,判断字符串是否符合给定的通配符模式。
第一步:CompilePattern("user:??")
即将通配符编译成一个模式对象。这个模式对象包含了所有的通配符信息。
这个通配符会被编译为下表:
索引 | 类型 | 内容 |
---|---|---|
0 | normal | u |
1 | normal | s |
2 | normal | e |
3 | normal | r |
4 | normal | : |
5 | any | ? |
6 | any | ? |
第二步:IsMatch("user:ab")
即判断字符串是否符合给定的通配符模式。我们使用动态规划来实现这个算法。
定义一个布尔二维表:table[i][j]
表示当前的遍历到的数据 s[:i]
和 pattern[:j]
是否匹配。
一格格开始填表:
i(字符串索引) | j(模式索引) | 当前字符 | 模式字符 | 是否匹配 | 原因 |
---|---|---|---|---|---|
0 | 0 | u | u | ✅ | 字符相同 |
1 | 1 | s | s | ✅ | 字符相同 |
2 | 2 | e | e | ✅ | 字符相同 |
3 | 3 | r | r | ✅ | 字符相同 |
4 | 4 | : | : | ✅ | 字符相同 |
5 | 5 | a | ? | ✅ | 匹配任意单个字符 |
6 | 6 | b | ? | ✅ | 匹配任意单个字符 |
最后值都是 true
,所以返回 true
。
这个不用太纠结,直接使用即可。完整的代码,我们在 lib
根文件夹下创建 wildcard
包,然后在 wildcard
包下创建一个 wildcard.go
文件,用于存放通配符的实现。
package wildcard
const (
normal = iota
all // *
any // ?
setSymbol // []
rangSymbol // [a-b]
negSymbol // [^a]
)
type item struct {
character byte
set map[byte]bool
typeCode int
}
func (i *item) contains(c byte) bool {
if i.typeCode == setSymbol {
_, ok := i.set[c]
return ok
} else if i.typeCode == rangSymbol {
if _, ok := i.set[c]; ok {
return true
}
var (
min uint8 = 255
max uint8 = 0
)
for k := range i.set {
if min > k {
min = k
}
if max < k {
max = k
}
}
return c >= min && c <= max
} else {
_, ok := i.set[c]
return !ok
}
}
// Pattern represents a wildcard pattern
type Pattern struct {
items []*item
}
// CompilePattern convert wildcard string to Pattern
func CompilePattern(src string) *Pattern {
items := make([]*item, 0)
escape := false
inSet := false
var set map[byte]bool
for _, v := range src {
c := byte(v)
if escape {
items = append(items, &item{typeCode: normal, character: c})
escape = false
} else if c == '*' {
items = append(items, &item{typeCode: all})
} else if c == '?' {
items = append(items, &item{typeCode: any})
} else if c == '\\' {
escape = true
} else if c == '[' {
if !inSet {
inSet = true
set = make(map[byte]bool)
} else {
set[c] = true
}
} else if c == ']' {
if inSet {
inSet = false
typeCode := setSymbol
if _, ok := set['-']; ok {
typeCode = rangSymbol
delete(set, '-')
}
if _, ok := set['^']; ok {
typeCode = negSymbol
delete(set, '^')
}
items = append(items, &item{typeCode: typeCode, set: set})
} else {
items = append(items, &item{typeCode: normal, character: c})
}
} else {
if inSet {
set[c] = true
} else {
items = append(items, &item{typeCode: normal, character: c})
}
}
}
return &Pattern{
items: items,
}
}
// IsMatch returns whether the given string matches pattern
func (p *Pattern) IsMatch(s string) bool {
if len(p.items) == 0 {
return len(s) == 0
}
m := len(s)
n := len(p.items)
table := make([][]bool, m+1)
for i := 0; i < m+1; i++ {
table[i] = make([]bool, n+1)
}
table[0][0] = true
for j := 1; j < n+1; j++ {
table[0][j] = table[0][j-1] && p.items[j-1].typeCode == all
}
for i := 1; i < m+1; i++ {
for j := 1; j < n+1; j++ {
if p.items[j-1].typeCode == all {
table[i][j] = table[i-1][j] || table[i][j-1]
} else {
table[i][j] = table[i-1][j-1] &&
(p.items[j-1].typeCode == any ||
(p.items[j-1].typeCode == normal && uint8(s[i-1]) == p.items[j-1].character) ||
(p.items[j-1].typeCode >= setSymbol && p.items[j-1].contains(s[i-1])))
}
}
}
return table[m][n]
}
让我们回到 keys.go
文件中,来实现 KEYS
命令。
// Handle the KEYS command.
// It returns all keys in the database that match the specified pattern.
func execKeys(db *DB, args [][]byte) resp.Reply {
pattern := wildcard.CompilePattern(string(args[0]))
result := make([][]byte, 0) // Store all matching keys
db.data.ForEach(func(key string, val interface{}) bool {
if pattern.IsMatch(key) {
result = append(result, []byte(key))
}
return true
})
return reply.MakeMultiBulkReply(result)
}
func init() {
// ...
RegisterCommand("KEYS", execKeys, 2)
}
重要勘误
在之前我们声明 ForEach
的时候,对于接受的条件函数 Consumer
,我们忘记了添加返回值。我们需要在 datastruct/dict/dict.go
中把 Consumer
的返回值改为 bool
,表示是否继续遍历。
type Consumer func(key string, val interface{}) bool // function type for iterating over key-value pairs
实现 Strings 命令集
刚刚我们实现完了 KEYS
命令集,接下来我们实现 Strings
命令集。Strings
命令用于操作字符串类型的键值对。
我们这里挑选一些比较经典的命令来实现:
SET key value
:设置键值对GET key
:获取键值对SETNX key value
:设置键值对,如果键不存在GETSET key value
:设置键值对,并返回旧值STRLEN key
:获取字符串的长度
在 database
目录下创建一个 strings.go
文件,用于存放 Strings
命令的实现。
GET
在 Redis 中,GET
命令用于获取键值对。它的返回值是一个字符串,表示键的值。
// execGet retrieves the value associated with the specified key from the database.
func execGet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
if entity, ok := db.GetEntity(key); ok {
// TODO: If we have multiple types, we need to check the conversion if it's not []byte
return reply.MakeBulkReply(entity.Data.([]byte))
}
return reply.MakeNullBulkReply()
}
func init() {
// ...
RegisterCommand("GET", execGet, 2)
}
需要注意的是,由于我们只实现了 String
类型,所以我们在这里直接返回 entity.Data.([]byte)
即可。如果我们有多个类型,我们需要在这里判断类型转换是否成功。
如果转换失败,我们需要返回一个错误回复。
SET
在 Redis 中,SET
命令用于设置键值对。它的返回值是 OK。
// execSet stores the specified key-value pair in the database.
func execSet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity := &database.DataEntity{
Data: value,
}
db.PutEntity(key, entity)
return reply.MakeOKReply()
}
func init() {
// ...
RegisterCommand("SET", execSet, 3)
}
SETNX
在 Redis 中,SETNX
命令用于设置键值对,如果键不存在。它的返回值是 1 或 0。1 表示设置成功,0 表示设置失败。
// execSetNX stores the specified key-value pair in the database only if the key does not already exist.
// If the key already exists, it does not modify the value and returns 0.
// If the key does not exist, it sets the value and returns 1.
// SETNX key value
func execSetNX(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity := &database.DataEntity{
Data: value,
}
result := db.PutIfAbsent(key, entity)
return reply.MakeIntReply(int64(result))
}
func init() {
// ...
RegisterCommand("SETNX", execSetNX, 3)
}
GETSET
在 Redis 中,GETSET
命令用于设置键值对,并返回旧值。它的返回值是一个字符串,表示旧值。
// execGetSet stores the specified key-value pair in the database and returns the old value associated with the key.
func execGetSet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity, ok := db.GetEntity(key)
db.PutEntity(key, &database.DataEntity{
Data: value,
})
if !ok {
return reply.MakeNullBulkReply()
}
return reply.MakeBulkReply(entity.Data.([]byte))
}
func init() {
// ...
RegisterCommand("GETSET", execGetSet, 3)
}
STRLEN
在 Redis 中,STRLEN
命令用于获取字符串的长度。它的返回值是一个整数,表示字符串的长度。
STRLEN key
// execStrLen retrieves the length of the value associated with the specified key.
func execStrLen(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
entity, ok := db.GetEntity(key)
if !ok {
return reply.MakeNullBulkReply()
}
return reply.MakeIntReply(int64(len(entity.Data.([]byte))))
}
func init() {
// ...
RegisterCommand("STRLEN", execStrLen, 2)
}
到这里我们就实现完了 Strings
命令集。我们可以在 main.go
中测试一下。
实现核心 database
让我们回到 main.go
文件中。
func main() {
// ...
err := tcp.ListenAndServeWithSignal(
&tcp.Config{
Address: fmt.Sprintf("%s:%d",
config.Properties.Bind,
config.Properties.Port),
},
handler.MakeHandler())
if err != nil {
logger.Error(err)
}
}
我们创建了一个 handler
用于处理 TCP 连接:
// MakeHandler creates a RespHandler instance
func MakeHandler() *RespHandler {
var db databaseface.Database
db = database.NewEchoDatabase()
return &RespHandler{
db: db,
}
}
而在这里,我们仍旧使用的是之前创建的回显服务器,并没有真正使用到数据库业务。
现在我们需要去实现核心的 database
业务逻辑。
在 database
目录下创建一个 database.go
文件,用于存放数据库的实现。
每个数据库都由多个 DB
实例组成。我们在这里创建一个 Database
结构体,用于存放所有的 DB
实例。
package database
type Database struct {
dbSet []*DB
}
接下来我们需要实现 database
的三个接口:
type Database interface {
Exec(client resp.Connection, args [][]byte) resp.Reply
AfterClientClose(c resp.Connection)
Close()
}
它们分别是:
Exec
:执行命令AfterClientClose
:客户端关闭后执行Close
:关闭数据库
我们向 redis.conf
中添加一个 databases
配置项,用于指定数据库的数量。默认值为 16。
databases 16
然后创建 MakeDatabase
函数,用于创建数据库实例。这里我们需要根据配置文件中的 databases
配置项来创建数据库实例。
// NewDatabase creates a new Database instance
func NewDatabase() *Database {
database := &Database{}
if config.Properties.Databases == 0 {
config.Properties.Databases = 16
}
database.dbSet = make([]*DB, config.Properties.Databases)
for i := range database.dbSet {
db := MakeDB()
db.index = i
database.dbSet[i] = db
}
return database
}
由于一个数据库实例是多个 DB
实例的集合,所以我们需要给用户实现一个选择数据库的功能。在之前实现的我们的 RESP 协议连接中:
type Connection interface {
Write([]byte) error // Write data to the connection
GetDBIndex() int // Get database index
SelectDB(int) // Select database
}
我们创建了一个 SelectDB
方法,用于选择数据库。然后我们在 RESP 中实现了它:
// SelectDB selects a database
func (c *Connection) SelectDB(dbNum int) {
c.selectedDB = dbNum
}
接下来我们就可以使用这个方法来实现数据库的选择功能。我们需要支持用户输入 SELECT <dbNum>
命令来选择数据库。
在 database.go
文件实现:
// execSelect sets the current database for the client connection.
// select x
func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply {
dbIndex, err := strconv.Atoi(string(args[0]))
if err != nil {
return reply.MakeStandardErrorReply("ERR invalid DB index")
}
if dbIndex < 0 || dbIndex >= len(database.dbSet) {
return reply.MakeStandardErrorReply("ERR DB index out of range")
}
c.SelectDB(dbIndex)
return reply.MakeIntReply(int64(dbIndex))
}
这里有几个注意的点,一个是我们需要把 dbNum
转换为整数,另一个是我们需要判断 dbNum
是否在范围内。
接下来我们要实现 Exec
方法,这个方法是我们整个服务器的入口,所有的命令都会经过这个方法,然后执行对应的命令。
// Exec executes a command on the database
func (d *Database) Exec(client resp.Connection, args [][]byte) resp.Reply {
defer func() {
if err := recover(); err != nil {
logger.Error("Database Exec panic:" + err.(error).Error())
}
}()
cmdName := strings.ToLower(string(args[0]))
if cmdName == "select" {
if len(args) != 2 {
return reply.MakeArgNumErrReply("select")
}
return execSelect(client, d, args[1:])
}
// Get the current database index from the client connection
db := d.dbSet[client.GetDBIndex()]
return db.Exec(client, args)
}
在这里我们需要注意的是,我们需要在 Exec
方法中判断命令是否是 SELECT
命令,如果是的话,我们就执行 execSelect
方法来选择数据库。
然后取出当前数据库的索引,执行对应的命令。
接下来我们要到 resp/handler/handler.go
文件中修改 MakeHandler
方法,来使用我们刚刚实现的 database
。使用 database.NewDatabase()
来创建一个新的数据库实例。代替原先的回显服务器 database.NewEchoDatabase()
。
func MakeHandler() *RespHandler {
db := database.NewDatabase()
return &RespHandler{
db: db,
}
}
到目前为止,我们来回顾一下 Redigo 执行的流程。
-
客户端请求解析:
- 客户端通过 TCP 连接发送命令,服务器使用 RESP 协议解析器解析请求。
ParseStream
函数异步解析客户端的命令流,将其转换为Payload
对象。
-
命令分发与验证:
- 解析后的命令通过
RespHandler
分发到数据库层。 - 数据库层的
Exec
方法根据命令名称从cmdTable
中查找对应的命令结构体。 - 验证命令参数数量是否符合要求,若不符合则返回错误回复。
- 解析后的命令通过
-
命令执行:
- 数据库层调用对应的命令执行函数(如
execGet
、execSet
等)。 - 执行函数通过
DB
结构体操作底层的SyncDict
数据结构,完成数据的增删改查。
- 数据库层调用对应的命令执行函数(如
-
响应生成:
- 命令执行完成后,返回一个
resp.Reply
对象。 - 服务器将
Reply
对象序列化为 RESP 格式并发送回客户端。
- 命令执行完成后,返回一个
-
多数据库支持:
- 客户端可以通过
SELECT
命令切换数据库。 - 每个客户端连接维护一个
selectedDB
字段,指向当前使用的数据库实例。
- 客户端可以通过
通过以上流程,Redigo 实现了一个简化版的 Redis 服务器,支持基本的键值操作和多数据库管理。
从下一章节开始,我们要实现持久化功能。
测试
启动服务器:
go run main.go
新开一个命令行,执行命令:
printf "*1\r\n\$4\r\nPING\r\n" | nc localhost 6379
返回:
+PONG
执行:
printf "*3\r\n\$3\r\nSET\r\n\$3\r\nkey\r\n\$5\r\nvalue\r\n" | nc localhost 6379
这是一条 SET
命令,设置键值对 key
和 value
。
返回:
+OK
然后我们试着获取一下:
printf "*2\r\n\$3\r\nGET\r\n\$3\r\nkey\r\n" | nc localhost 6379
返回:
$5
value
我们可以看到,返回的值是 value
,说明我们成功地设置了键值对。
到目前为止,数据存在了内存中的 dict
中。我们可以使用 KEYS
命令来查看所有的键:
printf "*2\r\n\$4\r\nKEYS\r\n\$1\r\n*\r\n" | nc localhost 6379
返回:
*1
$3
key
提交到 GitHub
将代码提交到 GitHub:
git add .
# 示例:可以自己写一下
git commit -m "feat: implement in-memory database
- Create Dict interface and SyncDict implementation
- Add database structure with multi-DB support
- Implement command pattern for Redis commands
- Add basic Redis commands (PING, SET, GET, DEL, etc.)
- Implement pattern matching for KEYS command
- Support string operations like GETSET and STRLEN"
git push origin main