实现持久化功能
本文进度对应的代码仓库:Redis 持久化
持久化是 Redis 中的一个重要特性,我们之前实现了内存数据库,但是由于数据存储在内存中,重启后数据会丢失。为了实现持久化,我们需要将数据存储到磁盘上。
在本章节中,我们将实现 Redis 的 AOF (Append Only File)持久化功能。AOF 是 Redis 的一种持久化机制,它将所有写操作记录到一个文件中,以便在 Redis 重启时可以重新加载数据。
我们先到 redis.conf
文件中添加两个字段:
appendonly yes
appendfilename appendonly.aof
在 redis.conf
文件中添加 appendonly yes
表示启用 AOF 持久化功能,appendfilename "appendonly.aof"
表示 AOF 文件的名称。
实现 NewAofHandler
我们在根目录下创建 aof
文件夹。然后在 aof
文件夹中创建 aof.go
文件,定义 AOF 处理器的结构体和方法。
AofHandler结构体的作用是接收命令并写入AOF文件,将Redis服务器执行的写命令以追加方式持久化到文件中。在服务器重启时从AOF文件恢复数据,通过重新执行AOF文件中记录的命令来重建数据库状态。
在 Redis 中,我们有数据要写入 AOF 文件,我们是等待落盘完成后再返回给客户端,还是说直接返回给客户端,然后在后台异步写入 AOF 文件呢?我们选择第二种方式,这样可以提高性能。
所以在 aof.go
文件中,我们需要定义一个 AofHandler
结构体,它包含一个数据库实例和一个用于写入 AOF 文件的通道。我们还需要定义一个 payload
结构体,用于存储需要写入 AOF 文件的命令和对应的数据库索引。
AOF 处理器的结构体如下:
type CmdLine = [][]byte
type payload struct {
cmdLine CmdLine
dbIndex int
}
// AofHandler handles the Append-Only File (AOF) functionality for Redis.
type AofHandler struct {
db database.Database
aofChan chan *payload
aofFile *os.File
aofFilename string
currentDB int
}
这个结构体包含了以下字段:
db
:数据库实例,用于执行命令。aofChan
:用于接收需要写入 AOF 文件的命令的通道。aofFile
:AOF 文件的文件句柄。aofFilename
:AOF 文件的文件名。currentDB
:当前数据库的索引。
接下来,我们需要实现 AofHandler
的构造函数和初始化方法。
初始化方法主要要做的就是填充好 AofHandler
结构体的字段。
我们在 aof.go
文件中添加以下代码:
// NewAofHandler creates a new AofHandler instance.
func NewAofHandler(db database.Database) (*AofHandler, error) {
handler := &AofHandler{}
handler.aofFilename = config.Properties.AppendFilename
handler.db = db
// TODO: Recover from AOF file
// Open the AOF file for reading and writing
aofFile, err := os.OpenFile(handler.aofFilename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
if err != nil {
return nil, err
}
handler.aofFile = aofFile
// TODO: Make a chan for aof
return handler, nil
}
在这里留下了两个 TODO:
- 从 AOF 文件中恢复数据,也就是把之前存储的 AOF 文件中的数据加载到数据库中
- 创建一个用于写入 AOF 文件的通道
这个留到一会我们再补上。
接下来我们实现一个 AddAof
方法,它将命令行添加到 AOF 处理器中。这个方法会将命令行推送到 aofChan 通道中,以便异步写入 AOF 文件。
要注意的是,AddAof
方法会在 AOF 文件的写入过程中确保通道的存在,并且确保开启了 AOF 功能。
// AddAof adds a command line to the AOF file. It will push the command line to the aofChan channel.
func (h *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
if h.aofChan == nil || !config.Properties.AppendOnly {
h.aofChan = make(chan *payload, 100)
}
h.aofChan <- &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
}
}
我们需要一个函数用来处理 AOF 的落盘操作,这个函数会从 aofChan
通道中读取命令,并将其写入 AOF 文件。需要源源不断的从 aofChan
通道中读取命令,并将其写入 AOF 文件。
我们在 aof.go
文件中添加以下函数签名,这个留到一会再补全:
// handleAof handles the AOF file writing. It will write the command line to the AOF file.
func (h *AofHandler) handleAof() {
// TODO: Write the command line to the AOF file
}
这里了解了为什么要用 Chan 以及 Chan 的作用后,我们反过来补全一下 NewAofHandler
方法中的 TODO。
我们需要在 NewAofHandler
方法中创建一个用于写入 AOF 文件的通道。
我们定义一个 aofBufferSize
常量,用于设置 AOF 通道的缓冲区大小。我们将其设置为 1 << 16
,也就是 65536 字节。
然后我们在 NewAofHandler
方法中创建一个大小为 aofBufferSize
的通道,并将其赋值给 aofChan
字段。
还得创建一个协程用来运行 handleAof
方法,保证 handleAof
方法能够在后台运行。
const aofBufferSize = 1 << 16
// NewAofHandler creates a new AofHandler instance.
func NewAofHandler(db database.Database) (*AofHandler, error) {
// ...
handler.aofFile = aofFile
handler.aofChan = make(chan *payload, aofBufferSize)
// Start a goroutine to handle the AOF file writing
go func() {
handler.handleAof()
}()
return handler, nil
}
然后我们需要实现一个 LoadAof
方法,用于从 AOF 文件中加载数据。这个方法会读取 AOF 文件中的命令,并将其执行到数据库中。
添加下面的函数签名:
// LoadAof loads commands from the AOF file and executes them on the database.
func (h *AofHandler) LoadAof() {
// TODO: Implement loading commands from AOF file
}
这样我们就可以补全 handleAof
方法了。
// NewAofHandler creates a new AofHandler instance.
func NewAofHandler(db database.Database) (*AofHandler, error) {
handler := &AofHandler{}
handler.aofFilename = config.Properties.AppendFilename
handler.db = db
// Load the AOF file if it exists
handler.LoadAof()
// Open the AOF file for reading and writing
aofFile, err := os.OpenFile(handler.aofFilename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
if err != nil {
return nil, err
}
handler.aofFile = aofFile
// Make a chan for aof
handler.aofChan = make(chan *payload, aofBufferSize)
// Start a goroutine to handle the AOF file writing
go func() {
handler.handleAof()
}()
return handler, nil
}
实现 handleAof
handleAof
方法的作用是从 aofChan
通道中读取命令,并将其写入 AOF 文件。我们需要不断地从通道中读取命令,并将其写入 AOF 文件。
还记得我们声明了一个 currentDB
字段吗?这个字段的作用是记录当前数据库的索引。
如果发生了数据库切换,我们需要将当前数据库的索引更新为新的索引。然后使用 select
命令来切换数据库,并将新的索引赋值给 currentDB
。这样可以确保在处理 AOF 文件时,命令会被正确地执行到当前数据库中。
在这里我们先实现这样的逻辑:
不断从 aofChan
通道中读取命令,判断当前数据库的索引是否发生了变化,如果发生了变化,就执行 SELECT
命令来切换数据库。
我们需要构造如下的格式:
*2
$6
SELECT
$1
$0
func (h *AofHandler) handleAof() {
h.currentDB = 0
for p := range h.aofChan {
// Write the SELECT command if the database index has changed
if p.dbIndex != h.currentDB {
h.currentDB = p.dbIndex
// TODO: Write the SELECT command to the AOF file
}
}
}
我们可以使用之前的 reply
包中的 MakeMultiBulkReply
方法来创建一个 MULTI BULK
回复,然后将其写入 AOF 文件中。
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
return &MultiBulkReply{Args: args}
}
这里接收的参数是一个 [][]byte
的切片,所以我们需要一个方法,将传入的多个 string 转换为 [][]byte
的切片。
在 lib
下创建 utils
文件夹,然后在 utils
文件夹中创建 utils.go
文件,添加以下代码:
// ToCmdLine convert strings to [][]byte
func ToCmdLine(cmd ...string) [][]byte {
args := make([][]byte, len(cmd))
for i, s := range cmd {
args[i] = []byte(s)
}
return args
}
这个功能函数的作用是将传入的多个 string 转换为 [][]byte
的切片。
然后我们在 handleAof
方法中调用 MakeMultiBulkReply
方法,构造一个 MULTI BULK
回复,然后将其写入 AOF 文件中。
// handleAof handles the AOF file writing. It will write the command line to the AOF file.
func (h *AofHandler) handleAof() {
h.currentDB = 0
for p := range h.aofChan {
// Write the SELECT command if the database index has changed
if p.dbIndex != h.currentDB {
h.currentDB = p.dbIndex
// Write the SELECT command to the AOF file
data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
_, err := h.aofFile.Write(data)
if err != nil {
logger.Error("AOF write error: " + err.Error())
// Continue to the next command
continue
}
}
// Write the command line to the AOF file
data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
_, err := h.aofFile.Write(data)
if err != nil {
logger.Error("AOF write error: " + err.Error())
// Continue to the next command
continue
}
}
}
这里主要是分了两步:
- 如果数据库索引发生了变化,就执行
SELECT
命令来切换数据库。 - 将命令行写入 AOF 文件中。
注意这里我们的 MakeMultiBulkReply
方法返回的是一个 *MultiBulkReply
的指针,所以我们需要调用 ToBytes
方法将其转换为字节数组才能写入 AOF 文件中。
实现记录功能
我们需要追踪数据库执行的过程中发生的写命令,并将其记录到 AOF 文件中。我们可以在 database
包的 Database
结构体中添加一个 aofHandler
字段,用于存储 AOF 处理器的实例。
type Database struct {
dbSet []*DB
// 添加下面的字段
aofHandler *aof.AofHandler
}
然后在 MakeDatabase
方法中,我们需要创建一个 AOF 处理器的实例,并将其赋值给 aofHandler
字段。这时候需要判断 AOF 功能是否开启,如果开启了,就创建一个 AOF 处理器的实例。
// NewDatabase creates a new Database instance
func NewDatabase() *Database {
database := &Database{}
if config.Properties.Databases == 0 {
config.Properties.Databases = 16
}
database.dbSet = make([]*DB, config.Properties.Databases)
for i := range database.dbSet {
db := MakeDB()
db.index = i
database.dbSet[i] = db
}
// 添加下面的代码
if config.Properties.AppendOnly {
aofHandler, err := aof.NewAofHandler(database)
if err != nil {
panic(err)
}
database.aofHandler = aofHandler
}
return database
}
我们在哪一层需要使用 AOF 处理器呢?
回忆我们之前实现的 database
包中的各类方法,拿 strings
包中的方法举例:
在 execSet
方法中,我们需要将 SET
命令记录到 AOF 文件中。我们可以在 execSet
方法中调用 AddAof
方法,将命令行添加到 AOF 处理器中。
// execSet stores the specified key-value pair in the database.
func execSet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity := &database.DataEntity{
Data: value,
}
db.PutEntity(key, entity)
// Add the command line to the AOF file
// ...
return reply.MakeOKReply()
}
但是 db 并不持有 AofHandler 的引用,所以我们可以在 db 结构体中放入一个 func
类型的字段,用于执行 AOF 处理器的 AddAof
方法。
type DB struct {
index int
data dict.Dict
// 添加下面的字段
addAof func(CmdLine)
}
然后在 database
包的 NewDatabase
方法中,我们需要对每个 db
进行初始化,对每个 db
创建一个匿名函数,用于执行 AOF 处理器的 AddAof
方法。
这个匿名函数会调用 aof
处理器的 AddAof
方法,并将当前数据库的索引和命令行作为参数传入。
// NewDatabase creates a new Database instance
func NewDatabase() *Database {
// ...
if config.Properties.AppendOnly {
aofHandler, err := aof.NewAofHandler(database)
if err != nil {
panic(err)
}
database.aofHandler = aofHandler
// 添加下面的代码
for _, db := range database.dbSet {
db.addAof = func(line CmdLine) {
database.aofHandler.AddAof(db.index, line)
}
}
}
return database
}
这样的话我们在 execSet
方法中就可以直接调用 db.addAof
方法,将命令行添加到 AOF 处理器中。
// execSet stores the specified key-value pair in the database.
func execSet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity := &database.DataEntity{
Data: value,
}
db.PutEntity(key, entity)
// 添加下面这一行
db.addAof(args)
return reply.MakeOKReply()
}
但是你注意到了吗,这里存在一个问题,在 aog
初始化的时候,我们会调用 LoadAof
方法来加载 AOF 文件中的数据,加载 AOF 的数据的时候可能会调用 execSet
方法,而 execSet
方法又会调用 db.addAof
方法。
// NewAofHandler creates a new AofHandler instance.
func NewAofHandler(db database.Database) (*AofHandler, error) {
handler := &AofHandler{}
handler.aofFilename = config.Properties.AppendFilename
handler.db = db
// Load the AOF file if it exists
handler.LoadAof()
// ...
}
但是呢,在下面所示的 NewDatabase
方法中,我们先执行 NewAofHandler
方法,这里面会调用 LoadAof
方法,会调用 execSet
方法中的 db.addAof
方法,但是此时 db.addAof
方法还没有被初始化,所以会导致空指针异常。
// NewDatabase creates a new Database instance
func NewDatabase() *Database {
// ...
if config.Properties.AppendOnly {
// 先执行 NewAofHandler 方法,这里面会调用 LoadAof 方法,会调用 execSet 方法中的 db.addAof 方法
aofHandler, err := aof.NewAofHandler(database)
if err != nil {
panic(err)
}
database.aofHandler = aofHandler
// 再执行初始化 db 中的 addAof 方法
for _, db := range database.dbSet {
db.addAof = func(line CmdLine) {
database.aofHandler.AddAof(db.index, line)
}
}
}
return database
}
所以我们可以在 db
中初始化 db
的时候,将 addAof
方法初始化为一个空函数,这样就不会导致空指针异常了。
// MakeDB creates a new DB instance
func MakeDB() *DB {
return &DB{
index: 0,
data: dict.MakeSyncDict(),
// 添加下面的代码
addAof: func(line CmdLine) {
// No-op by default,
// can be overridden by the database instance
},
}
}
这样在恢复 AOF 文件中的数据时,就不会出错了。而且,如果没有开启 AOF 功能的话,addAof
方法也不会被调用。后续我们也不需要在每个地方都判断 AOF 功能是否开启了。
接下来我们逐个回到 database
包中的方法中,添加 db.addAof
方法,来记录应当写入 AOF 文件的命令。
注意,这里我们只需要记录写命令,读命令不需要记录到 AOF 文件中。
例如 PING
命令就不需要记录到 AOF 文件中。
我们来看 database/keys.go
文件中的 execDel
方法:
func execDel(db *DB, args [][]byte) resp.Reply {
keys := make([]string, len(args))
for i, arg := range args {
keys[i] = string(arg)
}
deleted := db.Removes(keys...)
return reply.MakeIntReply(int64(deleted))
}
我们需要在 execDel
方法中添加 db.addAof
方法,将 DEL
命令记录到 AOF 文件中。在这里 args
是不含有 DEL
命令的,所以我们需要传入命令名称,将两者组装成一个 CmdLine
的切片。
我们到 utils
文件夹中添加一个 ToCmdLineWithName
方法,用于将命令名称和参数转换为 CmdLine
的切片。
// ToCmdLineWithName convert command name and args to [][]byte
func ToCmdLineWithName(name string, args ...[]byte) [][]byte {
cmd := make([][]byte, len(args)+1)
cmd[0] = []byte(name)
for i, s := range args {
cmd[i+1] = s
}
return cmd
}
然后在 execDel
方法中调用 ToCmdLineWithName
方法,将 DEL
命令和参数转换为 CmdLine
的切片,然后传入 db.addAof
方法中。
func execDel(db *DB, args [][]byte) resp.Reply {
// ...
deleted := db.Removes(keys...)
// 添加下面的代码
if deleted > 0 {
db.addAof(utils.ToCmdLineWithName("DEL", args...))
}
return reply.MakeIntReply(int64(deleted))
}
然后是 FLUSHDB
命令:
func execFlushDB(db *DB, args [][]byte) resp.Reply {
db.Flush()
// 添加下面的代码
db.addAof(utils.ToCmdLineWithName("FLUSHDB", args...))
return reply.MakeOKReply()
}
然后是 RENAME
命令:
func execRename(db *DB, args [][]byte) resp.Reply {
// ...
db.Remove(src)
// 添加下面的代码
db.addAof(utils.ToCmdLineWithName("RENAME", args...))
return reply.MakeOKReply()
}
然后是 RENAMENX
命令:
func execRenameNX(db *DB, args [][]byte) resp.Reply {
// ...
db.Remove(src)
// 添加下面的代码
db.addAof(utils.ToCmdLineWithName("RENAMENX", args...))
return reply.MakeIntReply(1)
}
接下来看 database/strings.go
文件中的 execSet
方法:
func execSet(db *DB, args [][]byte) resp.Reply {
// ...
db.PutEntity(key, entity)
db.addAof(utils.ToCmdLineWithName("SET", args...))
return reply.MakeOKReply()
}
然后是 SETNX
命令:
func execSetNX(db *DB, args [][]byte) resp.Reply {
// ...
result := db.PutIfAbsent(key, entity)
// 添加下面的代码
db.addAof(utils.ToCmdLineWithName("SETNX", args...))
return reply.MakeIntReply(int64(result))
}
然后是 GETSET
命令:
func execGetSet(db *DB, args [][]byte) resp.Reply {
// ...
db.PutEntity(key, &database.DataEntity{
Data: value,
})
// 添加下面的代码
db.addAof(utils.ToCmdLineWithName("GETSET", args...))
if !ok {
return reply.MakeNullBulkReply()
}
return reply.MakeBulkReply(entity.Data.([]byte))
}
测试和解决闭包问题
现在我们测试一下 AOF 功能是否正常工作。
启动服务器:
go run main.go
新开一个命令行,执行命令:
printf "*3\r\n\$3\r\nSET\r\n\$3\r\nkey\r\n\$5\r\nvalue\r\n" | nc localhost 6379
这是一条 SET
命令,设置键值对 key
和 value
。
返回:
+OK
然后我们查看 AOF 文件 (appendonly.aof
) 的内容:
*2
$6
SELECT
$2
15
*3
$3
SET
$3
key
$5
value
好像有点问题,这里 SELECT 的参数是 15。
按道理我们没有选择数据库 15 的命令,这里应该是 0 才对。
问题出在 database
包中的 NewDatabase
方法中:
func NewDatabase() *Database {
// ...
if config.Properties.AppendOnly {
aofHandler, err := aof.NewAofHandler(database)
if err != nil {
panic(err)
}
database.aofHandler = aofHandler
for _, db := range database.dbSet {
db.addAof = func(line CmdLine) {
database.aofHandler.AddAof(db.index, line)
}
}
}
return database
}
在这段代码中,我们为每个 db
实例设置了一个匿名函数,并将其赋值给 db.addAof
。这个函数会调用 AOF 处理器的 AddAof
方法,并传入当前数据库的索引(db.index
)和命令行参数。
值得注意的是,这里形成了一个 闭包:匿名函数中引用了外部的变量 db
,而这个变量正是 for
循环中的循环变量。Go 的闭包机制会捕获变量本身(即引用),而不是在函数定义时的值。换句话说,闭包中保存的是对变量 db
的引用,而不是每次循环中 db.index
的快照。
由于 for
循环中的 db
变量在每轮循环中是复用的,所有闭包最终都引用的是同一个 db
。因此,当循环结束后,db
的值可能已经变成了最后一个数据库(例如索引为 15 的那个),从而导致所有闭包中的 db.index
实际都是 15,而不是各自对应的 0、1、2 等值。
此外,因为闭包中引用了循环外部的变量 db
,Go 编译器会通过逃逸分析判断该变量的生命周期超出了当前栈帧的作用范围,因此会将其分配到堆上(即发生逃逸)。这不仅可能引发逻辑错误,也会带来一定的性能开销。
例如下面的一个例子:
func main() {
funcs := []func(){}
for i := 0; i < 3; i++ {
funcs = append(funcs, func() {
fmt.Println(i)
})
}
for _, f := range funcs {
f()
}
}
在这里我们可能期望输出 0、1、2,但是实际上输出的是 3、3、3。 因为闭包捕获的 i 在最后一次循环结束后,i 的值是 3。那么在后面执行闭包的时候,i 的值就是 3。
为了解决这个问题,我们可以创建一个变量来截断闭包的引用。我们可以在 for
循环中创建一个新的变量 sdb
,并将其赋值为当前的 db
,然后在闭包中使用 sdb
变量。
这样即使第一轮循环结束后,sdb
逃逸到堆上,在第二轮也会新建一个 sdb
变量(地址不同),避免闭包问题。
func NewDatabase() *Database {
// ...
if config.Properties.AppendOnly {
aofHandler, err := aof.NewAofHandler(database)
if err != nil {
panic(err)
}
database.aofHandler = aofHandler
for _, db := range database.dbSet {
// 修改下面的代码
sdb := db
sdb.addAof = func(line CmdLine) {
database.aofHandler.AddAof(sdb.index, line)
}
}
}
return database
}
接下来删除 appendonly.aof
文件,重新运行服务器,执行 SET
命令,然后查看 AOF 文件的内容:
printf "*3\r\n\$3\r\nSET\r\n\$3\r\nkey\r\n\$5\r\nvalue\r\n" | nc localhost 6379
*3
$3
SET
$3
key
$5
value
现在 AOF 文件的内容是正确的了。因为这里默认是选择数据库 0 的,所以不会出现 SELECT
命令。
那么我们测试一下 SELECT
命令:
printf "*2\r\n\$6\r\nSELECT\r\n\$1\r\n1\r\n" | nc localhost 6379
返回:
:1
不对!应该返回 OK 才对,原来是我们在 execSelect
中写错了:
func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply {
dbIndex, err := strconv.Atoi(string(args[0]))
if err != nil {
return reply.MakeStandardErrorReply("ERR invalid DB index")
}
if dbIndex < 0 || dbIndex >= len(database.dbSet) {
return reply.MakeStandardErrorReply("ERR DB index out of range")
}
c.SelectDB(dbIndex)
return reply.MakeIntReply(int64(dbIndex))
}
这里我们应该返回 OK
,而不是 :1
。
func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply {
dbIndex, err := strconv.Atoi(string(args[0]))
if err != nil {
return reply.MakeStandardErrorReply("ERR invalid DB index")
}
if dbIndex < 0 || dbIndex >= len(database.dbSet) {
return reply.MakeStandardErrorReply("ERR DB index out of range")
}
c.SelectDB(dbIndex)
return reply.MakeOKReply() // 修改这里
}
再进行测试:
printf "*2\r\n\$6\r\nSELECT\r\n\$1\r\n1\r\n" | nc localhost 6379
返回:
+OK
但是!让人恼火的是,aof
文件中还是没有 SELECT
命令。
这是为什么呢?
注意,我们去看看我们的输出日志:
(base) orangejuice@Chengs-Mac redigo % go run main.go
[INFO][server.go:40] 2025/04/15 20:41:19 bind: 0.0.0.0:6379, start listening...
[INFO][server.go:69] 2025/04/15 20:41:21 accept link
[INFO][handler.go:61] 2025/04/15 20:41:21 connection closed: [::1]:62176
[INFO][server.go:69] 2025/04/15 20:41:22 accept link
[INFO][handler.go:61] 2025/04/15 20:41:22 connection closed: [::1]:62185
我们发现每处理完一次命令,都会打印一条 connection closed
的日志,这说明连接已经关闭了。
探求原因在于,我们使用 printf
命令进行测试,但是这不会进行持久连接,所以每次测试完都会关闭连接。然后会选择新的数据库,这样就会导致后面的 SET
命令还是使用的默认的数据库 0。
我们可以换一个测试工具,例如 redis-cli
,假设你之前安装过了 redis
那么这个命令应该是可以直接使用的。
我们清空 appendonly.aof
文件,然后重启服务器:
go run main.go
然后使用 redis-cli
连接到服务器:
redis-cli -p 6379
然后执行 SELECT
命令,由于它就是一个 redis
客户端,所以我们可以直接执行 SELECT
命令,而不需要使用 RESP 格式输入。
SELECT 1
返回:
OK
然后执行 SET
命令:
SET mykey myvalue
返回:
OK
真实控制台输出:
127.0.0.1:6379[1]> SELECT 1
OK
127.0.0.1:6379[1]> SET mykey myvalue
OK
然后我们查看 appendonly.aof
文件的内容:
*2
$6
SELECT
$1
1
*3
$3
SET
$5
mykey
$7
myvalue
现在 AOF 文件的内容是正确的了。
实现 LoadAof
接下来我们实现 LoadAof
方法,用于从 AOF 文件中加载数据。这个方法会读取 AOF 文件中的命令,并将其执行到数据库中。
我们写入 aof
文件时就使用了 RESP 协议的格式,所以我们可以很好的使用 RESP 协议的解析器来解析 AOF 文件中的命令。
我们创建补充 LoadAof
方法中。
还记得我们之前章节中,实现了一个 parser
包吗?这个包可以用于解析 RESP 协议的命令。我们可以使用这个包来解析 AOF 文件中的命令。
我们需要做一些错误处理,假如无法打开 AOF 文件,或者解析 AOF 文件中的命令失败,我们需要记录错误日志,并继续执行下一个命令。
然后使用 parser
包中的 ParseStream
方法来解析 AOF 文件中的命令。这个方法会返回一个通道,我们可以从这个通道中读取解析后的命令。
我们需要判断解析后的命令是否为 nil
,如果是 nil
,说明解析失败,我们需要记录错误日志,并继续执行下一个命令。
然后我们需要判断解析后的命令是否为 MultiBulkReply
类型,如果是,我们就可以用来执行了。
// LoadAof loads commands from the AOF file and executes them on the database.
func (h *AofHandler) LoadAof() {
// Open the AOF file for reading
aofFile, err := os.Open(h.aofFilename)
if err != nil {
logger.Error("AOF file open error: " + err.Error())
return
}
defer aofFile.Close()
ch := parser.ParseStream(aofFile)
for p := range ch {
if p.Err != nil {
// If the error is EOF or unexpected EOF, break the loop
if p.Err == io.EOF || p.Err == io.ErrUnexpectedEOF {
// End of file
break
}
// Other errors
logger.Error("AOF file parse error: " + p.Err.Error())
continue
}
if p.Data == nil {
logger.Error("AOF file empty payload")
continue
}
// Attempt to parse the payload as a MultiBulkReply
// If it fails, log an error and continue to the next payload
r, ok := p.Data.(*reply.MultiBulkReply)
// TODO
}
}
然后怎么去执行这个命令呢?
我们想到上一章节中我们实现的 database
包中的 Exec
方法,这个方法可以执行 RESP 协议的命令。
我们回顾一下 Exec
方法的实现:
// Exec executes a command on the database
func (d *Database) Exec(client resp.Connection, args [][]byte) resp.Reply {
defer func() {
if err := recover(); err != nil {
logger.Error("Database Exec panic:" + err.(error).Error())
}
}()
cmdName := strings.ToLower(string(args[0]))
if cmdName == "select" {
if len(args) != 2 {
return reply.MakeArgNumErrReply("select")
}
return execSelect(client, d, args[1:])
}
// Get the current database index from the client connection
db := d.dbSet[client.GetDBIndex()]
return db.Exec(client, args)
}
但是调用这个方法我们需要一个 client
对象,而我们在 LoadAof
方法中并没有 client
对象。
在 Exec
中,使用 client
对象的唯一目的是为了获取当前数据库的索引,而不关心 client
对象的其他属性。
所以我们可以构造一个假的 client
对象,只需要在里面填入 selectedDB
字段即可。接下来,我们将创建一个 fakeClient
结构体,并实现 GetDBIndex
方法,以便在 LoadAof
方法中使用它。
// LoadAof loads commands from the AOF file and executes them on the database.
func (h *AofHandler) LoadAof() {
// Open the AOF file for reading
aofFile, err := os.Open(h.aofFilename)
if err != nil {
logger.Error("AOF file open error: " + err.Error())
return
}
defer aofFile.Close()
ch := parser.ParseStream(aofFile)
fakeConn := &connection.Connection{}
for p := range ch {
if p.Err != nil {
// If the error is EOF or unexpected EOF, break the loop
if p.Err == io.EOF || p.Err == io.ErrUnexpectedEOF {
// End of file
break
}
// Other errors
logger.Error("AOF file parse error: " + p.Err.Error())
continue
}
if p.Data == nil {
logger.Error("AOF file empty payload")
continue
}
// Attempt to parse the payload as a MultiBulkReply
// If it fails, log an error and continue to the next payload
r, ok := p.Data.(*reply.MultiBulkReply)
if !ok {
logger.Error("AOF file require multi bulk reply")
continue
}
// Execute the command on the database
rep := h.db.Exec(fakeConn, r.Args)
if reply.IsErrReply(rep) {
logger.Error("Execute AOF command error")
}
}
}
在这里我们创建了一个 fakeConn
结构体,并将其传入 Exec
方法中。这样就可以执行 AOF 文件中的命令了。
然后调用 Exec
方法时,我们将 fakeConn
传入,这样就可以获取当前数据库的索引了。
测试
我们先删除之前的 appendonly.aof
文件,然后重启服务器:
go run main.go
然后为了便于测试,我们先写入一些命令到 AOF 文件中:
使用 redis-cli
连接到服务器:
redis-cli -p 6379
然后执行以下命令:
SELECT 1
SET key1 value1
SET key2 value2
GETSET key1 value2
SET key3 value3
SET key4 value4
现在 AOF 文件中应该有以下内容:
*2
$6
SELECT
$1
1
*3
$3
SET
$4
key1
$6
value1
*3
$3
SET
$4
key2
$6
value2
*3
$6
GETSET
$4
key1
$6
value2
*3
$3
SET
$4
key3
$6
value3
*3
$3
SET
$4
key4
$6
value4
然后我们重新启动服务器:
go run main.go
然后使用 redis-cli
连接到服务器:
redis-cli -p 6379
刚才我们把所有的数据都写到 1 号数据库中,所以我们先选择 1 号数据库:
SELECT 1
返回:
OK
然后我们执行 KEYS *
命令,查看 1 号数据库中的所有键:
KEYS *
返回:
1) "key1"
2) "key2"
3) "key3"
4) "key4"
可以看到我们之前写入的数据都已经恢复了。
127.0.0.1:6379[1]> SELECT 1
OK
127.0.0.1:6379[1]> KEYS *
1) "key4"
2) "key1"
3) "key2"
4) "key3"
这里没有按照顺序返回,因为 dict
是无序的。
为了测试服务器切换也是正常的,我们再往 0 号数据库中写入一些数据:
SELECT 0
SET key5 value5
SET key8 value8
然后重启服务器,再连接后执行 KEYS *
命令:
SELECT 0
KEYS *
返回:
127.0.0.1:6379> SET key5 value5
OK
127.0.0.1:6379> KEYS *
1) "key8"
2) "key5"
可以看到我们之前写入的数据都已经恢复了。
重要提示。
我们重启服务器后,最好清空 AOF 数据,因为这里可能存在一个问题。就是上次连接的时候,假设是在数据库 3 上断开的连接,那么下次重启服务器的时候,AOF 文件中会有 SELECT 3
的命令,但是刚启动的时候,数据库的索引是 0,此时如果往数据库 0 中写入数据,我们现在的机制下,我们是没有检测上一次的数据库索引的,所以会导致 AOF 数据被写入到 3 号数据库中。
例如某一次我们的 AOF 文件如下:
*2
$6
SELECT
$1
3
*3
$3
SET
$4
key1
$6
value1
此时,我们选择的数据库是 3,然后我们关闭了服务器。
然后我们启动服务器,这个时候默认选择的数据库是 0,此时我们往数据库 0 中写入数据:
SELECT 0
SET key2 value2
此时 AOF 文件中会有以下内容:
*2
$6
SELECT
$1
3
*3
$3
SET
$4
key1
$6
value1
# 下面是我们刚才写入的
*3
$3
SET
$4
key2
$6
value2
看到了吗,这里没有一个 SELECT 0
的命令,所以我们在写入数据的时候,实际上是写入到了 3 号数据库中,而不是 0 号数据库中。
所以我们在写入数据的时候,最好清空 AOF 文件。
还有些没实现的
我们现在的 AOF 的主要功能已经实现了,但是我们还没有实现 AOF 的重写功能。也就是当 AOF 文件过大的时候,我们需要将其重写为一个新的 AOF 文件。许多重复执行写操作的键,我们可以将其合并为一个命令。
这可以留作后续的功能扩展。
提交
我们现在的 AOF 功能已经实现了,下面我们提交代码:
git add .
git commit -m "feat: add aof persistence"