zoukankan      html  css  js  c++  java
  • nsq源码-diskqueue

    有兴趣可以看看这篇文章
    https://www.cnblogs.com/zhangboyu/p/7457070.html

    一、队列存储

    队列的特征是先入先出,也就是写入是从后面写入,读取是从前面读取
    我们平时写的队列一般是放到内存里面,比如一个大的动态数组
    这里如果队列中的数据很大,diskqueue则是将这个动态数组拆成了好多个文件来存储队列中的数据

    如果队列是放在内存数组中,那么队列只需要记录两个属性,一个头的位置,一个是尾的位置,
    队列大小depth = 头位置 - 尾位置

    但是由于diskqueue是将数组保存在多个文件中
    所以diskqueue就会有五个属性: 头所在的文件,头在文件中的位置,尾所在的文件,尾在文件中的位置,还有就是depth标识头和尾中间的数据数量
    这五个数据作为diskqueue的元数据单独保存在一个文件里面。
    所以New一个diskqueue的时候先要这几个元数据读取出来

    func New(name string, dataPath string, maxBytesPerFile int64,
    	minMsgSize int32, maxMsgSize int32,
    	syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
    	d := diskQueue{
    		//名称
    		name:              name,
    		//文件保存路径
    		dataPath:          dataPath,
    		//每个文件大小最大值,超过要重新开启一个文件
    		maxBytesPerFile:   maxBytesPerFile,
    		//写入消息最小大小
    		minMsgSize:        minMsgSize,
    		//写入消息最大大小
    		maxMsgSize:        maxMsgSize,
    		readChan:          make(chan []byte),
    		writeChan:         make(chan []byte),
    		writeResponseChan: make(chan error),
    		emptyChan:         make(chan int),
    		emptyResponseChan: make(chan error),
    		exitChan:          make(chan int),
    		exitSyncChan:      make(chan int),
    		syncEvery:         syncEvery,
    		syncTimeout:       syncTimeout,
    		logf:              logf,
    	}
    
    	// 读取队列数据
    	err := d.retrieveMetaData()
    	if err != nil && !os.IsNotExist(err) {
    		d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
    	}
    
    	go d.ioLoop()
    	return &d
    }
    
    // 读取队列数据
    func (d *diskQueue) retrieveMetaData() error {
    	var f *os.File
    	var err error
    
    	fileName := d.metaDataFileName()
    	f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
    	if err != nil {
    		return err
    	}
    	defer f.Close()
    
    	//队列写入和读取位置中间有多少条数据,也就是队列的大小
    	var depth int64
    	//读取队列核心数据
    	//当前读取文件是哪个,读取位置是哪里
    	//当前写入的文件是哪个,写入文件位置
    	_, err = fmt.Fscanf(f, "%d
    %d,%d
    %d,%d
    ",
    		&depth,
    		&d.readFileNum, &d.readPos,
    		&d.writeFileNum, &d.writePos)
    	if err != nil {
    		return err
    	}
    	atomic.StoreInt64(&d.depth, depth)
    
    	//下一个读取文件
    	d.nextReadFileNum = d.readFileNum
    	//下一个读取位置
    	d.nextReadPos = d.readPos
    
    	return nil
    }
    

    二、写入队列

    func (d *diskQueue) writeOne(data []byte) error {
    	var err error
    
    	// 当前写入文件是否打开,没有则打开当前写入文件
    	if d.writeFile == nil {
    		curFileName := d.fileName(d.writeFileNum)
    		d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
    		if err != nil {
    			return err
    		}
    
    		d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)
    
    		//如果当前写入位置大于0,则将文件位置移动到写入位置点
    		if d.writePos > 0 {
    			_, err = d.writeFile.Seek(d.writePos, 0)
    			if err != nil {
    				d.writeFile.Close()
    				d.writeFile = nil
    				return err
    			}
    		}
    	}
    
    	dataLen := int32(len(data))
    
    	//判断消息大小是否合法
    	if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
    		return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
    	}
    
    	//将缓冲区清空
    	d.writeBuf.Reset()
    	//将消息大小写入缓冲区
    	err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
    	if err != nil {
    		return err
    	}
    
    	//将消息写入缓冲区
    	_, err = d.writeBuf.Write(data)
    	if err != nil {
    		return err
    	}
    
    	// only write to the file once
    	//将缓冲区关联到文件
    	_, err = d.writeFile.Write(d.writeBuf.Bytes())
    	if err != nil {
    		d.writeFile.Close()
    		d.writeFile = nil
    		return err
    	}
    
    	//计算总大小
    	totalBytes := int64(4 + dataLen)
    	d.writePos += totalBytes
    	//队列消息数量+1
    	atomic.AddInt64(&d.depth, 1)
    
    	//如果写入位置大于了文件最大大小
    	if d.writePos >= d.maxBytesPerFile {
    		//将当前写入文件+1
    		d.writeFileNum++
    		//当前写入位置重置为0
    		d.writePos = 0
    
    		// sync every time we start writing to a new file
    		//将缓存数据写入到磁盘
    		err = d.sync()
    		if err != nil {
    			d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
    		}
    
    		if d.writeFile != nil {
    			d.writeFile.Close()
    			d.writeFile = nil
    		}
    	}
    
    	return err
    }
    

    读取队列

    func (d *diskQueue) readOne() ([]byte, error) {
    	var err error
    	var msgSize int32
    
    	// 当前读取文件是否打开,没有则打开当前文件
    	if d.readFile == nil {
    		// 打开读取的文件(也就是当前队列头所在的文件),如果文件大小到达上线maxBytesPerFile,readFileNum加一
    		curFileName := d.fileName(d.readFileNum)
    		d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
    		if err != nil {
    			return nil, err
    		}
    
    		d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)
    
    		//当前队列头在当前读取文件的位置
    		if d.readPos > 0 {
    			_, err = d.readFile.Seek(d.readPos, 0)
    			if err != nil {
    				d.readFile.Close()
    				d.readFile = nil
    				return nil, err
    			}
    		}
    
    		d.reader = bufio.NewReader(d.readFile)
    	}
    
    	// 先读取出消息的大小
    	err = binary.Read(d.reader, binary.BigEndian, &msgSize)
    	if err != nil {
    		d.readFile.Close()
    		d.readFile = nil
    		return nil, err
    	}
    
    	//判断消息大小是否合法
    	if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
    		// this file is corrupt and we have no reasonable guarantee on
    		// where a new message should begin
    		d.readFile.Close()
    		d.readFile = nil
    		return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
    	}
    
    	//读取消息
    	readBuf := make([]byte, msgSize)
    	_, err = io.ReadFull(d.reader, readBuf)
    	if err != nil {
    		d.readFile.Close()
    		d.readFile = nil
    		return nil, err
    	}
    
    	totalBytes := int64(4 + msgSize)
    
    	//将下一个要读取的位置往后移
    	d.nextReadPos = d.readPos + totalBytes
    	d.nextReadFileNum = d.readFileNum
    
    	//判断下一个读取的位置是不是超过了文件大小
    	if d.nextReadPos > d.maxBytesPerFile {
    		if d.readFile != nil {
    			d.readFile.Close()
    			d.readFile = nil
    		}
    		//如果超过了,则当前队列头文件要往后移,且读取位置设置为0
    		d.nextReadFileNum++
    		d.nextReadPos = 0
    	}
    
    	return readBuf, nil
    }
    
    // 刷新缓存到磁盘
    func (d *diskQueue) sync() error {
    	if d.writeFile != nil {
    		// 将缓冲区的数据从内存中拷贝刷新到硬盘中保存
    		err := d.writeFile.Sync()
    		if err != nil {
    			d.writeFile.Close()
    			d.writeFile = nil
    			return err
    		}
    	}
    
    	//保存元数据
    	err := d.persistMetaData()
    	if err != nil {
    		return err
    	}
    
    	d.needSync = false
    	return nil
    }
    

    五、ioLoop循环

    这个函数是一个“守护”协程,
    暴露的d.writeChan和d.readChan
    如果外部有网writeChan里写数据在这里处理
    同时,这里的消息也会通过d.readChan将消息不断的从队列中往外推

    func (d *diskQueue) ioLoop() {
    	var dataRead []byte
    	var err error
    	var count int64
    	var r chan []byte
    
    	//开启一个timer,syncTimeout在系统配置里,默认是2秒一次
    	syncTicker := time.NewTicker(d.syncTimeout)
    
    	for {
    		// SyncEvery是系统配置,默认值是2500
    		// 也就是如果现在这段代码中的count的值如果到了2500,则将缓存中的数据保存到磁盘
    		if count == d.syncEvery {
    			d.needSync = true
    		}
    
    		// needSync这个字段如果为true,则将缓存中的数据保存到磁盘
    		if d.needSync {
    			err = d.sync()
    			if err != nil {
    				d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
    			}
    			//同步缓存到磁盘成功,将count重置为0
    			count = 0
    		}
    
    		//如果队列头和尾中间还有数据,则从头部读取数据
    		if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
    			if d.nextReadPos == d.readPos {
    				dataRead, err = d.readOne()
    				if err != nil {
    					d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
    						d.name, d.readPos, d.fileName(d.readFileNum), err)
    					d.handleReadError()
    					continue
    				}
    			}
    			//这里读取的数据放到readChan这个通道里
    			r = d.readChan
    		} else {
    			r = nil
    		}
    
    		select {
    		// the Go channel spec dictates that nil channel operations (read or write)
    		// in a select are skipped, we set r to d.readChan only when there is data to read
    		//看上面的英文注释,如果r为空,则这个分支会被跳过,这是golang的一个特性
    		//将读取的消息,丢到d.readChan里面,r.readChan向外部暴露
    		case r <- dataRead:
    			count++
    			// moveForward sets needSync flag if a file is removed
    			//moveForward()会删除已经没用的file,这个文件中的数据已经全部被读取了,不在队列头和尾之间了
    			d.moveForward()
    		case <-d.emptyChan:
    			d.emptyResponseChan <- d.deleteAllFiles()
    			count = 0
    		case dataWrite := <-d.writeChan:
    			//如果d.writeChan有写入数据,则将消息数据写入到队列
    			count++
    			d.writeResponseChan <- d.writeOne(dataWrite)
    		case <-syncTicker.C:
    			//这里相当于两秒钟同步一次缓存到磁盘
    			if count == 0 {
    				// avoid sync when there's no activity
    				continue
    			}
    			d.needSync = true
    		case <-d.exitChan:
    			goto exit
    		}
    	}
    
    exit:
    	d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
    	syncTicker.Stop()
    	d.exitSyncChan <- 1
    }
    
  • 相关阅读:
    AJAX异步传输——以php文件传输为例
    js控制json生成菜单——自制菜单(一)
    vs2010中关于HTML控件与服务器控件分别和js函数混合使用的问题
    SQL数据库连接到服务器出错——无法连接到XXX
    PHP错误:Namespace declaration statement has to be the very first statement in the script
    【LeetCode】19. Remove Nth Node From End of List
    【LeetCode】14. Longest Common Prefix
    【LeetCode】38. Count and Say
    【LeetCode】242. Valid Anagram
    【LeetCode】387. First Unique Character in a String
  • 原文地址:https://www.cnblogs.com/werben/p/14517781.html
Copyright © 2011-2022 走看看