  • leveldb Study Notes

    leveldb

    Usage

    // The returned DB instance is safe for concurrent use, which means that all
    // of the DB's methods may be called concurrently from multiple goroutines.
    db, err := leveldb.OpenFile("path/to/db", nil)
    ...
    defer db.Close()
    ...
    
    // Remember that the contents of the returned slice should not be modified.
    data, err := db.Get([]byte("key"), nil)
    ...
    err = db.Put([]byte("key"), []byte("value"), nil)
    ...
    err = db.Delete([]byte("key"), nil)
    ...
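
    Putting the snippets above together, a minimal self-contained program might look like the following (the path and keys are made up for illustration; the package is github.com/syndtr/goleveldb/leveldb, which the rest of these notes walk through):

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/syndtr/goleveldb/leveldb"
    )

    func main() {
    	// Open (or create) the database directory.
    	db, err := leveldb.OpenFile("db-example", nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	// Write, read back and delete a single key.
    	if err := db.Put([]byte("key"), []byte("value"), nil); err != nil {
    		log.Fatal(err)
    	}
    	data, err := db.Get([]byte("key"), nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    	fmt.Printf("key = %s\n", data)

    	if err := db.Delete([]byte("key"), nil); err != nil {
    		log.Fatal(err)
    	}
    }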
    

    Source Code Analysis

    
    db, err := leveldb.OpenFile("path/to/db", nil)
    
    func OpenFile(path string, o *opt.Options) (db *DB, err error) {
    	stor, err := storage.OpenFile(path, o.GetReadOnly()) // open the filesystem-backed storage
    	if err != nil {
    		return
    	}
    	db, err = Open(stor, o) // build the DB and start the compaction goroutines
    	if err != nil {
    		stor.Close()
    	} else {
    		db.closer = stor
    	}
    	return
    }
    
    
    // OpenFile returns a new filesystem-backed storage implementation with the given
    // path. This also acquires a file lock, so any subsequent attempt to open the
    // same path will fail.
    //
    // The storage must be closed after use, by calling the Close method.
    func OpenFile(path string, readOnly bool) (Storage, error) {
    	if fi, err := os.Stat(path); err == nil {
    		if !fi.IsDir() {
    			return nil, fmt.Errorf("leveldb/storage: open %s: not a directory", path)
    		}
    	} else if os.IsNotExist(err) && !readOnly {
    		if err := os.MkdirAll(path, 0755); err != nil {
    			return nil, err
    		}
    	} else {
    		return nil, err
    	}
    
    	flock, err := newFileLock(filepath.Join(path, "LOCK"), readOnly) // create/open the LOCK file
    	if err != nil {
    		return nil, err
    	}
    
    	defer func() {
    		if err != nil {
    			flock.release()
    		}
    	}()
    
    	var (
    		logw    *os.File
    		logSize int64
    	)
    	if !readOnly {
    		logw, err = os.OpenFile(filepath.Join(path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644) // create the LOG file
    		if err != nil {
    			return nil, err
    		}
    		logSize, err = logw.Seek(0, os.SEEK_END) // seek to the end so new log entries are appended
    		if err != nil {
    			logw.Close()
    			return nil, err
    		}
    	}
    
    	fs := &fileStorage{
    		path:     path,
    		readOnly: readOnly,
    		flock:    flock,
    		logw:     logw,
    		logSize:  logSize,
    	}
    	runtime.SetFinalizer(fs, (*fileStorage).Close) // release the lock when the storage is garbage-collected
    	return fs, nil
    }
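
    Because OpenFile takes the LOCK file, a second attempt to open the same path while the first handle is still open should fail. A small sketch of that behaviour (the exact error text is platform-dependent):

    db1, err := leveldb.OpenFile("db-example", nil)
    if err != nil {
    	log.Fatal(err)
    }
    defer db1.Close()

    // The LOCK file is still held by db1, so this open is expected to fail.
    if db2, err := leveldb.OpenFile("db-example", nil); err != nil {
    	fmt.Println("second open failed:", err)
    } else {
    	db2.Close()
    }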
    
    
    func newFileLock(path string, readOnly bool) (fl fileLock, err error) {
    	pathp, err := syscall.UTF16PtrFromString(path)
    	if err != nil {
    		return
    	}
    	var access, shareMode uint32
    	if readOnly {
    		access = syscall.GENERIC_READ
    		shareMode = syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE
    	} else {
    		access = syscall.GENERIC_READ | syscall.GENERIC_WRITE
    	}
    	fd, err := syscall.CreateFile(pathp, access, shareMode, nil, syscall.OPEN_EXISTING, syscall.FILE_ATTRIBUTE_NORMAL, 0)
    	if err == syscall.ERROR_FILE_NOT_FOUND {
    		fd, err = syscall.CreateFile(pathp, access, shareMode, nil, syscall.OPEN_ALWAYS, syscall.FILE_ATTRIBUTE_NORMAL, 0)
    	}
    	if err != nil {
    		return
    	}
    	fl = &windowsFileLock{fd: fd}
    	return
    }
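
    The listing above is the Windows implementation (CreateFile with share modes). On POSIX systems the library uses a different build file and advisory locking; as a rough illustration of the same idea (a simplified sketch assuming imports of os and syscall, not the library's exact code), an flock-based lock could look like:

    // lockFile is a simplified, hypothetical POSIX counterpart of newFileLock.
    func lockFile(path string, readOnly bool) (*os.File, error) {
    	flag := os.O_RDWR | os.O_CREATE
    	if readOnly {
    		flag = os.O_RDONLY
    	}
    	f, err := os.OpenFile(path, flag, 0644)
    	if err != nil {
    		return nil, err
    	}
    	how := syscall.LOCK_EX // exclusive lock for read-write opens
    	if readOnly {
    		how = syscall.LOCK_SH // shared lock for read-only opens
    	}
    	if err := syscall.Flock(int(f.Fd()), how|syscall.LOCK_NB); err != nil {
    		f.Close()
    		return nil, err
    	}
    	return f, nil
    }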
    
    
    func openDB(s *session) (*DB, error) {
    	s.log("db@open opening")
    	start := time.Now()
    	db := &DB{
    		s: s,
    		// Initial sequence
    		seq: s.stSeqNum,
    		// MemDB
    		memPool: make(chan *memdb.DB, 1),
    		// Snapshot
    		snapsList: list.New(),
    		// Write
    		batchPool:    sync.Pool{New: newBatch},
    		writeMergeC:  make(chan writeMerge),
    		writeMergedC: make(chan bool),
    		writeLockC:   make(chan struct{}, 1),
    		writeAckC:    make(chan error),
    		// Compaction
    		tcompCmdC:   make(chan cCmd),
    		tcompPauseC: make(chan chan<- struct{}),
    		mcompCmdC:   make(chan cCmd),
    		compErrC:    make(chan error),
    		compPerErrC: make(chan error),
    		compErrSetC: make(chan error),
    		// Close
    		closeC: make(chan struct{}),
    	}
    
    	// Read-only mode.
    	readOnly := s.o.GetReadOnly()
    
    	if readOnly {
    		// Recover journals (read-only mode).
    		if err := db.recoverJournalRO(); err != nil {
    			return nil, err
    		}
    	} else {
    		// Recover journals.
    		if err := db.recoverJournal(); err != nil {
    			return nil, err
    		}
    
    		// Remove any obsolete files.
    		if err := db.checkAndCleanFiles(); err != nil {
    			// Close journal.
    			if db.journal != nil {
    				db.journal.Close()
    				db.journalWriter.Close()
    			}
    			return nil, err
    		}
    
    	}
    
    	// Doesn't need to be included in the wait group.
    	go db.compactionError()
    	go db.mpoolDrain()
    
    	if readOnly {
    		db.SetReadOnly()
    	} else {
    		db.closeW.Add(2)
    		go db.tCompaction() // start the table-compaction goroutine; puts may trigger table compaction
    		go db.mCompaction() // start the memdb-compaction goroutine; puts may trigger memdb compaction
    		// go db.jWriter()
    	}
    
    	s.logf("db@open done T·%v", time.Since(start))
    
    	runtime.SetFinalizer(db, (*DB).Close)
    	return db, nil
    }
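
    openDB is reached through Open/OpenFile, and most of what it does (read-only mode, recovery, the compaction goroutines) is steered by opt.Options. A sketch of opening with explicit options (the field values here are only illustrative):

    // openWithOptions shows how the fields read in openDB map onto opt.Options.
    // Assumes: import "github.com/syndtr/goleveldb/leveldb/opt".
    func openWithOptions(path string) (*leveldb.DB, error) {
    	o := &opt.Options{
    		ReadOnly:       false,       // s.o.GetReadOnly() above
    		ErrorIfMissing: true,        // fail instead of creating a new DB
    		WriteBuffer:    8 * opt.MiB, // memdb size before rotation
    	}
    	return leveldb.OpenFile(path, o)
    }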
    
    // Put sets the value for the given key. It overwrites any previous value
    // for that key; a DB is not a multi-map. Write merge also applies for Put, see
    // Write.
    //
    // It is safe to modify the contents of the arguments after Put returns but not
    // before.
    func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
    	return db.putRec(keyTypeVal, key, value, wo)
    }
    
    
    // putRec appends a single record (put or delete) to the DB.
    func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
    	if err := db.ok(); err != nil { // check whether the DB has been closed
    		return err
    	}
    
    	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
    	sync := wo.GetSync() && !db.s.o.GetNoSync()
    
    	// Acquire write lock.
    	if merge {
    		select {
    		case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
    			if <-db.writeMergedC {
    				// Write is merged.
    				return <-db.writeAckC
    			}
    			// Write is not merged, the write lock is handed to us. Continue.
    		case db.writeLockC <- struct{}{}:
    			// Write lock acquired.
    		case err := <-db.compPerErrC:
    			// Compaction error.
    			return err
    		case <-db.closeC:
    			// Closed
    			return ErrClosed
    		}
    	} else {
    		select {
    		case db.writeLockC <- struct{}{}:
    			// Write lock acquired.
    		case err := <-db.compPerErrC:
    			// Compaction error.
    			return err
    		case <-db.closeC:
    			// Closed
    			return ErrClosed
    		}
    	}
    
    	batch := db.batchPool.Get().(*Batch)             // get a reusable batch from the pool
    	batch.Reset()                                    // reset it
    	batch.appendRec(kt, key, value)                  // append the record to the batch
    	return db.writeLocked(batch, batch, merge, sync) // write while holding the write lock
    }
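
    The merge branch is what makes concurrent Puts cheap: a writer that arrives while another writer holds writeLockC hands its record over on writeMergeC and waits on writeAckC instead of writing the journal itself. Per-call merging and syncing are controlled through opt.WriteOptions, for example:

    wo := &opt.WriteOptions{
    	Sync:         true,  // sync the journal before acknowledging the write
    	NoWriteMerge: false, // allow this Put to be merged into another writer's batch
    }
    if err := db.Put([]byte("key"), []byte("value"), wo); err != nil {
    	log.Fatal(err)
    }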
    
    func (b *Batch) appendRec(kt keyType, key, value []byte) {
    	n := 1 + binary.MaxVarintLen32 + len(key)
    	if kt == keyTypeVal {
    		n += binary.MaxVarintLen32 + len(value)
    	}
    	b.grow(n) // grow the batch's data buffer
    	index := batchIndex{keyType: kt}
    	o := len(b.data)
    	data := b.data[:o+n]
    	data[o] = byte(kt) // record type
    	o++
    	o += binary.PutUvarint(data[o:], uint64(len(key))) // key length
    	index.keyPos = o
    	index.keyLen = len(key)
    	o += copy(data[o:], key) // key bytes
    	if kt == keyTypeVal {
    		o += binary.PutUvarint(data[o:], uint64(len(value))) // value length
    		index.valuePos = o
    		index.valueLen = len(value)
    		o += copy(data[o:], value) // value bytes
    	}
    	b.data = data[:o]
    	b.index = append(b.index, index)
    	b.internalLen += index.keyLen + index.valueLen + 8
    }
    
    func (b *Batch) grow(n int) {
    	o := len(b.data)
    	if cap(b.data)-o < n {
    		div := 1
    		if len(b.index) > batchGrowRec {
    			div = len(b.index) / batchGrowRec
    		}
    		ndata := make([]byte, o, o+n+o/div)
    		copy(ndata, b.data)
    		b.data = ndata
    	}
    }
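
    appendRec and grow also back the public Batch type: a Batch accumulates records in this encoded form and is applied atomically with DB.Write, for example:

    batch := new(leveldb.Batch)
    batch.Put([]byte("foo"), []byte("value foo"))
    batch.Put([]byte("bar"), []byte("value bar"))
    batch.Delete([]byte("old-key"))
    if err := db.Write(batch, nil); err != nil {
    	log.Fatal(err)
    }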
    
    
    // ourBatch is a batch that we can modify.
    func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
    	// Try to flush memdb. This method may also throttle writes if they are
    	// too fast and compaction cannot catch up.
    	mdb, mdbFree, err := db.flush(batch.internalLen) // flush/rotate the memdb and get its free space
    	if err != nil {
    		db.unlockWrite(false, 0, err)
    		return err
    	}
    	defer mdb.decref()
    
    	var (
    		overflow bool
    		merged   int
    		batches  = []*Batch{batch}
    	)
    
    	if merge {
    		// Merge limit.
    		var mergeLimit int
    		if batch.internalLen > 128<<10 {
    			mergeLimit = (1 << 20) - batch.internalLen
    		} else {
    			mergeLimit = 128 << 10
    		}
    		mergeCap := mdbFree - batch.internalLen
    		if mergeLimit > mergeCap {
    			mergeLimit = mergeCap
    		}
    
    	merge:
    		for mergeLimit > 0 {
    			select {
    			case incoming := <-db.writeMergeC:
    				if incoming.batch != nil {
    					// Merge batch.
    					if incoming.batch.internalLen > mergeLimit {
    						overflow = true
    						break merge
    					}
    					batches = append(batches, incoming.batch)
    					mergeLimit -= incoming.batch.internalLen
    				} else {
    					// Merge put.
    					internalLen := len(incoming.key) + len(incoming.value) + 8
    					if internalLen > mergeLimit {
    						overflow = true
    						break merge
    					}
    					if ourBatch == nil {
    						ourBatch = db.batchPool.Get().(*Batch)
    						ourBatch.Reset()
    						batches = append(batches, ourBatch)
    					}
    					// We can use the same batch since concurrent writes don't
    					// guarantee write order.
    					ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
    					mergeLimit -= internalLen
    				}
    				sync = sync || incoming.sync
    				merged++
    				db.writeMergedC <- true
    
    			default:
    				break merge
    			}
    		}
    	}
    
    	// Release ourBatch if any.
    	if ourBatch != nil {
    		defer db.batchPool.Put(ourBatch)
    	}
    
    	// Seq number.
    	seq := db.seq + 1
    
    	// Write journal.
    	if err := db.writeJournal(batches, seq, sync); err != nil {
    		db.unlockWrite(overflow, merged, err)
    		return err
    	}
    
    	// Put batches.
    	for _, batch := range batches {
    		if err := batch.putMem(seq, mdb.DB); err != nil {
    			panic(err)
    		}
    		seq += uint64(batch.Len())
    	}
    
    	// Incr seq number.
    	db.addSeq(uint64(batchesLen(batches)))
    
    	// Rotate the memdb if it has reached the threshold.
    	if batch.internalLen >= mdbFree {
    		db.rotateMem(0, false)
    	}
    
    	db.unlockWrite(overflow, merged, nil)
    	return nil
    }
    
    
    func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
    	delayed := false
    	slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
    	pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
    	flush := func() (retry bool) {
    		mdb = db.getEffectiveMem()
    		if mdb == nil {
    			err = ErrClosed
    			return false
    		}
    		defer func() {
    			if retry {
    				mdb.decref()
    				mdb = nil
    			}
    		}()
    		tLen := db.s.tLen(0)
    		mdbFree = mdb.Free()
    		switch {
    		case tLen >= slowdownTrigger && !delayed:
    			delayed = true
    			time.Sleep(time.Millisecond)
    		case mdbFree >= n:
    			return false
    		case tLen >= pauseTrigger:
    			delayed = true
    			// Set the write paused flag explicitly.
    			atomic.StoreInt32(&db.inWritePaused, 1)
    			err = db.compTriggerWait(db.tcompCmdC)
    			// Unset the write paused flag.
    			atomic.StoreInt32(&db.inWritePaused, 0)
    			if err != nil {
    				return false
    			}
    		default:
    			// Allow memdb to grow if it has no entry.
    			if mdb.Len() == 0 {
    				mdbFree = n
    			} else {
    				mdb.decref()
    				mdb, err = db.rotateMem(n, false)
    				if err == nil {
    					mdbFree = mdb.Free()
    				} else {
    					mdbFree = 0
    				}
    			}
    			return false
    		}
    		return true
    	}
    	start := time.Now()
    	for flush() {
    	}
    	if delayed {
    		db.writeDelay += time.Since(start)
    		db.writeDelayN++
    	} else if db.writeDelayN > 0 {
    		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
    		atomic.AddInt32(&db.cWriteDelayN, int32(db.writeDelayN))
    		atomic.AddInt64(&db.cWriteDelay, int64(db.writeDelay))
    		db.writeDelay = 0
    		db.writeDelayN = 0
    	}
    	return
    }
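
    flush is where write throttling lives: once level-0 holds GetWriteL0SlowdownTrigger tables, every write sleeps for a millisecond, and at GetWriteL0PauseTrigger tables writes block until a table compaction completes. Both thresholds come from opt.Options; a sketch of tuning them (the values shown match what I believe the defaults to be):

    o := &opt.Options{
    	WriteL0SlowdownTrigger: 8,  // level-0 table count that starts the 1ms sleeps
    	WriteL0PauseTrigger:    12, // level-0 table count that pauses writes entirely
    }
    db, err := leveldb.OpenFile("db-example", o)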
    
    
    func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
    	retryLimit := 3
    retry:
    	// Wait for pending memdb compaction.
    	err = db.compTriggerWait(db.mcompCmdC)
    	if err != nil {
    		return
    	}
    	retryLimit--
    
    	// Create new memdb and journal.
    	mem, err = db.newMem(n)
    	if err != nil {
    		if err == errHasFrozenMem {
    			if retryLimit <= 0 {
    				panic("BUG: still has frozen memdb")
    			}
    			goto retry
    		}
    		return
    	}
    
    	// Schedule memdb compaction.
    	if wait {
    		err = db.compTriggerWait(db.mcompCmdC)
    	} else {
    		db.compTrigger(db.mcompCmdC)
    	}
    	return
    }
    
    func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
    	wr, err := db.journal.Next()
    	if err != nil {
    		return err
    	}
    	if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
    		return err
    	}
    	if err := db.journal.Flush(); err != nil {
    		return err
    	}
    	if sync {
    		return db.journalWriter.Sync()
    	}
    	return nil
    }
    
    func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
    	var ik []byte
    	for i, index := range b.index {
    		ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
    		if err := mdb.Put(ik, index.v(b.data)); err != nil {
    			return err
    		}
    	}
    	return nil
    }
    
    func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
    	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
    
    	if auxm != nil {
    		if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
    			return append([]byte{}, mv...), me
    		}
    	}
    
    	em, fm := db.getMems()
    	for _, m := range [...]*memDB{em, fm} { // check the mutable and frozen memtables
    		if m == nil {
    			continue
    		}
    		defer m.decref()
    
    		if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
    			return append([]byte{}, mv...), me
    		}
    	}
    
    	v := db.s.version()
    	value, cSched, err := v.get(auxt, ikey, ro, false) // search the sstables level by level
    	v.release()
    	if cSched {
    		// Trigger table compaction.
    		db.compTrigger(db.tcompCmdC)
    	}
    	return
    }
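
    db.get always reads as of a sequence number, which is also how snapshots work: a snapshot pins a seq and serves reads against it, and misses come back as leveldb.ErrNotFound. A short sketch:

    snap, err := db.GetSnapshot()
    if err != nil {
    	log.Fatal(err)
    }
    defer snap.Release()

    data, err := snap.Get([]byte("key"), nil)
    if err == leveldb.ErrNotFound {
    	fmt.Println("key not present in this snapshot")
    } else if err != nil {
    	log.Fatal(err)
    }
    _ = data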
    
    
    func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
    	if v.closing {
    		return nil, false, ErrClosed
    	}
    
    	ukey := ikey.ukey()
    	sampleSeeks := !v.s.o.GetDisableSeeksCompaction()
    
    	var (
    		tset  *tSet
    		tseek bool
    
    		// Level-0.
    		zfound bool
    		zseq   uint64
    		zkt    keyType
    		zval   []byte
    	)
    
    	err = ErrNotFound
    
    	// Since entries never hop across levels, finding the key/value
    	// in a smaller level makes later levels irrelevant.
    	v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool {
    		if sampleSeeks && level >= 0 && !tseek {
    			if tset == nil {
    				tset = &tSet{level, t}
    			} else {
    				tseek = true
    			}
    		}
    
    		var (
    			fikey, fval []byte
    			ferr        error
    		)
    		if noValue {
    			fikey, ferr = v.s.tops.findKey(t, ikey, ro)
    		} else {
    			fikey, fval, ferr = v.s.tops.find(t, ikey, ro)
    		}
    
    		switch ferr {
    		case nil:
    		case ErrNotFound:
    			return true
    		default:
    			err = ferr
    			return false
    		}
    
    		if fukey, fseq, fkt, fkerr := parseInternalKey(fikey); fkerr == nil {
    			if v.s.icmp.uCompare(ukey, fukey) == 0 {
    			// Tables at level <= 0 may overlap each other.
    				if level <= 0 {
    					if fseq >= zseq {
    						zfound = true
    						zseq = fseq
    						zkt = fkt
    						zval = fval
    					}
    				} else {
    					switch fkt {
    					case keyTypeVal:
    						value = fval
    						err = nil
    					case keyTypeDel:
    					default:
    						panic("leveldb: invalid internalKey type")
    					}
    					return false
    				}
    			}
    		} else {
    			err = fkerr
    			return false
    		}
    
    		return true
    	}, func(level int) bool {
    		if zfound {
    			switch zkt {
    			case keyTypeVal:
    				value = zval
    				err = nil
    			case keyTypeDel:
    			default:
    				panic("leveldb: invalid internalKey type")
    			}
    			return false
    		}
    
    		return true
    	})
    
    	if tseek && tset.table.consumeSeek() <= 0 {
    		tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
    	}
    
    	return
    }
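
    version.get serves point lookups; range reads instead go through iterators, which merge the memtables and every level under the same version. From the public API (util is github.com/syndtr/goleveldb/leveldb/util):

    iter := db.NewIterator(util.BytesPrefix([]byte("foo-")), nil)
    for iter.Next() {
    	fmt.Printf("%s = %s\n", iter.Key(), iter.Value())
    }
    iter.Release()
    if err := iter.Error(); err != nil {
    	log.Fatal(err)
    }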
    
  • Original article: https://www.cnblogs.com/SLchuck/p/14001912.html