Skip to Content
持久化

实现持久化功能

本文进度对应的代码仓库:Redis 持久化

持久化是 Redis 中的一个重要特性,我们之前实现了内存数据库,但是由于数据存储在内存中,重启后数据会丢失。为了实现持久化,我们需要将数据存储到磁盘上。

在本章节中,我们将实现 Redis 的 AOF (Append Only File)持久化功能。AOF 是 Redis 的一种持久化机制,它将所有写操作记录到一个文件中,以便在 Redis 重启时可以重新加载数据。

我们先到 redis.conf 文件中添加两个字段:

appendonly yes appendfilename appendonly.aof

redis.conf 文件中添加 appendonly yes 表示启用 AOF 持久化功能,appendfilename "appendonly.aof" 表示 AOF 文件的名称。

实现 NewAofHandler

我们在根目录下创建 aof 文件夹。然后在 aof 文件夹中创建 aof.go 文件,定义 AOF 处理器的结构体和方法。

AofHandler结构体的作用是接收命令并写入AOF文件,将Redis服务器执行的写命令以追加方式持久化到文件中。在服务器重启时从AOF文件恢复数据,通过重新执行AOF文件中记录的命令来重建数据库状态。

在 Redis 中,我们有数据要写入 AOF 文件,我们是等待落盘完成后再返回给客户端,还是说直接返回给客户端,然后在后台异步写入 AOF 文件呢?我们选择第二种方式,这样可以提高性能。

所以在 aof.go 文件中,我们需要定义一个 AofHandler 结构体,它包含一个数据库实例和一个用于写入 AOF 文件的通道。我们还需要定义一个 payload 结构体,用于存储需要写入 AOF 文件的命令和对应的数据库索引。

AOF 处理器的结构体如下:

aof/aof.go
type CmdLine = [][]byte type payload struct { cmdLine CmdLine dbIndex int } // AofHandler handles the Append-Only File (AOF) functionality for Redis. type AofHandler struct { db database.Database aofChan chan *payload aofFile *os.File aofFilename string currentDB int }

这个结构体包含了以下字段:

  • db:数据库实例,用于执行命令。
  • aofChan:用于接收需要写入 AOF 文件的命令的通道。
  • aofFile:AOF 文件的文件句柄。
  • aofFilename:AOF 文件的文件名。
  • currentDB:当前数据库的索引。

接下来,我们需要实现 AofHandler 的构造函数和初始化方法。

初始化方法主要要做的就是填充好 AofHandler 结构体的字段。

我们在 aof.go 文件中添加以下代码:

aof/aof.go
// NewAofHandler creates a new AofHandler instance. func NewAofHandler(db database.Database) (*AofHandler, error) { handler := &AofHandler{} handler.aofFilename = config.Properties.AppendFilename handler.db = db // TODO: Recover from AOF file // Open the AOF file for reading and writing aofFile, err := os.OpenFile(handler.aofFilename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600) if err != nil { return nil, err } handler.aofFile = aofFile // TODO: Make a chan for aof return handler, nil }

在这里留下了两个 TODO:

  • 从 AOF 文件中恢复数据,也就是把之前存储的 AOF 文件中的数据加载到数据库中
  • 创建一个用于写入 AOF 文件的通道

这个留到一会我们再补上。

接下来我们实现一个 AddAof 方法,它将命令行添加到 AOF 处理器中。这个方法会将命令行推送到 aofChan 通道中,以便异步写入 AOF 文件。

要注意的是,AddAof 方法会在 AOF 文件的写入过程中确保通道的存在,并且确保开启了 AOF 功能。

aof/aof.go
// AddAof adds a command line to the AOF file. It will push the command line to the aofChan channel. func (h *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) { if h.aofChan == nil || !config.Properties.AppendOnly { h.aofChan = make(chan *payload, 100) } h.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } }

我们需要一个函数用来处理 AOF 的落盘操作,这个函数会从 aofChan 通道中读取命令,并将其写入 AOF 文件。需要源源不断的从 aofChan 通道中读取命令,并将其写入 AOF 文件。

我们在 aof.go 文件中添加以下函数签名,这个留到一会再补全:

