leveldb
使用
// The returned DB instance is safe for concurrent use. Which mean that all
// DB's methods may be called concurrently from multiple goroutine.
db, err := leveldb.OpenFile("path/to/db", nil)
...
defer db.Close()
...
// Remember that the contents of the returned slice should not be modified.
data, err := db.Get([]byte("key"), nil)
...
err = db.Put([]byte("key"), []byte("value"), nil)
...
err = db.Delete([]byte("key"), nil)
...
源码分析
db, err := leveldb.OpenFile("path/to/db", nil)
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path, o.GetReadOnly()) //构建 storage
if err != nil {
return
}
db, err = Open(stor, o) //构建db,创建合并goruting
if err != nil {
stor.Close()
} else {
db.closer = stor
}
return
}
// OpenFile returns a new filesystem-backed storage implementation with the given
// path. This also acquire a file lock, so any subsequent attempt to open the
// same path will fail.
//
// The storage must be closed after use, by calling Close method.
func OpenFile(path string, readOnly bool) (Storage, error) {
if fi, err := os.Stat(path); err == nil {
if !fi.IsDir() {
return nil, fmt.Errorf("leveldb/storage: open %s: not a directory", path)
}
} else if os.IsNotExist(err) && !readOnly {
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
} else {
return nil, err
}
flock, err := newFileLock(filepath.Join(path, "LOCK"), readOnly) // 创建 LOCK 文件
if err != nil {
return nil, err
}
defer func() {
if err != nil {
flock.release()
}
}()
var (
logw *os.File
logSize int64
)
if !readOnly {
logw, err = os.OpenFile(filepath.Join(path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644) // 创建 LOG 文件
if err != nil {
return nil, err
}
logSize, err = logw.Seek(0, os.SEEK_END) // 可能追加
if err != nil {
logw.Close()
return nil, err
}
}
fs := &fileStorage{
path: path,
readOnly: readOnly,
flock: flock,
logw: logw,
logSize: logSize,
}
runtime.SetFinalizer(fs, (*fileStorage).Close) // 回收时释放
return fs, nil
}
func newFileLock(path string, readOnly bool) (fl fileLock, err error) {
pathp, err := syscall.UTF16PtrFromString(path)
if err != nil {
return
}
var access, shareMode uint32
if readOnly {
access = syscall.GENERIC_READ
shareMode = syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE
} else {
access = syscall.GENERIC_READ | syscall.GENERIC_WRITE
}
fd, err := syscall.CreateFile(pathp, access, shareMode, nil, syscall.OPEN_EXISTING, syscall.FILE_ATTRIBUTE_NORMAL, 0)
if err == syscall.ERROR_FILE_NOT_FOUND {
fd, err = syscall.CreateFile(pathp, access, shareMode, nil, syscall.OPEN_ALWAYS, syscall.FILE_ATTRIBUTE_NORMAL, 0)
}
if err != nil {
return
}
fl = &windowsFileLock{fd: fd}
return
}
func openDB(s *session) (*DB, error) {
s.log("db@open opening")
start := time.Now()
db := &DB{
s: s,
// Initial sequence
seq: s.stSeqNum,
// MemDB
memPool: make(chan *memdb.DB, 1),
// Snapshot
snapsList: list.New(),
// Write
batchPool: sync.Pool{New: newBatch},
writeMergeC: make(chan writeMerge),
writeMergedC: make(chan bool),
writeLockC: make(chan struct{}, 1),
writeAckC: make(chan error),
// Compaction
tcompCmdC: make(chan cCmd),
tcompPauseC: make(chan chan<- struct{}),
mcompCmdC: make(chan cCmd),
compErrC: make(chan error),
compPerErrC: make(chan error),
compErrSetC: make(chan error),
// Close
closeC: make(chan struct{}),
}
// Read-only mode.
readOnly := s.o.GetReadOnly()
if readOnly {
// Recover journals (read-only mode).
if err := db.recoverJournalRO(); err != nil {
return nil, err
}
} else {
// Recover journals.
if err := db.recoverJournal(); err != nil {
return nil, err
}
// Remove any obsolete files.
if err := db.checkAndCleanFiles(); err != nil {
// Close journal.
if db.journal != nil {
db.journal.Close()
db.journalWriter.Close()
}
return nil, err
}
}
// Doesn't need to be included in the wait group.
go db.compactionError()
go db.mpoolDrain()
if readOnly {
db.SetReadOnly()
} else {
db.closeW.Add(2)
go db.tCompaction() //启动一个合并goruting,db put的时候进行tCompaction操作
go db.mCompaction() //启动一个合并goruting,db put的时候进行mCompaction操作
// go db.jWriter()
}
s.logf("db@open done T·%v", time.Since(start))
runtime.SetFinalizer(db, (*DB).Close)
return db, nil
}
// Put sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map. Write merge also applies for Put, see
// Write.
//
// It is safe to modify the contents of the arguments after Put returns but not
// before.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
return db.putRec(keyTypeVal, key, value, wo)
}
// 添加接受
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
if err := db.ok(); err != nil { //判断数据库是否关闭
return err
}
merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
sync := wo.GetSync() && !db.s.o.GetNoSync()
// Acquire write lock.
if merge {
select {
case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
if <-db.writeMergedC {
// Write is merged.
return <-db.writeAckC
}
// Write is not merged, the write lock is handed to us. Continue.
case db.writeLockC <- struct{}{}:
// Write lock acquired.
case err := <-db.compPerErrC:
// Compaction error.
return err
case <-db.closeC:
// Closed
return ErrClosed
}
} else {
select {
case db.writeLockC <- struct{}{}:
// Write lock acquired.
case err := <-db.compPerErrC:
// Compaction error.
return err
case <-db.closeC:
// Closed
return ErrClosed
}
}
batch := db.batchPool.Get().(*Batch) //获取复用的batch
batch.Reset() // 重置
batch.appendRec(kt, key, value) // 添加到batch
return db.writeLocked(batch, batch, merge, sync) //锁写入
}
func (b *Batch) appendRec(kt keyType, key, value []byte) {
n := 1 + binary.MaxVarintLen32 + len(key)
if kt == keyTypeVal {
n += binary.MaxVarintLen32 + len(value)
}
b.grow(n) //增加batch data长度
index := batchIndex{keyType: kt}
o := len(b.data)
data := b.data[:o+n]
data[o] = byte(kt) // 类型
o++
o += binary.PutUvarint(data[o:], uint64(len(key))) //key 长度
index.keyPos = o
index.keyLen = len(key)
o += copy(data[o:], key) // key 值
if kt == keyTypeVal {
o += binary.PutUvarint(data[o:], uint64(len(value))) //value 长度
index.valuePos = o
index.valueLen = len(value)
o += copy(data[o:], value) // value 值
}
b.data = data[:o]
b.index = append(b.index, index)
b.internalLen += index.keyLen + index.valueLen + 8
}
func (b *Batch) grow(n int) {
o := len(b.data)
if cap(b.data)-o < n {
div := 1
if len(b.index) > batchGrowRec {
div = len(b.index) / batchGrowRec
}
ndata := make([]byte, o, o+n+o/div)
copy(ndata, b.data)
b.data = ndata
}
}
// ourBatch is batch that we can modify.
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
// Try to flush memdb. This method would also trying to throttle writes
// if it is too fast and compaction cannot catch-up.
mdb, mdbFree, err := db.flush(batch.internalLen) //
if err != nil {
db.unlockWrite(false, 0, err)
return err
}
defer mdb.decref()
var (
overflow bool
merged int
batches = []*Batch{batch}
)
if merge {
// Merge limit.
var mergeLimit int
if batch.internalLen > 128<<10 {
mergeLimit = (1 << 20) - batch.internalLen
} else {
mergeLimit = 128 << 10
}
mergeCap := mdbFree - batch.internalLen
if mergeLimit > mergeCap {
mergeLimit = mergeCap
}
merge:
for mergeLimit > 0 {
select {
case incoming := <-db.writeMergeC:
if incoming.batch != nil {
// Merge batch.
if incoming.batch.internalLen > mergeLimit {
overflow = true
break merge
}
batches = append(batches, incoming.batch)
mergeLimit -= incoming.batch.internalLen
} else {
// Merge put.
internalLen := len(incoming.key) + len(incoming.value) + 8
if internalLen > mergeLimit {
overflow = true
break merge
}
if ourBatch == nil {
ourBatch = db.batchPool.Get().(*Batch)
ourBatch.Reset()
batches = append(batches, ourBatch)
}
// We can use same batch since concurrent write doesn't
// guarantee write order.
ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
mergeLimit -= internalLen
}
sync = sync || incoming.sync
merged++
db.writeMergedC <- true
default:
break merge
}
}
}
// Release ourBatch if any.
if ourBatch != nil {
defer db.batchPool.Put(ourBatch)
}
// Seq number.
seq := db.seq + 1
// Write journal.
if err := db.writeJournal(batches, seq, sync); err != nil {
db.unlockWrite(overflow, merged, err)
return err
}
// Put batches.
for _, batch := range batches {
if err := batch.putMem(seq, mdb.DB); err != nil {
panic(err)
}
seq += uint64(batch.Len())
}
// Incr seq number.
db.addSeq(uint64(batchesLen(batches)))
// Rotate memdb if it's reach the threshold.
if batch.internalLen >= mdbFree {
db.rotateMem(0, false)
}
db.unlockWrite(overflow, merged, nil)
return nil
}
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
delayed := false
slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
flush := func() (retry bool) {
mdb = db.getEffectiveMem()
if mdb == nil {
err = ErrClosed
return false
}
defer func() {
if retry {
mdb.decref()
mdb = nil
}
}()
tLen := db.s.tLen(0)
mdbFree = mdb.Free()
switch {
case tLen >= slowdownTrigger && !delayed:
delayed = true
time.Sleep(time.Millisecond)
case mdbFree >= n:
return false
case tLen >= pauseTrigger:
delayed = true
// Set the write paused flag explicitly.
atomic.StoreInt32(&db.inWritePaused, 1)
err = db.compTriggerWait(db.tcompCmdC)
// Unset the write paused flag.
atomic.StoreInt32(&db.inWritePaused, 0)
if err != nil {
return false
}
default:
// Allow memdb to grow if it has no entry.
if mdb.Len() == 0 {
mdbFree = n
} else {
mdb.decref()
mdb, err = db.rotateMem(n, false)
if err == nil {
mdbFree = mdb.Free()
} else {
mdbFree = 0
}
}
return false
}
return true
}
start := time.Now()
for flush() {
}
if delayed {
db.writeDelay += time.Since(start)
db.writeDelayN++
} else if db.writeDelayN > 0 {
db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
atomic.AddInt32(&db.cWriteDelayN, int32(db.writeDelayN))
atomic.AddInt64(&db.cWriteDelay, int64(db.writeDelay))
db.writeDelay = 0
db.writeDelayN = 0
}
return
}
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
retryLimit := 3
retry:
// Wait for pending memdb compaction.
err = db.compTriggerWait(db.mcompCmdC)
if err != nil {
return
}
retryLimit--
// Create new memdb and journal.
mem, err = db.newMem(n)
if err != nil {
if err == errHasFrozenMem {
if retryLimit <= 0 {
panic("BUG: still has frozen memdb")
}
goto retry
}
return
}
// Schedule memdb compaction.
if wait {
err = db.compTriggerWait(db.mcompCmdC)
} else {
db.compTrigger(db.mcompCmdC)
}
return
}
func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
wr, err := db.journal.Next()
if err != nil {
return err
}
if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
return err
}
if err := db.journal.Flush(); err != nil {
return err
}
if sync {
return db.journalWriter.Sync()
}
return nil
}
func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
var ik []byte
for i, index := range b.index {
ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
if err := mdb.Put(ik, index.v(b.data)); err != nil {
return err
}
}
return nil
}
func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
if auxm != nil {
if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
return append([]byte{}, mv...), me
}
}
em, fm := db.getMems()
for _, m := range [...]*memDB{em, fm} { // 读取memtable
if m == nil {
continue
}
defer m.decref()
if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
return append([]byte{}, mv...), me
}
}
v := db.s.version()
value, cSched, err := v.get(auxt, ikey, ro, false) // 读取每一层table
v.release()
if cSched {
// Trigger table compaction.
db.compTrigger(db.tcompCmdC)
}
return
}
func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
if v.closing {
return nil, false, ErrClosed
}
ukey := ikey.ukey()
sampleSeeks := !v.s.o.GetDisableSeeksCompaction()
var (
tset *tSet
tseek bool
// Level-0.
zfound bool
zseq uint64
zkt keyType
zval []byte
)
err = ErrNotFound
// Since entries never hop across level, finding key/value
// in smaller level make later levels irrelevant.
v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool {
if sampleSeeks && level >= 0 && !tseek {
if tset == nil {
tset = &tSet{level, t}
} else {
tseek = true
}
}
var (
fikey, fval []byte
ferr error
)
if noValue {
fikey, ferr = v.s.tops.findKey(t, ikey, ro)
} else {
fikey, fval, ferr = v.s.tops.find(t, ikey, ro)
}
switch ferr {
case nil:
case ErrNotFound:
return true
default:
err = ferr
return false
}
if fukey, fseq, fkt, fkerr := parseInternalKey(fikey); fkerr == nil {
if v.s.icmp.uCompare(ukey, fukey) == 0 {
// Level <= 0 may overlaps each-other.
if level <= 0 {
if fseq >= zseq {
zfound = true
zseq = fseq
zkt = fkt
zval = fval
}
} else {
switch fkt {
case keyTypeVal:
value = fval
err = nil
case keyTypeDel:
default:
panic("leveldb: invalid internalKey type")
}
return false
}
}
} else {
err = fkerr
return false
}
return true
}, func(level int) bool {
if zfound {
switch zkt {
case keyTypeVal:
value = zval
err = nil
case keyTypeDel:
default:
panic("leveldb: invalid internalKey type")
}
return false
}
return true
})
if tseek && tset.table.consumeSeek() <= 0 {
tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
}
return
}