GO实现Redis的AOF持久化
将用户发来的指令以RESP协议的形式存储在本地的AOF文件,重启Redis后执行此文件恢复数据
https://github.com/csgopher/go-redis
本文涉及以下文件:redis.conf:配置文件
aof:实现aof
redis.conf
appendonly yes
appendfilename appendonly.aof
aof/aof.go
type CmdLine = [][]byte const ( aofQueueSize = 1 << 16 ) type payload struct { cmdLine CmdLine dbIndex int } type AofHandler struct { db databaseface.Database aofChan chan *payload aofFile *os.File aofFilewww.devze.comname string currentDB int } func NewAOFHandler(db databaseface.Database) (*AofHandler, error) { handler := &AofHandler{} handler.aofFilename = config.Properties.AppendFilename handler.db = db handler.LoadAof() aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RdwR, 0600) if err != nil { return nil, err } handler.aofFile = aofFile handler.aofChan = make(chan *payload, aofQueueSize) go func() { handler.handleAof() }() return handler, nil } func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) { if config.Properties.AppendOnly && handler.aofChan != nil { handler.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } } } func (handler *AofHandler) handleAof() { handler.currentDB = 0 for p := range handler.aofChan { if p.dbIndex != handler.currentDB { // select db data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes() _, err := handler.aofFile.Write(data) if 开发者_NewSQLerr !=php nil { logger.Warn(err) continue } handler.currentDB = p.dbIndex } data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes() _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) } } } fun编程c (handler *AofHandler) LoadAof() { file, err := os.Open(handler.aofFilename) if err != nil { logger.Warn(err) return } defer file.Close() ch := parser.ParseStream(file) fakeConn := &connection.Connection{} for p := range ch { if p.Err != nil { if p.Err ==www.devze.com io.EOF { break } logger.Error("parse error: " + p.Err.Error()) continue } if p.Data == nil { logger.Error("empty payload") continue } r, ok := p.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } ret := handler.db.Exec(fakeConn, r.Args) if reply.IsErrorReply(ret) { logger.Error("exec err", err) } } }
- AofHandler:1.从管道中接收数据 2.写入AOF文件
- AddAof:http://www.devze.com用户的指令包装成payload放入管道
- handleAof:将管道中的payload写入磁盘
- LoadAof:重启Redis后加载aof文件
database/database.go
type Database struct { dbSet []*DB aofHandler *aof.AofHandler } func NewDatabase() *Database { mdb := &Database{} if config.Properties.Databases == 0 { config.Properties.Databases = 16 } mdb.dbSet = make([]*DB, config.Properties.Databases) for i := range mdb.dbSet { singleDB := makeDB() singleDB.index = i mdb.dbSet[i] = singleDB } if config.Properties.AppendOnly { aofHandler, err := aof.NewAOFHandler(mdb) if err != nil { panic(err) } mdb.aofHandler = aofHandler for _, db := range mdb.dbSet { singleDB := db singleDB.addAof = func(line CmdLine) { mdb.aofHandler.AddAof(singleDB.index, line) } } } return mdb }
将AOF加入到database里
使用singleDB的原因:因为在循环中获取返回变量的地址都完全相同,因此当我们想要访问数组中元素所在的地址时,不应该直接获取 range 返回的变量地址 db,而应该使用 singleDB := db
database/db.go
type DB struct { index int data dict.Dict addAof func(CmdLine) } func makeDB() *DB { db := &DB{ data: dict.MakeSyncDict(), addAof: func(line CmdLine) {}, } return db }
由于分数据库db引用不到aof,所以添加一个addAof匿名函数,在NewDatabase中用这个匿名函数调用AddAof
database/keys.go
func execDel(db *DB, args [][]byte) resp.Reply { ...... if deleted > 0 { db.addAof(utils.ToCmdLine2("del", args...)) } return reply.MakeIntReply(int64(deleted)) } func execFlushDB(db *DB, args [][]byte) resp.Reply { db.Flush() db.addAof(utils.ToCmdLine2("flushdb", args...)) return &reply.OkReply{} } func execRename(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("rename", args...)) return &reply.OkReply{} } func execRenameNx(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("renamenx", args...)) return reply.MakeIntReply(1) }
database/string.go
func execSet(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("set", args...)) return &reply.OkReply{} } func execSetNX(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("setnx", args...)) return reply.MakeIntReply(int64(result)) } func execGetSet(db *DB, args [][]byte) resp.Reply { key := string(args[0]) value := args[1] entity, exists := db.GetEntity(key) db.PutEntity(key, &database.DataEntity{Data: value}) db.addAof(utils.ToCmdLine2("getset", args...)) ...... }
添加addAof方法
测试命令
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n
到此这篇关于一文详解GO如何实现Redis的AOF持久化的文章就介绍到这了,更多相关GO实现Redis AOF持久化内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!
精彩评论