aof/aof.go
// handleAof handles the AOF file writing. It will write the command line to the AOF file. func (h *AofHandler) handleAof() { // TODO: Write the command line to the AOF file }

这里了解了为什么要用 Chan 以及 Chan 的作用后,我们反过来补全一下 NewAofHandler 方法中的 TODO。

我们需要在 NewAofHandler 方法中创建一个用于写入 AOF 文件的通道。

我们定义一个 aofBufferSize 常量,用于设置 AOF 通道的缓冲区大小。我们将其设置为 1 << 16,也就是 65536 字节。

然后我们在 NewAofHandler 方法中创建一个大小为 aofBufferSize 的通道,并将其赋值给 aofChan 字段。

还得创建一个协程用来运行 handleAof 方法,保证 handleAof 方法能够在后台运行。

aof/aof.go
const aofBufferSize = 1 << 16 // NewAofHandler creates a new AofHandler instance. func NewAofHandler(db database.Database) (*AofHandler, error) { // ... handler.aofFile = aofFile handler.aofChan = make(chan *payload, aofBufferSize) // Start a goroutine to handle the AOF file writing go func() { handler.handleAof() }() return handler, nil }

然后我们需要实现一个 LoadAof 方法,用于从 AOF 文件中加载数据。这个方法会读取 AOF 文件中的命令,并将其执行到数据库中。

添加下面的函数签名:

aof/aof.go
// LoadAof loads commands from the AOF file and executes them on the database. func (h *AofHandler) LoadAof() { // TODO: Implement loading commands from AOF file }

这样我们就可以补全 handleAof 方法了。

aof/aof.go
// NewAofHandler creates a new AofHandler instance. func NewAofHandler(db database.Database) (*AofHandler, error) { handler := &AofHandler{} handler.aofFilename = config.Properties.AppendFilename handler.db = db // Load the AOF file if it exists handler.LoadAof() // Open the AOF file for reading and writing aofFile, err := os.OpenFile(handler.aofFilename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600) if err != nil { return nil, err } handler.aofFile = aofFile // Make a chan for aof handler.aofChan = make(chan *payload, aofBufferSize) // Start a goroutine to handle the AOF file writing go func() { handler.handleAof() }() return handler, nil }

实现 handleAof

handleAof 方法的作用是从 aofChan 通道中读取命令,并将其写入 AOF 文件。我们需要不断地从通道中读取命令,并将其写入 AOF 文件。

还记得我们声明了一个 currentDB 字段吗?这个字段的作用是记录当前数据库的索引。

如果发生了数据库切换,我们需要将当前数据库的索引更新为新的索引。然后使用 select 命令来切换数据库,并将新的索引赋值给 currentDB。这样可以确保在处理 AOF 文件时,命令会被正确地执行到当前数据库中。

在这里我们先实现这样的逻辑:

不断从 aofChan 通道中读取命令,判断当前数据库的索引是否发生了变化,如果发生了变化,就执行 SELECT 命令来切换数据库。

我们需要构造如下的格式:

