这篇文章主要介绍“GO怎么实现Redis的AOF持久化”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“GO怎么实现Redis的AOF持久化”文章能帮助大家解决问题。
GO实现Redis的AOF持久化
将用户发来的指令以RESP协议的形式存储在本地的AOF文件,重启Redis后执行此文件恢复数据
本文涉及以下文件:redis.conf:配置文件
aof:实现aof
redis.conf
appendonly yes
appendfilename appendonly.aof
aof/aof.go
type CmdLine = [][]byte const ( aofQueueSize = 1 << 16 ) type payload struct { cmdLine CmdLine dbIndex int } type AofHandler struct { db databaseface.Database aofChan chan *payload aofFile *os.File aofFilename string currentDB int } func NewAOFHandler(db databaseface.Database) (*AofHandler, error) { handler := &AofHandler{} handler.aofFilename = config.Properties.AppendFilename handler.db = db handler.LoadAof() aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err } handler.aofFile = aofFile handler.aofChan = make(chan *payload, aofQueueSize) go func() { handler.handleAof() }() return handler, nil } func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) { if config.Properties.AppendOnly && handler.aofChan != nil { handler.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } } } func (handler *AofHandler) handleAof() { handler.currentDB = 0 for p := range handler.aofChan { if p.dbIndex != handler.currentDB { // select db data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes() _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) continue } handler.currentDB = p.dbIndex } data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes() _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) } } } func (handler *AofHandler) LoadAof() { file, err := os.Open(handler.aofFilename) if err != nil { logger.Warn(err) return } defer file.Close() ch := parser.ParseStream(file) fakeConn := &connection.Connection{} for p := range ch { if p.Err != nil { if p.Err == io.EOF { break } logger.Error("parse error: " + p.Err.Error()) continue } if p.Data == nil { logger.Error("empty payload") continue } r, ok := p.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } ret := handler.db.Exec(fakeConn, r.Args) if reply.IsErrorReply(ret) { logger.Error("exec err", err) } } }
AofHandler:1.从管道中接收数据 2.写入AOF文件
AddAof:用户的指令包装成payload放入管道
handleAof:将管道中的payload写入磁盘
LoadAof:重启Redis后加载aof文件
database/database.go
type Database struct { dbSet []*DB aofHandler *aof.AofHandler } func NewDatabase() *Database { mdb := &Database{} if config.Properties.Databases == 0 { config.Properties.Databases = 16 } mdb.dbSet = make([]*DB, config.Properties.Databases) for i := range mdb.dbSet { singleDB := makeDB() singleDB.index = i mdb.dbSet[i] = singleDB } if config.Properties.AppendOnly { aofHandler, err := aof.NewAOFHandler(mdb) if err != nil { panic(err) } mdb.aofHandler = aofHandler for _, db := range mdb.dbSet { singleDB := db singleDB.addAof = func(line CmdLine) { mdb.aofHandler.AddAof(singleDB.index, line) } } } return mdb }
将AOF加入到database里
使用singleDB的原因:因为在循环中获取返回变量的地址都完全相同,因此当我们想要访问数组中元素所在的地址时,不应该直接获取 range 返回的变量地址 db,而应该使用 singleDB := db
database/db.go
type DB struct { index int data dict.Dict addAof func(CmdLine) } func makeDB() *DB { db := &DB{ data: dict.MakeSyncDict(), addAof: func(line CmdLine) {}, } return db }
由于分数据库db引用不到aof,所以添加一个addAof匿名函数,在NewDatabase中用这个匿名函数调用AddAof
database/keys.go
func execDel(db *DB, args [][]byte) resp.Reply { ...... if deleted > 0 { db.addAof(utils.ToCmdLine2("del", args...)) } return reply.MakeIntReply(int64(deleted)) } func execFlushDB(db *DB, args [][]byte) resp.Reply { db.Flush() db.addAof(utils.ToCmdLine2("flushdb", args...)) return &reply.OkReply{} } func execRename(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("rename", args...)) return &reply.OkReply{} } func execRenameNx(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("renamenx", args...)) return reply.MakeIntReply(1) }
database/string.go
func execSet(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("set", args...)) return &reply.OkReply{} } func execSetNX(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("setnx", args...)) return reply.MakeIntReply(int64(result)) } func execGetSet(db *DB, args [][]byte) resp.Reply { key := string(args[0]) value := args[1] entity, exists := db.GetEntity(key) db.PutEntity(key, &database.DataEntity{Data: value}) db.addAof(utils.ToCmdLine2("getset", args...)) ...... }
添加addAof方法
测试命令
*3
$3
SET
$3
key
$5
value
*2
$3
GET
$3
key
*2
$6
SELECT
$1
1