实现 Redis 协议解析器
本文进度对应的代码仓库:Redis 协议解析器
RESP 协议介绍
Redis 的客户端和服务器之间使用 RESP 协议进行通信。RESP 是 Redis Serialization Protocol 的缩写,即 Redis 序列化协议。
对于我们发送给服务器的指令,服务器会返回一个回复。这个回复可能是状态回复、错误回复、整数回复、多行字符串回复、数组回复等等。
例如一条赋值命令:
SET key value
服务器返回的是一个状态回复:
+OK
而在实际 Redis 工作中,传输的是序列化的内容,上面的命令和回复在传输时是这样的:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
+OK\r\n
你可能现在还不明白这些内容是什么意思,我们接下来会详细介绍 RESP 协议的内容。
数据格式
正常回复
以 +
开头的回复表示状态回复,比如 +OK
。以 \r\n
结尾。
+OK\r\n
表示一个状态回复,内容为 OK
。\r\n
表示回车和换行,是由 Redis 客户端自动添加的,为了方便 Redis 服务器解析。
错误回复
以 -
开头的回复表示错误回复,格式为 -[消息]
。以 \r\n
结尾。
-ERR unknown command 'foobar'\r\n
整数
以 :
开头的回复表示整数回复,格式为 :[数字]
。以 \r\n
结尾。
:1000\r\n
多行字符串
以 $
开头的回复表示多行字符串回复,格式为 $[字符串长度(字节数)]\r\n[字符串]\r\n
。
$6\r\nfoobar\r\n
空字符串:
$0\r\n\r\n
如果我的字符串中含有 \r\n
,使用字符数就可确定哪里才是真正的结束符。
$10\r\nfoo\r\nbar\r\n
数组
以 *
开头的回复表示数组回复,格式为 *[数组长度]\r\n$[字符串长度(字节数)]\r\n[字符串]\r\n$[字符串长度(字节数)]\r\n[字符串]\r\n
。
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
表示一个长度为 2 的数组,包含两个字符串 foo
和 bar
。
本文要实现的就是对 RESP 协议的解析。
实现回复功能
接口定义
首先我们要定义一个接口用于处理连接。
在 interface
中创建一个 resp
包,然后创建 conn.go
文件。
type Connection interface {
Write([]byte) error // 写入数据
GetDBIndex() int // 获取当前连接的数据库索引
SelectDB(int) // 选择数据库
}
然后创建一个接口 Reply
,用于处理回复。
Redis 的回复有多种类型,我们可以使用接口来处理。上面我们说到了有四种回复类型,分别是状态回复、错误回复、整数回复和多行字符串回复,对于不同类型我们的处理方式也不同,因此使用接口来处理。这样我们使用的时候,只需要调用接口的方法即可。
type Reply interface {
ToBytes() []byte // 将回复转换为字节切片
}
实现固定回复的 Reply 接口
在 Redis 中,每个类型都存在固定回复,例如当我们完成一个操作时,返回的是 OK
,这是一个固定回复。我们可以实现一个固定回复的结构体,然后实现 Reply
接口。又比如,Redis 可以接收 PING 命令,返回的是 PONG
,这也是一个固定回复。
那么我们在根目录下创建一个 resp
文件夹,然后创建一个 reply
包,然后创建 consts.go
文件。
以 PONG 回复为例,我们创建一个 Pong
结构体,实现 Reply
接口。(因为我们要实现接口,所以就必须有个结构体,然后实现接口的方法,就算结构体是空的,也可以)
// PongReply 在客户端发送 PING 命令时的回复是固定的 PONG
type PongReply struct{}
// ToBytes 将回复转换为字节数组
func (r *PongReply) ToBytes() []byte {
return []byte("+PONG\r\n")
}
// MakePongReply 创建一个 PONG 回复
// 这里使用了工厂模式,将 pongReply 的构造函数隐藏起来
func MakePongReply() *PongReply {
return &PongReply{}
}
这里实现了一个 PongReply
结构体,实现了 Reply
接口的 ToBytes
方法,返回的是 +PONG\r\n
。
这里也使用了工厂模式,将 PongReply
的构造函数隐藏起来,这样我们在使用的时候,只需要调用 MakePongReply
方法即可。
同理地,实现下面的几个固定回复。
// OKReply 在客户端发送 SET 命令时的回复是固定的 OK
type OKReply struct{}
func (r *OKReply) ToBytes() []byte {
return []byte("+OK\r\n")
}
func MakeOKReply() *OKReply {
return &OKReply{}
}
// NullBulkReply 空的 Bulk 回复(字符串 nil)
type NullBulkReply struct{}
func (r *EmptyBulkReply) ToBytes() []byte {
return []byte("$-1\r\n") // -1,表示 nil 值
}
func MakeNullBulkReply() *NullBulkReply {
return &NullBulkReply{}
}
// EmptyBulkReply 空的 Bulk 回复(空字符串)
type EmptyBulkReply struct{}
func (r *EmptyBulkReply) ToBytes() []byte {
return []byte("$0\r\n\r\n") // 0,表示空字符串
}
func MakeEmptyBulkReply() *EmptyBulkReply {
return &EmptyBulkReply{}
}
// EmptyMultiBulkReply 空的 MultiBulk 回复(空数组)
type EmptyMultiBulkReply struct{}
func (r *EmptyMultiBulkReply) ToBytes() []byte {
return []byte("*0\r\n")
}
func MakeEmptyMultiBulkReply() *EmptyMultiBulkReply {
return &EmptyMultiBulkReply{}
}
// NoReply 无回复
type NoReply struct{}
func (r *NoReply) ToBytes() []byte {
return []byte("")
}
func MakeNoReply() *NoReply {
return &NoReply{}
}
主要需要实现的是:
OKReply
:在客户端发送SET
命令时的回复是固定的OK
。NullBulkReply
:空的 Bulk 回复,Bulk 是多行字符串,-1
表示nil
值。比如GET
命令,如果 key 不存在,就会返回nil
。EmptyBulkReply
:空的 Bulk 回复,0
表示空字符串。比如SET
命令,如果 key 不存在,就会返回空字符串。EmptyMultiBulkReply
:空的 MultiBulk 回复,0
表示空数组。比如LRANGE
命令,如果 key 不存在,就会返回空数组。NoReply
:无回复。
在 Redis 中,Bulk 是多行字符串,MultiBulk 是数组。
异常回复
首先在 reply
包中创建一个 reply.go
文件。
在这里我们定义一个 ErrorReply
接口,这个接口继承了 Reply
接口和系统的 error
接口。
为什么异常回复需要单独定义一个接口而上面的 Const 固定回复不需要呢?因为异常回复是需要返回错误信息的,而固定回复是固定的,不需要返回错误信息。
// ErrorReply 错误回复,实现了 Reply 的 ToBytes 方法,也实现了系统的 error 接口
// 这里使用了接口组合,将 error 接口和 Reply 接口组合在一起
type ErrorReply interface {
Error() string
ToBytes() []byte
}
接下来创建 error.go
文件,实现 ErrorReply
接口。
例如,当用户输入的命令参数数量错误时,我们可以返回一个 ArgNumErrReply
错误回复。
这里我们可以将用户输入的命令传入到结构体中,提示用户使用的哪个命令有问题。
// ArgNumErrReply 参数数量错误回复
type ArgNumErrReply struct {
Cmd string // 提示用户使用的哪个命令有问题
}
func (r *ArgNumErrReply) Error() string {
return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}
func (r *ArgNumErrReply) ToBytes() []byte {
return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n")
}
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
return &ArgNumErrReply{Cmd: cmd}
}
接下来实现其他的异常回复。主要有:
UnknownReply
:未知错误回复。当我们不知道错误是什么时,可以返回这个回复。SyntaxErrReply
:语法错误回复。当用户输入的命令有语法错误时,可以返回这个回复。WrongTypeErrReply
:类型错误回复。当用户对一个错误的数据类型执行操作时,可以返回这个回复。ProtocolErrReply
:协议错误回复。当用户输入的命令有协议错误时,例如对于数组需要*
开头,对于字符串需要$
开头,而用户没有遵守这个规则时,可以返回这个回复。
// UnknownReply 未知错误回复
type UnknownReply struct{}
func (r *UnknownReply) Error() string {
return "Unknown Error"
}
func (r *UnknownReply) ToBytes() []byte {
return []byte("-ERR unknown\r\n")
}
func MakeUnknownReply() *UnknownReply {
return &UnknownReply{}
}
// SyntaxErrReply 语法错误回复
type SyntaxErrReply struct{}
func (r *SyntaxErrReply) Error() string {
return "ERR syntax error"
}
func (r *SyntaxErrReply) ToBytes() []byte {
return []byte("-ERR syntax error\r\n")
}
func MakeSyntaxErrReply() *SyntaxErrReply {
return &SyntaxErrReply{}
}
// WrongTypeErrReply 类型错误回复
type WrongTypeErrReply struct{}
func (r *WrongTypeErrReply) Error() string {
return "WRONG TYPE Operation against a key holding the wrong kind of value"
}
func (r *WrongTypeErrReply) ToBytes() []byte {
return []byte("-WRONG TYPE Operation against a key holding the wrong kind of value\r\n")
}
func MakeWrongTypeErrReply() *WrongTypeErrReply {
return &WrongTypeErrReply{}
}
// ProtocolErrReply 协议错误回复
type ProtocolErrReply struct {
Msg string
}
func (r *ProtocolErrReply) Error() string {
return "PROTOCOL ERROR: " + r.Msg
}
func (r *ProtocolErrReply) ToBytes() []byte {
return []byte("-PROTOCOL ERROR: " + r.Msg + "\r\n")
}
func MakeProtocolErrReply(msg string) *ProtocolErrReply {
return &ProtocolErrReply{Msg: msg}
}
自定义回复
在 Redis 中,我们可以自定义回复,比如我们可以返回一个字符串,一个整数,一个数组等等。
首先在 reply.go
文件中定义一个 BulkReply
结构体,用于处理多行字符串回复。
我们结构体中存储的是我们想要返回的字符串,然后实现 Reply
接口的 ToBytes
方法,将字符串转换为符合 RESP 协议的字节切片。
// BulkReply 字符串回复
type BulkReply struct {
Arg []byte // 回复的内容,此时是不符合 RESP 协议的
}
接下来实现 BulkReply
结构体的 ToBytes
方法。
func (r *BulkReply) ToBytes() []byte {
// 如果字符串为空,返回空字符串
if len(r.Arg) == 0 {
return nullBUlkReplyBytes
}
// 将 BulkReply 转换为符合 RESP 协议的字节数组
return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)
}
func MakeBulkReply(arg []byte) *BulkReply {
return &BulkReply{Arg: arg}
}
接下来实现字符串数组回复。
主要思路是遍历数组,然后将数组中的每个字符串转换为 RESP 协议的字节切片。然后将这些字节切片拼接起来,返回。
// MultiBulkReply 多个字符串回复
type MultiBulkReply struct {
Args [][]byte
}
func (r *MultiBulkReply) ToBytes() []byte {
argLen := len(r.Args)
var buf bytes.Buffer
buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
for _, arg := range r.Args {
if arg == nil {
// *-1\r\n\r\n 表示空数组
buf.WriteString(string(nullBUlkReplyBytes) + CRLF)
} else {
// *3\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$5\r\nhello\r\n
buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
}
}
// 返回的内容是一个字节切片
return buf.Bytes()
}
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
return &MultiBulkReply{Args: args}
}
例如,我们有一个字符串数组 ["foo", "bar", "hello"]
:
经过上述的处理,我们可以得到 RESP 协议的字节切片:
*3\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$5\r\nhello\r\n
接下来分别实现:
StandardErrorReply
:标准错误回复。当我们需要返回一个错误信息时,可以使用这个回复。IntReply
:整数回复。当我们需要返回一个整数时,可以使用这个回复。StatusReply
:状态回复。当我们需要返回一个状态时,可以使用这个回复。
// StandardErrorReply 状态回复(通用错误回复)
type StandardErrorReply struct {
Status string
}
func (r *StandardErrorReply) ToBytes() []byte {
return []byte("-" + r.Status + CRLF)
}
func MakeStandardErrorReply(status string) *StandardErrorReply {
return &StandardErrorReply{Status: status}
}
// IntReply 整数回复
type IntReply struct {
Code int64
}
// ToBytes marshal redis.Reply
func (r *IntReply) ToBytes() []byte {
return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
}
// MakeIntReply creates int reply
func MakeIntReply(code int64) *IntReply {
return &IntReply{
Code: code,
}
}
// StatusReply 状态回复
type StatusReply struct {
Status string
}
// MakeStatusReply creates StatusReply
func MakeStatusReply(status string) *StatusReply {
return &StatusReply{
Status: status,
}
}
另外我们实现一个函数,IsErrReply
,用于判断是否是错误回复。
func IsErrReply(reply resp.Reply) bool {
return reply.ToBytes()[0] == '-'
}
在这里我们规定,如果回复的第一个字符是 -
,那么就是错误回复。这个函数留到后面使用。
到目前为止,我们就实现了固定回复、异常回复、自定义回复。便于我们后续开发 Redis 服务器的时候,可以直接调用这些回复。
解析客户端请求
举例认识解析流程
为了能更好的理解 RESP 协议的解析,我们举三个例子:
解析数组
例如对于 SET
命令:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
对应的是:SET key value
。
我们的解析过程如下:
- 首先读取
*3\r\n
,表示有 3 个参数。 - 然后读取
$3\r\n
,表示第一个参数的长度为 3。 - 然后读取
SET\r\n
,表示第一个参数是SET
。 - 然后读取
$3\r\n
,表示第二个参数的长度为 3。 - 然后读取
key\r\n
,表示第二个参数是key
。 - 然后读取
$5\r\n
,表示第三个参数的长度为 5。 - 然后读取
value\r\n
,表示第三个参数是value
。
解析多行字符串
对于 $11\r\nhello world\r\n
,表示一个长度为 11 的字符串,内容为 hello world
。
- 首先读取
$11\r\n
,表示字符串的长度为 11。 - 然后读取
hello world\r\n
,表示字符串的内容为hello world
。
解析整数
对于 :1000\r\n
,表示一个整数 1000。
- 直接读取
:1000\r\n
,表示整数为 1000。
初始化结构
在 resp
包中创建一个新的包,parser
,用于解析客户端请求。
首先我们创建一个结构用于存储解析后的结果。
type Payload struct {
Data resp.Reply // 客户端发给服务端的和服务端发给客户端的数据使用的是一个结构,因此也能用 Reply 接口
Err error
}
注意这里我们使用了 Reply
接口,因为客户端发给服务端的数据和服务端发给客户端的数据使用的是一个结构,因此也能用 Reply
接口。
接着使用一个 readState
结构体,用于存储解析的状态。
type readState struct {
readingMultiLine bool // 是否正在读取多行数据
expectedArgsCount int // 期望的参数数量
msgType byte // 消息类型
args [][]byte // 参数
bulkLen int64 // Bulk 回复的长度
}
主要需要记录的是:
readingMultiLine
:是否正在读取多行数据。用于处理数组回复*
和多行字符串回复$
。expectedArgsCount
:期望的参数数量。用于处理数组回复*
,读取元素的数量。msgType
:消息类型。用于处理不同类型的回复。args
:已经读取的参数列表,每个参数都是一个参数数组。bulkLen
:Bulk 回复的长度。表示接下来要读取的 Bulk 回复的长度。
我们需要判断是否完成解析,因此我们为 readState 结构体添加一个方法 isDone
。当期望的参数数量大于 0 且已经读取的参数数量等于期望的参数数量时,表示解析完成。
func (r *readState) isDone() bool {
return r.expectedArgsCount > 0 && len(r.args) == r.expectedArgsCount
}
创建解析函数
Redis 中,可能会有大量的命令同时到达,Redis 要求高性能,因此我们要使用异步解析来处理这些命令。并且我们可以采用流式解析,这样就不必等待所有数据都到达再解析。可以大大提高解析效率。
我们创建异步解析流式消息的函数 ParseStream
。
func ParseStream(reader io.Reader) <-chan *Payload {
ch := make(chan *Payload)
go parseIt(reader, ch)
return ch
}
对于每个连接,我们都会使用一个新的协程来解析客户端请求。这样就可以实现异步解析。
接下来我们就需要实现 parseIt
函数。
func parseIt(reader io.Reader, ch chan<- *Payload) {
defer func() {
if err := recover(); err != nil {
// 打印调用栈信息
logger.Error(string(debug.Stack()))
}
}()
// ...
}
这个函数主要接收的是一个 io.Reader
接口和一个 chan<- *Payload
通道。通道可以用于异步发送解析后的结果。
我们首先注册一个 defer
函数,用于捕获异常。如果出现异常,我们可以打印调用栈信息。
接下来对于 reader
中的内容,我们需要实现一个读取行的函数 readLine
。这个函数的作用是读取一行数据,直到遇到 \r\n
为止。
函数需要接收一个 bufio.Reader
和一个 *readState
结构体,返回读取的行数据、是否出现错误和错误信息。主要运作流程如下:
-
如果
bulkLen
为 0,表示读取普通的行,直接读取即可。读取直到遇到\r\n
为止。如果读取的行长度为 0 或者最后第二个字符不是\r
,表示不符合 RESP 协议格式,返回错误。 -
如果
bulkLen
不为 0,表示读取 Bulk 回复。
我们回忆一下,Bulk 回复的格式为 $[字符串长度(字节数)]\r\n[字符串]\r\n
。我们首先读取 $
后面的数字,表示字符串的长度。然后我们需要读取 bulkLen
个字节。读取直到读取的字节长度等于 bulkLen
为止。如果读取的行长度为 0 或者最后两个字符不是 \r\n
,表示不符合 RESP 协议格式,返回错误。
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
var line []byte
var err error
// 读取普通的行
if state.bulkLen == 0 {
line, err = bufReader.ReadBytes('\n')
if err != nil {
// 发生错误
return nil, true, err
}
if len(line) == 0 || line[len(line)-2] != '\r' {
// 不符合 RESP 协议格式
return nil, false, errors.New("invalid line terminator")
}
} else {
// 读取 Bulk 回复
line = make([]byte, state.bulkLen+2) // 2 是 \r\n 的长度
_, err = io.ReadFull(bufReader, line)
if err != nil {
// 发生错误
return nil, true, err
}
if len(line) == 0 || line[len(line)-2] != '\r' || line[len(line)-1] != '\n' {
// 不符合 RESP 协议格式
return nil, false, errors.New("invalid line terminator")
}
state.bulkLen = 0
}
return line, false, nil
}
接下来回到 parseIt
函数中,我们使用 readLine
函数读取一行数据。如果有错误返回,就发送错误回复。
bufReader := bufio.NewReader(reader) // 读取缓冲区
var state readState // 解析器的状态
var err error
var msg []byte
// 读取数据
for {
var ioErr bool // 是否是 IO 错误
msg, ioErr, err = readLine(bufReader, &state)
if err != nil {
// 如果是 IO 错误,关闭通道,退出循环
if ioErr {
ch <- &Payload{Err: err}
close(ch)
return
}
ch <- &Payload{Err: err}
state = readState{} // 重置状态
continue // 继续循环,读取下一行
}
// ...
}
如果不存在错误,我们需要进行下一步的解析。这里有两种情况:
readState.readingMultiLine
为false
(默认),表示我们刚开始读取消息,此时需要判断消息的类型,然后为readState
结构体中其他字段赋值,便于后续处理。readState.readingMultiLine
为true
,表示我们正在读取多行数据,此时需要根据消息的类型,读取多行数据。
在 readState.readingMultiLine
为 false
的情况下,有三种情况。
- 如果消息的第一个字符是
*
,表示数组回复。我们需要先读取消息的头部,解析出数组的长度。然后根据数组的长度,读取数组中的每个元素。 - 如果消息的第一个字符是
$
,表示多行字符串回复。我们需要读取多行字符串的长度,然后读取多行字符串。 - 如果不是上面两种,那么就说明是单行消息,直接解析完返回即可。
if !state.readingMultiLine {
// 多条批量回复
if msg[0] == '*' {
// 解析头部,获取期望的参数数量
err = parseMultiBulkHeader(msg, &state)
if err != nil {
ch <- &Payload{Err: errors.New("Protocol error" + string(msg))}
state = readState{} // 重置状态
continue // 继续循环,读取下一行
}
// 需要的参数数量为 0,直接返回
if state.expectedArgsCount == 0 {
ch <- &Payload{Data: &reply.EmptyMultiBulkReply{}}
state = readState{} // 重置状态
continue // 继续循环,读取下一行
}
} else if msg[0] == '$' {
// Bulk 回复
err = parseBulkHeader(msg, &state) // 解析 Bulk 回复的头部,获取 Bulk 回复的长度
if err != nil {
ch <- &Payload{Err: errors.New("Protocol error" + string(msg))}
state = readState{} // 重置状态
continue // 继续循环,读取下一行
}
if state.bulkLen == -1 {
// Bulk 回复的长度为 0,直接返回
ch <- &Payload{Data: &reply.NullBulkReply{}}
state = readState{} // 重置状态
continue // 继续循环,读取下一行
}
} else {
// 单行回复
result, err := parseSingleLineReply(msg)
ch <- &Payload{Data: result, Err: err}
state = readState{} // 本条消息已结束,重置状态
continue // 继续循环,读取下一行
}
}
parseMultiBulkHeader
函数
接下来我们需要实现这个 parseMultiBulkHeader
函数,用于解析数组回复的头部。
具体格式为 *<number>\r\n
,<number>
表示数组的长度。
在 Redis 中,一个数组的格式往往是多条批量回复:
*3\r\n
$3\r\n
SET\r\n
$3\r\n
key\r\n
$5\r\n
value\r\n
因此我们需要解析出数组的长度,然后根据这个长度,为 readState 结构体中的 msgType
、expectedArgsCount
和 readingMultiLine
和 args
赋值。实现如下:
func parseMultiBulkHeader(msg []byte, state *readState) error {
var err error
var expectedLine uint64
expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if expectedLine == 0 {
state.expectedArgsCount = 0
return nil
} else if expectedLine > 0 {
// 多行读取的
state.msgType = msg[0]
state.readingMultiLine = true
state.expectedArgsCount = int(expectedLine)
state.args = make([][]byte, 0, expectedLine)
return nil
} else {
return errors.New("protocol error: " + string(msg))
}
}
主要的工作流程是:
- 首先我们需要将
msg
转换为字符串,然后解析出数组的长度。 - 如果数组的长度为 0,表示没有参数,直接返回。
- 如果数组的长度大于 0,表示有参数,我们需要为
readState
结构体中的msgType
、expectedArgsCount
和readingMultiLine
和args
赋值。分别赋值为msg[0]
、expectedLine
、true
、0
。
parseBulkHeader
函数
然后再实现 parseBulkHeader
函数,用于解析 Bulk 回复的头部。和上面的 parseMultiBulkHeader
函数类似:
func parseBulkHeader(msg []byte, state *readState) error {
var err error
state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if state.bulkLen == -1 { // null bulk
return nil
} else if state.bulkLen > 0 {
state.msgType = msg[0]
state.readingMultiLine = true
state.expectedArgsCount = 1
state.args = make([][]byte, 0, 1)
return nil
} else {
return errors.New("protocol error: " + string(msg))
}
}
parseSingleLineReply
函数
对于单行的消息,主要有三种类型:
- 状态回复,以
+
开头。 - 错误回复,以
-
开头。 - 整数回复,以
:
开头。
我们只需要根据类型,使用之前创建的 reply
包中的函数构造回复即可。见 实现回复功能。
func parseSingleLineReply(msg []byte) (resp.Reply, error) {
str := strings.TrimSuffix(string(msg), "\r\n")
var result resp.Reply
switch msg[0] {
case '+': // status reply
result = reply.MakeStatusReply(str[1:])
case '-': // err reply
result = reply.MakeStandardErrorReply(str[1:])
case ':': // int reply
val, err := strconv.ParseInt(str[1:], 10, 64)
if err != nil {
return nil, errors.New("protocol error: " + string(msg))
}
result = reply.MakeIntReply(val)
}
return result, nil
}
解析多行数据体
然后让我们回到 parseIt
函数中,我们需要处理 readState.readingMultiLine
为 true
的情况。这表示我们已经读取了消息的头部,接下来需要读取消息的内容。
if !state.readingMultiLine {
// ...
} else {
err = readBody(msg, &state)
if err != nil {
ch <- &Payload{
Err: errors.New("protocol error: " + string(msg)),
}
state = readState{} // reset state
continue
}
// 如果满足 isDone 条件,表示解析完成
// 创建一个回复,发送给客户端
if state.isDone() {
var result resp.Reply
if state.msgType == '*' {
result = reply.MakeMultiBulkReply(state.args)
} else if state.msgType == '$' {
result = reply.MakeBulkReply(state.args[0])
}
ch <- &Payload{
Data: result,
Err: err,
}
state = readState{}
}
}
我们创建一个 readBody
函数,用于读取消息的内容。然后在这个函数中维护 readState
结构体,更新解析的状态。
函数的主要工作流程如下:
- 去除消息的末尾的
\r\n
。 - 检查消息的开头是不是
$
,如果是,表示是 Bulk 回复中的长度指示部分(例如$3\r\n
),我们需要解析出 Bulk 回复的长度。 - 如果不是
$
,表示是 Bulk 回复中的内容部分,我们需要将内容添加到readState
结构体中的args
中。
func readBody(msg []byte, state *readState) error {
line := msg[0 : len(msg)-2]
var err error
if line[0] == '$' {
// 赋值,表示下一行中的 Bulk 回复的长度
state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if state.bulkLen <= 0 { // null bulk in multi bulks
state.args = append(state.args, []byte{})
state.bulkLen = 0
}
} else {
state.args = append(state.args, line)
}
return nil
}
到目前为止,我们就实现了解析客户端请求的功能。
实现 Connection
在上一个章节中,我们实现了一个 TCP 回显服务器,但是这个服务器只能回显客户端发送的数据,我们需要实现一个 Redis 服务器,这个服务器可以接收客户端发送的 Redis 命令,并且返回相应的结果。
在 TCP 和 Redis 的交互中,我们需要实现一个 Connection
结构体,用于处理客户端和服务器之间的连接。
Connection 要有下面的责任:
- 管理客户端链接的生命周期。
- 处理并发写入的同步。
- 支持多数据库切换。
- 确保响应的及时和可靠。
我们在 resp
下创建一个新的包 connection
,在 connection
包中创建一个 connection.go
文件。
Connection 结构体
首先我们创建一个 Connection
结构体,用于管理客户端和服务器之间的连接。
// Connection 表示客户端和服务端的连接
type Connection struct {
conn net.Conn // 底层的网络连接
waitingReply wait.Wait // 等待完成响应的同步器
mu sync.Mutex // 发送响应时的互斥锁
selectedDB int // 选择的数据库的编号
}
conn
:底层的网络连接。这里的网络连接区别于之前实现的 TCP 服务器,TCP 服务器(在server.go
)负责接受连接,然后将连接传递给处理器(Handler,我们接下来会实现),处理器再将原始的net.Conn
封装成Connection
对象进行后续的处理。waitingReply
:等待完成响应的同步器。我们需要等待客户端的请求处理完成后,再返回响应。用这个字段可以保证在连接关闭前,所有的响应都发送完成,防止数据丢失,保证数据的完整性。mu
:发送响应时的互斥锁。因为我们的服务器是并发的,可能会有多个协程同时写入数据,我们需要使用互斥锁来保证数据的一致性。selectDB
:选择的数据库的编号。Redis 支持多个数据库,我们需要记录当前选择的数据库编号。
创建 Connection
我们创建一个 NewConnection
函数,用于创建一个 Connection
对象。
具体是为了创建并初始化一个新的 Connection 实例,接收一个 net.Conn
对象,然后返将其封装成 Connection
对象。
func NewConnection(conn net.Conn) *Connection {
return &Connection{
conn: conn,
}
}
获取客户端的远程地址
我们创建一个 RemoteAddr
方法,用于获取客户端的远程地址。
func (c *Connection) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
这个函数主要用于获取客户端的远程地址,方便我们在日志中打印客户端的地址,用于调试。
Close 方法
我们创建一个 Close
方法,用于关闭连接。
func (c *Connection) Close() error {
c.waitingReply.WaitWithTimeout(10 * time.Second)
_ = c.conn.Close()
return nil
}
和上一章节中实现 TCP 的关闭类似,我们都使用带超时的等待来安全地关闭与客户端的连接。
Write 方法
我们创建一个 Write
方法,用于向客户端写入数据。需要使用互斥锁保证数据的一致性。使用 waitingReply
同步器保证数据的完整性。
func (c *Connection) Write(b []byte) error {
if len(b) == 0 {
return nil
}
c.mu.Lock()
c.waitingReply.Add(1)
defer func() {
c.waitingReply.Done()
c.mu.Unlock()
}()
_, err := c.conn.Write(b)
return err
}
若写入的数据长度为 0,直接返回。然后使用互斥锁加锁,等待完成响应的同步器加 1。然后写入数据。注册一个 defer
函数,用于在函数退出时,减少等待完成响应的同步器。
GetDBIndex 方法和 SelectDB 方法
我们创建一个 GetDBIndex
方法,用于获取当前选择的数据库的编号。
func (c *Connection) GetDBIndex() int {
return c.selectedDB
}
为什么不直接暴露 selectedDB
字段呢?这里一个面向对象的设计原则是:封装。这有利于我们后续的扩展和维护。
接下来我们创建一个 SelectDB
方法,用于选择数据库。
func (c *Connection) SelectDB(db int) {
c.selectedDB = db
}
创建 database 接口和实现
在 interface
包中创建一个 database
包,用于定义数据库接口。
// CmdLine 是 [][]byte 类型的别名,方便使用
type CmdLine = [][]byte
// Database 是数据库接口,定义了数据库的基本操作
type Database interface {
Exec(client resp.Connection, args [][]byte) resp.Reply
AfterClientClose(c resp.Connection)
Close()
}
// DataEntity 将数据封装为 DataEntity 类型
type DataEntity struct {
Data interface{}
}
CmdLine
用于表示 Redis 命令行,使用 [][]byte 是因为 Redis 协议中每个参数都是二进制安全的,例如: SET key value
会被表示为 [][]byte{[]byte("SET"), []byte("key"), []byte("value")}
Exec
方法用于执行 Redis 命令。接受参数为客户端连接和命令行,返回值为响应。AfterClientClose
方法用于在客户端关闭连接后的处理。Close
方法用于关闭数据库。
然后创建了一个 DataEntity
结构体,用于封装数据。因为 interface{}
类型可以存储任意类型的数据,所以我们可以将任意类型的数据封装为 DataEntity
类型。
在这里,我们先不实现具体的数据库,我们先实现一个简单的回显数据库(即客户端发送什么,服务器就返回什么),用于测试。
根目录创建 database
包,然后在 database
包中创建一个 echo_database.go
文件。
package database
type EchoDatabase struct {
}
func NewEchoDatabase() *EchoDatabase {
return &EchoDatabase{}
}
func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
// 按照原样返回
return reply.MakeMultiBulkReply(args)
}
func (e EchoDatabase) AfterClientClose(c resp.Connection) {
logger.Info("EchoDatabase AfterClientClose")
}
func (e EchoDatabase) Close() {
logger.Info("EchoDatabase Close")
}
Handler
处理去需要管理连接,优雅地关闭连接,处理客户端请求,返回响应等等。
需要调用之前实现的 ParserStream
函数,用于解析客户端请求。将命令转发给数据库执行,然后将结果返回给客户端。
处理器的核心是 RespHandler 结构体,它需要维护了三个重要的状态:
- 活跃连接表:使用 sync.Map 存储所有当前活跃的客户端连接
- 数据库实例:通过接口引用实际的数据存储层
- 关闭标志:用原子布尔值标记服务是否正在关闭
var (
unknownErrReplyBytes = []byte("-ERR unknown\r\n")
)
// RespHandler implements tcp.Handler and serves as a redis handler
type RespHandler struct {
activeConn sync.Map // *client -> placeholder
db databaseface.Database
closing atomic.Boolean // refusing new client and new request
}
// MakeHandler creates a RespHandler instance
func MakeHandler() *RespHandler {
var db databaseface.Database
db = database.NewEchoDatabase()
return &RespHandler{
db: db,
}
}
当新的客户端连接到来时,处理器需要:
- 先检查服务是否正在关闭
- 将原始的 TCP 连接包装成 Redis 客户端连接
- 把这个连接存入活跃连接表
- 开始处理这个连接上的命令流
// Handle receives and executes redis commands
func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
if h.closing.Get() {
// closing handler refuse new connection
_ = conn.Close()
}
client := connection.NewConnection(conn)
h.activeConn.Store(client, 1)
ch := parser.ParseStream(conn)
for payload := range ch {
if payload.Err != nil {
if payload.Err == io.EOF ||
payload.Err == io.ErrUnexpectedEOF ||
strings.Contains(payload.Err.Error(), "use of closed network connection") {
// connection closed
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr().String())
return
}
// protocol err
errReply := reply.MakeStandardErrorReply(payload.Err.Error())
err := client.Write(errReply.ToBytes())
if err != nil {
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr().String())
return
}
continue
}
if payload.Data == nil {
logger.Error("empty payload")
continue
}
r, ok := payload.Data.(*reply.MultiBulkReply)
if !ok {
logger.Error("require multi bulk reply")
continue
}
result := h.db.Exec(client, r.Args)
if result != nil {
_ = client.Write(result.ToBytes())
} else {
_ = client.Write(unknownErrReplyBytes)
}
}
}
func (h *RespHandler) closeClient(client *connection.Connection) {
_ = client.Close()
h.db.AfterClientClose(client)
h.activeConn.Delete(client)
}
// Close stops handler
func (h *RespHandler) Close() error {
logger.Info("handler shutting down...")
h.closing.Set(true)
// TODO: concurrent wait
h.activeConn.Range(func(key interface{}, val interface{}) bool {
client := key.(*connection.Connection)
_ = client.Close()
return true
})
h.db.Close()
return nil
}
处理器会不断从连接中读取命令:
- 使用解析器从连接中提取命令
- 检查命令是否合法(包括连接断开、协议错误等)
- 将合法的命令转发给数据库执行
- 把执行结果返回给客户端
如果遇到错误:
- 连接断开:清理连接资源
- 协议错误:向客户端发送错误信息
- 命令执行失败:返回错误响应
当需要关闭服务时:
- 设置关闭标志,拒绝新连接
- 遍历所有活跃连接,关闭它们
- 关闭数据库,完成资源清理
在 handler.go
的 Close 方法中,当前的实现可能存在一个问题:
当服务关闭时,它会遍历所有活跃连接并关闭它们,但是没有等待所有正在处理的命令完成 ,因此可能有些连接正在执行耗时的操作,直接关闭可能导致数据丢失或不一致。所以这里需要改进,留下了一个 TODO。我们可以添加一个并发等待机制,等待所有连接的命令执行完成后再关闭连接。
修改 main.go
在 main.go
中,我们需要修改 main
函数,使用新的 Handler
。
err := tcp.ListenAndServeWithSignal(
&tcp.Config{
Address: fmt.Sprintf("%s:%d",
config.Properties.Bind,
config.Properties.Port),
},
EchoHandler.MakeHandler()) // - 删除这一行
handler.MakeHandler()) // + 添加这一行
if err != nil {
logger.Error(err)
}
测试
执行 go run main.go
启动服务。
创建一个新的终端,执行:
printf "*1\r\n\$4\r\nPING\r\n" | nc localhost 6379
这行命令的含义是:发送一个 PING
命令给 Redis 服务器。
注意这里我们在 $
前面加了一个 \
,这是因为 $
会被当做 shell 中的变量标记符,我们需要转义。
如果一切正常,你会看到服务器返回:
*1
$4
PING
或者我们在 下面的 return
语句前添加断点。
func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
return reply.MakeMultiBulkReply(args) // + 添加断点
}
然后添加一个 Go 语言的调试配置,点击调试按钮。
下面的输出表示开始调试。
再次使用 nc
发送 PING
命令,你会看到程序停在了断点处。
printf "*3\r\n\$3\r\nSET\r\n\$3\r\nkey\r\n\$5\r\nvalue\r\n" | nc localhost 6379
这里我们发送了一个 SET key value
命令。
可以在调试控制台中观察变量 args
的值。
我们点开 “视图” 查看变量的值。
分别查看三个参数的值,可以看到参数的值分别是 SET
、key
和 value
。
这就表示我们成功地解析了客户端的请求。
到目前为止,我们已经实现了对 RESP 协议的解析,可以接收客户端的请求,并且返回响应。
下一步,我们将实现 Redis 的数据存储功能。
提交到 GitHub
git add .
git commit -m "feat: parse RESP protocol"
git push