*2 $6 SELECT $1 $0
aof/aof.go
func (h *AofHandler) handleAof() { h.currentDB = 0 for p := range h.aofChan { // Write the SELECT command if the database index has changed if p.dbIndex != h.currentDB { h.currentDB = p.dbIndex // TODO: Write the SELECT command to the AOF file } } }

我们可以使用之前的 reply 包中的 MakeMultiBulkReply 方法来创建一个 MULTI BULK 回复,然后将其写入 AOF 文件中。

resp/reply/reply.go
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply { return &MultiBulkReply{Args: args} }

这里接收的参数是一个 [][]byte 的切片,所以我们需要一个方法,将传入的多个 string 转换为 [][]byte 的切片。

lib 下创建 utils 文件夹,然后在 utils 文件夹中创建 utils.go 文件,添加以下代码:

lib/utils/utils.go
// ToCmdLine convert strings to [][]byte func ToCmdLine(cmd ...string) [][]byte { args := make([][]byte, len(cmd)) for i, s := range cmd { args[i] = []byte(s) } return args }

这个功能函数的作用是将传入的多个 string 转换为 [][]byte 的切片。

然后我们在 handleAof 方法中调用 MakeMultiBulkReply 方法,构造一个 MULTI BULK 回复,然后将其写入 AOF 文件中。

aof/aof.go
// handleAof handles the AOF file writing. It will write the command line to the AOF file. func (h *AofHandler) handleAof() { h.currentDB = 0 for p := range h.aofChan { // Write the SELECT command if the database index has changed if p.dbIndex != h.currentDB { h.currentDB = p.dbIndex // Write the SELECT command to the AOF file data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes() _, err := h.aofFile.Write(data) if err != nil { logger.Error("AOF write error: " + err.Error()) // Continue to the next command continue } } // Write the command line to the AOF file data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes() _, err := h.aofFile.Write(data) if err != nil { logger.Error("AOF write error: " + err.Error()) // Continue to the next command continue } } }

这里主要是分了两步:

  1. 如果数据库索引发生了变化,就执行 SELECT 命令来切换数据库。
  2. 将命令行写入 AOF 文件中。

注意这里我们的 MakeMultiBulkReply 方法返回的是一个 *MultiBulkReply 的指针,所以我们需要调用 ToBytes 方法将其转换为字节数组才能写入 AOF 文件中。

实现记录功能

我们需要追踪数据库执行的过程中发生的写命令,并将其记录到 AOF 文件中。我们可以在 database 包的 Database 结构体中添加一个 aofHandler 字段,用于存储 AOF 处理器的实例。

database/database.go
type Database struct { dbSet []*DB // 添加下面的字段 aofHandler *aof.AofHandler }

然后在 MakeDatabase 方法中,我们需要创建一个 AOF 处理器的实例,并将其赋值给 aofHandler 字段。这时候需要判断 AOF 功能是否开启,如果开启了,就创建一个 AOF 处理器的实例。

database/database.go
// NewDatabase creates a new Database instance func NewDatabase() *Database { database := &Database{} if config.Properties.Databases == 0 { config.Properties.Databases = 16 } database.dbSet = make([]*DB, config.Properties.Databases) for i := range database.dbSet { db := MakeDB() db.index = i database.dbSet[i] = db } // 添加下面的代码 if config.Properties.AppendOnly { aofHandler, err := aof.NewAofHandler(database) if err != nil { panic(err) } database.aofHandler = aofHandler } return database }

我们在哪一层需要使用 AOF 处理器呢?

回忆我们之前实现的 database 包中的各类方法,拿 strings 包中的方法举例:

execSet 方法中,我们需要将 SET 命令记录到 AOF 文件中。我们可以在 execSet 方法中调用 AddAof 方法,将命令行添加到 AOF 处理器中。

database/strings.go
// execSet stores the specified key-value pair in the database. func execSet(db *DB, args [][]byte) resp.Reply { key := string(args[0]) value := args[1] entity := &database.DataEntity{ Data: value, } db.PutEntity(key, entity) // Add the command line to the AOF file // ... return reply.MakeOKReply() }

但是 db 并不持有 AofHandler 的引用,所以我们可以在 db 结构体中放入一个 func 类型的字段,用于执行 AOF 处理器的 AddAof 方法。

database/db.go
type DB struct { index int data dict.Dict // 添加下面的字段 addAof func(CmdLine) }

然后在 database 包的 NewDatabase 方法中,我们需要对每个 db 进行初始化,对每个 db 创建一个匿名函数,用于执行 AOF 处理器的 AddAof 方法。

这个匿名函数会调用 aof 处理器的 AddAof 方法,并将当前数据库的索引和命令行作为参数传入。

database/database.go
// NewDatabase creates a new Database instance func NewDatabase() *Database { // ... if config.Properties.AppendOnly { aofHandler, err := aof.NewAofHandler(database) if err != nil { panic(err) } database.aofHandler = aofHandler // 添加下面的代码 for _, db := range database.dbSet { db.addAof = func(line CmdLine) { database.aofHandler.AddAof(db.index, line) } } } return database }

这样的话我们在 execSet 方法中就可以直接调用 db.addAof 方法,将命令行添加到 AOF 处理器中。

database/strings.go
// execSet stores the specified key-value pair in the database. func execSet(db *DB, args [][]byte) resp.Reply { key := string(args[0]) value := args[1] entity := &database.DataEntity{ Data: value, } db.PutEntity(key, entity) // 添加下面这一行 db.addAof(args) return reply.MakeOKReply() }

但是你注意到了吗,这里存在一个问题,在 aog 初始化的时候,我们会调用 LoadAof 方法来加载 AOF 文件中的数据,加载 AOF 的数据的时候可能会调用 execSet 方法,而 execSet 方法又会调用 db.addAof 方法。

aof/aof.go
// NewAofHandler creates a new AofHandler instance. func NewAofHandler(db database.Database) (*AofHandler, error) { handler := &AofHandler{} handler.aofFilename = config.Properties.AppendFilename handler.db = db // Load the AOF file if it exists handler.LoadAof() // ... }

但是呢,在下面所示的 NewDatabase 方法中,我们先执行 NewAofHandler 方法,这里面会调用 LoadAof 方法,会调用 execSet 方法中的 db.addAof 方法,但是此时 db.addAof 方法还没有被初始化,所以会导致空指针异常。

database/database.go
// NewDatabase creates a new Database instance func NewDatabase() *Database { // ... if config.Properties.AppendOnly { // 先执行 NewAofHandler 方法,这里面会调用 LoadAof 方法,会调用 execSet 方法中的 db.addAof 方法 aofHandler, err := aof.NewAofHandler(database) if err != nil { panic(err) } database.aofHandler = aofHandler // 再执行初始化 db 中的 addAof 方法 for _, db := range database.dbSet { db.addAof = func(line CmdLine) { database.aofHandler.AddAof(db.index, line) } } } return database }

所以我们可以在 db 中初始化 db 的时候,将 addAof 方法初始化为一个空函数,这样就不会导致空指针异常了。

database/db.go
// MakeDB creates a new DB instance func MakeDB() *DB { return &DB{ index: 0, data: dict.MakeSyncDict(), // 添加下面的代码 addAof: func(line CmdLine) { // No-op by default, // can be overridden by the database instance }, } }

这样在恢复 AOF 文件中的数据时,就不会出错了。而且,如果没有开启 AOF 功能的话,addAof 方法也不会被调用。后续我们也不需要在每个地方都判断 AOF 功能是否开启了。

接下来我们逐个回到 database 包中的方法中,添加 db.addAof 方法,来记录应当写入 AOF 文件的命令。

注意,这里我们只需要记录写命令,读命令不需要记录到 AOF 文件中。

例如 PING 命令就不需要记录到 AOF 文件中。

我们来看 database/keys.go 文件中的 execDel 方法:

database/keys.go
func execDel(db *DB, args [][]byte) resp.Reply { keys := make([]string, len(args)) for i, arg := range args { keys[i] = string(arg) } deleted := db.Removes(keys...) return reply.MakeIntReply(int64(deleted)) }

我们需要在 execDel 方法中添加 db.addAof 方法,将 DEL 命令记录到 AOF 文件中。在这里 args 是不含有 DEL 命令的,所以我们需要传入命令名称,将两者组装成一个 CmdLine 的切片。

我们到 utils 文件夹中添加一个 ToCmdLineWithName 方法,用于将命令名称和参数转换为 CmdLine 的切片。

lib/utils/utils.go
// ToCmdLineWithName convert command name and args to [][]byte func ToCmdLineWithName(name string, args ...[]byte) [][]byte { cmd := make([][]byte, len(args)+1) cmd[0] = []byte(name) for i, s := range args { cmd[i+1] = s } return cmd }

然后在 execDel 方法中调用 ToCmdLineWithName 方法,将 DEL 命令和参数转换为 CmdLine 的切片,然后传入 db.addAof 方法中。

database/keys.go
func execDel(db *DB, args [][]byte) resp.Reply { // ... deleted := db.Removes(keys...) // 添加下面的代码 if deleted > 0 { db.addAof(utils.ToCmdLineWithName("DEL", args...)) } return reply.MakeIntReply(int64(deleted)) }

然后是 FLUSHDB 命令:

database/keys.go
func execFlushDB(db *DB, args [][]byte) resp.Reply { db.Flush() // 添加下面的代码 db.addAof(utils.ToCmdLineWithName("FLUSHDB", args...)) return reply.MakeOKReply() }

然后是 RENAME 命令:

database/keys.go
func execRename(db *DB, args [][]byte) resp.Reply { // ... db.Remove(src) // 添加下面的代码 db.addAof(utils.ToCmdLineWithName("RENAME", args...)) return reply.MakeOKReply() }

然后是 RENAMENX 命令:

database/keys.go
func execRenameNX(db *DB, args [][]byte) resp.Reply { // ... db.Remove(src) // 添加下面的代码 db.addAof(utils.ToCmdLineWithName("RENAMENX", args...)) return reply.MakeIntReply(1) }

接下来看 database/strings.go 文件中的 execSet 方法:

database/strings.go
func execSet(db *DB, args [][]byte) resp.Reply { // ... db.PutEntity(key, entity) db.addAof(utils.ToCmdLineWithName("SET", args...)) return reply.MakeOKReply() }

然后是 SETNX 命令:

database/strings.go
func execSetNX(db *DB, args [][]byte) resp.Reply { // ... result := db.PutIfAbsent(key, entity) // 添加下面的代码 db.addAof(utils.ToCmdLineWithName("SETNX", args...)) return reply.MakeIntReply(int64(result)) }

然后是 GETSET 命令:

database/strings.go
func execGetSet(db *DB, args [][]byte) resp.Reply { // ... db.PutEntity(key, &database.DataEntity{ Data: value, }) // 添加下面的代码 db.addAof(utils.ToCmdLineWithName("GETSET", args...)) if !ok { return reply.MakeNullBulkReply() } return reply.MakeBulkReply(entity.Data.([]byte)) }

测试和解决闭包问题

现在我们测试一下 AOF 功能是否正常工作。

启动服务器:

go run main.go

新开一个命令行,执行命令:

printf "*3\r\n\$3\r\nSET\r\n\$3\r\nkey\r\n\$5\r\nvalue\r\n" | nc localhost 6379

这是一条 SET 命令,设置键值对 keyvalue

返回:

+OK

然后我们查看 AOF 文件 (appendonly.aof) 的内容:

*2 $6 SELECT $2 15 *3 $3 SET $3 key $5 value

好像有点问题,这里 SELECT 的参数是 15。

按道理我们没有选择数据库 15 的命令,这里应该是 0 才对。

问题出在 database 包中的 NewDatabase 方法中:

database/database.go
func NewDatabase() *Database { // ... if config.Properties.AppendOnly { aofHandler, err := aof.NewAofHandler(database) if err != nil { panic(err) } database.aofHandler = aofHandler for _, db := range database.dbSet { db.addAof = func(line CmdLine) { database.aofHandler.AddAof(db.index, line) } } } return database }

在这段代码中,我们为每个 db 实例设置了一个匿名函数,并将其赋值给 db.addAof。这个函数会调用 AOF 处理器的 AddAof 方法,并传入当前数据库的索引(db.index)和命令行参数。

值得注意的是,这里形成了一个 闭包:匿名函数中引用了外部的变量 db,而这个变量正是 for 循环中的循环变量。Go 的闭包机制会捕获变量本身(即引用),而不是在函数定义时的值。换句话说,闭包中保存的是对变量 db 的引用,而不是每次循环中 db.index 的快照。

由于 for 循环中的 db 变量在每轮循环中是复用的,所有闭包最终都引用的是同一个 db。因此,当循环结束后,db 的值可能已经变成了最后一个数据库(例如索引为 15 的那个),从而导致所有闭包中的 db.index 实际都是 15,而不是各自对应的 0、1、2 等值。

此外,因为闭包中引用了循环外部的变量 db,Go 编译器会通过逃逸分析判断该变量的生命周期超出了当前栈帧的作用范围,因此会将其分配到堆上(即发生逃逸)。这不仅可能引发逻辑错误,也会带来一定的性能开销。

💡

例如下面的一个例子:

func main() { funcs := []func(){} for i := 0; i < 3; i++ { funcs = append(funcs, func() { fmt.Println(i) }) } for _, f := range funcs { f() } }

在这里我们可能期望输出 0、1、2,但是实际上输出的是 3、3、3。 因为闭包捕获的 i 在最后一次循环结束后,i 的值是 3。那么在后面执行闭包的时候,i 的值就是 3。

为了解决这个问题,我们可以创建一个变量来截断闭包的引用。我们可以在 for 循环中创建一个新的变量 sdb,并将其赋值为当前的 db,然后在闭包中使用 sdb 变量。

这样即使第一轮循环结束后,sdb 逃逸到堆上,在第二轮也会新建一个 sdb 变量(地址不同),避免闭包问题。

database/database.go
func NewDatabase() *Database { // ... if config.Properties.AppendOnly { aofHandler, err := aof.NewAofHandler(database) if err != nil { panic(err) } database.aofHandler = aofHandler for _, db := range database.dbSet { // 修改下面的代码 sdb := db sdb.addAof = func(line CmdLine) { database.aofHandler.AddAof(sdb.index, line) } } } return database }

接下来删除 appendonly.aof 文件,重新运行服务器,执行 SET 命令,然后查看 AOF 文件的内容:

printf "*3\r\n\$3\r\nSET\r\n\$3\r\nkey\r\n\$5\r\nvalue\r\n" | nc localhost 6379
*3 $3 SET $3 key $5 value

现在 AOF 文件的内容是正确的了。因为这里默认是选择数据库 0 的,所以不会出现 SELECT 命令。

那么我们测试一下 SELECT 命令:

printf "*2\r\n\$6\r\nSELECT\r\n\$1\r\n1\r\n" | nc localhost 6379

返回:

:1

不对!应该返回 OK 才对,原来是我们在 execSelect 中写错了:

database/database.go
func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply { dbIndex, err := strconv.Atoi(string(args[0])) if err != nil { return reply.MakeStandardErrorReply("ERR invalid DB index") } if dbIndex < 0 || dbIndex >= len(database.dbSet) { return reply.MakeStandardErrorReply("ERR DB index out of range") } c.SelectDB(dbIndex) return reply.MakeIntReply(int64(dbIndex)) }

这里我们应该返回 OK,而不是 :1

database/database.go
func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply { dbIndex, err := strconv.Atoi(string(args[0])) if err != nil { return reply.MakeStandardErrorReply("ERR invalid DB index") } if dbIndex < 0 || dbIndex >= len(database.dbSet) { return reply.MakeStandardErrorReply("ERR DB index out of range") } c.SelectDB(dbIndex) return reply.MakeOKReply() // 修改这里 }

再进行测试:

printf "*2\r\n\$6\r\nSELECT\r\n\$1\r\n1\r\n" | nc localhost 6379

返回:

+OK

但是!让人恼火的是,aof 文件中还是没有 SELECT 命令。

这是为什么呢?

注意,我们去看看我们的输出日志:

(base) orangejuice@Chengs-Mac redigo % go run main.go [INFO][server.go:40] 2025/04/15 20:41:19 bind: 0.0.0.0:6379, start listening... [INFO][server.go:69] 2025/04/15 20:41:21 accept link [INFO][handler.go:61] 2025/04/15 20:41:21 connection closed: [::1]:62176 [INFO][server.go:69] 2025/04/15 20:41:22 accept link [INFO][handler.go:61] 2025/04/15 20:41:22 connection closed: [::1]:62185

我们发现每处理完一次命令,都会打印一条 connection closed 的日志,这说明连接已经关闭了。

探求原因在于,我们使用 printf 命令进行测试,但是这不会进行持久连接,所以每次测试完都会关闭连接。然后会选择新的数据库,这样就会导致后面的 SET 命令还是使用的默认的数据库 0。

我们可以换一个测试工具,例如 redis-cli,假设你之前安装过了 redis 那么这个命令应该是可以直接使用的。

我们清空 appendonly.aof 文件,然后重启服务器:

go run main.go

然后使用 redis-cli 连接到服务器:

redis-cli -p 6379

然后执行 SELECT 命令,由于它就是一个 redis 客户端,所以我们可以直接执行 SELECT 命令,而不需要使用 RESP 格式输入。

SELECT 1

返回:

OK

然后执行 SET 命令:

SET mykey myvalue

返回:

OK

真实控制台输出:

127.0.0.1:6379[1]> SELECT 1 OK 127.0.0.1:6379[1]> SET mykey myvalue OK

然后我们查看 appendonly.aof 文件的内容:

*2 $6 SELECT $1 1 *3 $3 SET $5 mykey $7 myvalue

现在 AOF 文件的内容是正确的了。

实现 LoadAof

接下来我们实现 LoadAof 方法,用于从 AOF 文件中加载数据。这个方法会读取 AOF 文件中的命令,并将其执行到数据库中。

我们写入 aof 文件时就使用了 RESP 协议的格式,所以我们可以很好的使用 RESP 协议的解析器来解析 AOF 文件中的命令。

我们创建补充 LoadAof 方法中。

还记得我们之前章节中,实现了一个 parser 包吗?这个包可以用于解析 RESP 协议的命令。我们可以使用这个包来解析 AOF 文件中的命令。

我们需要做一些错误处理,假如无法打开 AOF 文件,或者解析 AOF 文件中的命令失败,我们需要记录错误日志,并继续执行下一个命令。

然后使用 parser 包中的 ParseStream 方法来解析 AOF 文件中的命令。这个方法会返回一个通道,我们可以从这个通道中读取解析后的命令。

我们需要判断解析后的命令是否为 nil,如果是 nil,说明解析失败,我们需要记录错误日志,并继续执行下一个命令。

然后我们需要判断解析后的命令是否为 MultiBulkReply 类型,如果是,我们就可以用来执行了。

aof/aof.go
// LoadAof loads commands from the AOF file and executes them on the database. func (h *AofHandler) LoadAof() { // Open the AOF file for reading aofFile, err := os.Open(h.aofFilename) if err != nil { logger.Error("AOF file open error: " + err.Error()) return } defer aofFile.Close() ch := parser.ParseStream(aofFile) for p := range ch { if p.Err != nil { // If the error is EOF or unexpected EOF, break the loop if p.Err == io.EOF || p.Err == io.ErrUnexpectedEOF { // End of file break } // Other errors logger.Error("AOF file parse error: " + p.Err.Error()) continue } if p.Data == nil { logger.Error("AOF file empty payload") continue } // Attempt to parse the payload as a MultiBulkReply // If it fails, log an error and continue to the next payload r, ok := p.Data.(*reply.MultiBulkReply) // TODO } }

然后怎么去执行这个命令呢?

我们想到上一章节中我们实现的 database 包中的 Exec 方法,这个方法可以执行 RESP 协议的命令。

我们回顾一下 Exec 方法的实现:

database/database.go
// Exec executes a command on the database func (d *Database) Exec(client resp.Connection, args [][]byte) resp.Reply { defer func() { if err := recover(); err != nil { logger.Error("Database Exec panic:" + err.(error).Error()) } }() cmdName := strings.ToLower(string(args[0])) if cmdName == "select" { if len(args) != 2 { return reply.MakeArgNumErrReply("select") } return execSelect(client, d, args[1:]) } // Get the current database index from the client connection db := d.dbSet[client.GetDBIndex()] return db.Exec(client, args) }

但是调用这个方法我们需要一个 client 对象,而我们在 LoadAof 方法中并没有 client 对象。

Exec 中,使用 client 对象的唯一目的是为了获取当前数据库的索引,而不关心 client 对象的其他属性。

所以我们可以构造一个假的 client 对象,只需要在里面填入 selectedDB 字段即可。接下来,我们将创建一个 fakeClient 结构体,并实现 GetDBIndex 方法,以便在 LoadAof 方法中使用它。

database/database.go
// LoadAof loads commands from the AOF file and executes them on the database. func (h *AofHandler) LoadAof() { // Open the AOF file for reading aofFile, err := os.Open(h.aofFilename) if err != nil { logger.Error("AOF file open error: " + err.Error()) return } defer aofFile.Close() ch := parser.ParseStream(aofFile) fakeConn := &connection.Connection{} for p := range ch { if p.Err != nil { // If the error is EOF or unexpected EOF, break the loop if p.Err == io.EOF || p.Err == io.ErrUnexpectedEOF { // End of file break } // Other errors logger.Error("AOF file parse error: " + p.Err.Error()) continue } if p.Data == nil { logger.Error("AOF file empty payload") continue } // Attempt to parse the payload as a MultiBulkReply // If it fails, log an error and continue to the next payload r, ok := p.Data.(*reply.MultiBulkReply) if !ok { logger.Error("AOF file require multi bulk reply") continue } // Execute the command on the database rep := h.db.Exec(fakeConn, r.Args) if reply.IsErrReply(rep) { logger.Error("Execute AOF command error") } } }

在这里我们创建了一个 fakeConn 结构体,并将其传入 Exec 方法中。这样就可以执行 AOF 文件中的命令了。

然后调用 Exec 方法时,我们将 fakeConn 传入,这样就可以获取当前数据库的索引了。

测试

我们先删除之前的 appendonly.aof 文件,然后重启服务器:

go run main.go

然后为了便于测试,我们先写入一些命令到 AOF 文件中:

使用 redis-cli 连接到服务器:

redis-cli -p 6379

然后执行以下命令:

SELECT 1 SET key1 value1 SET key2 value2 GETSET key1 value2 SET key3 value3 SET key4 value4

现在 AOF 文件中应该有以下内容:

*2 $6 SELECT $1 1 *3 $3 SET $4 key1 $6 value1 *3 $3 SET $4 key2 $6 value2 *3 $6 GETSET $4 key1 $6 value2 *3 $3 SET $4 key3 $6 value3 *3 $3 SET $4 key4 $6 value4

然后我们重新启动服务器:

go run main.go

然后使用 redis-cli 连接到服务器:

redis-cli -p 6379

刚才我们把所有的数据都写到 1 号数据库中,所以我们先选择 1 号数据库:

SELECT 1

返回:

OK

然后我们执行 KEYS * 命令,查看 1 号数据库中的所有键:

KEYS *

返回:

1) "key1" 2) "key2" 3) "key3" 4) "key4"

可以看到我们之前写入的数据都已经恢复了。

127.0.0.1:6379[1]> SELECT 1 OK 127.0.0.1:6379[1]> KEYS * 1) "key4" 2) "key1" 3) "key2" 4) "key3"

这里没有按照顺序返回,因为 dict 是无序的。

为了测试服务器切换也是正常的,我们再往 0 号数据库中写入一些数据:

SELECT 0 SET key5 value5 SET key8 value8

然后重启服务器,再连接后执行 KEYS * 命令:

SELECT 0 KEYS *

返回:

127.0.0.1:6379> SET key5 value5 OK 127.0.0.1:6379> KEYS * 1) "key8" 2) "key5"

可以看到我们之前写入的数据都已经恢复了。

重要提示。

我们重启服务器后,最好清空 AOF 数据,因为这里可能存在一个问题。就是上次连接的时候,假设是在数据库 3 上断开的连接,那么下次重启服务器的时候,AOF 文件中会有 SELECT 3 的命令,但是刚启动的时候,数据库的索引是 0,此时如果往数据库 0 中写入数据,我们现在的机制下,我们是没有检测上一次的数据库索引的,所以会导致 AOF 数据被写入到 3 号数据库中。

例如某一次我们的 AOF 文件如下:

*2 $6 SELECT $1 3 *3 $3 SET $4 key1 $6 value1

此时,我们选择的数据库是 3,然后我们关闭了服务器。

然后我们启动服务器,这个时候默认选择的数据库是 0,此时我们往数据库 0 中写入数据:

SELECT 0 SET key2 value2

此时 AOF 文件中会有以下内容:

*2 $6 SELECT $1 3 *3 $3 SET $4 key1 $6 value1 # 下面是我们刚才写入的 *3 $3 SET $4 key2 $6 value2

看到了吗,这里没有一个 SELECT 0 的命令,所以我们在写入数据的时候,实际上是写入到了 3 号数据库中,而不是 0 号数据库中。

所以我们在写入数据的时候,最好清空 AOF 文件。

还有些没实现的

我们现在的 AOF 的主要功能已经实现了,但是我们还没有实现 AOF 的重写功能。也就是当 AOF 文件过大的时候,我们需要将其重写为一个新的 AOF 文件。许多重复执行写操作的键,我们可以将其合并为一个命令。

这可以留作后续的功能扩展。

提交

我们现在的 AOF 功能已经实现了,下面我们提交代码:

git add . git commit -m "feat: add aof persistence"
Last updated on