zoukankan      html  css  js  c++  java
  • nsq源码-diskqueue

    有兴趣可以看看这篇文章
    https://www.cnblogs.com/zhangboyu/p/7457070.html

    一、队列存储

    队列的特征是先入先出,也就是写入是从后面写入,读取是从前面读取
    我们平时写的队列一般是放到内存里面,比如一个大的动态数组
    这里如果队列中的数据很大,diskqueue则是将这个动态数组拆成了好多个文件来存储队列中的数据

    如果队列是放在内存数组中,那么队列只需要记录两个属性,一个头的位置,一个是尾的位置,
    队列大小depth = 头位置 - 尾位置

    但是由于diskqueue是将数组保存在多个文件中
    所以diskqueue就会有五个属性: 头所在的文件,头在文件中的位置,尾所在的文件,尾在文件中的位置,还有就是depth标识头和尾中间的数据数量
    这五个数据作为diskqueue的元数据单独保存在一个文件里面。
    所以New一个diskqueue的时候先要这几个元数据读取出来

    func New(name string, dataPath string, maxBytesPerFile int64,
    	minMsgSize int32, maxMsgSize int32,
    	syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
    	d := diskQueue{
    		//名称
    		name:              name,
    		//文件保存路径
    		dataPath:          dataPath,
    		//每个文件大小最大值,超过要重新开启一个文件
    		maxBytesPerFile:   maxBytesPerFile,
    		//写入消息最小大小
    		minMsgSize:        minMsgSize,
    		//写入消息最大大小
    		maxMsgSize:        maxMsgSize,
    		readChan:          make(chan []byte),
    		writeChan:         make(chan []byte),
    		writeResponseChan: make(chan error),
    		emptyChan:         make(chan int),
    		emptyResponseChan: make(chan error),
    		exitChan:          make(chan int),
    		exitSyncChan:      make(chan int),
    		syncEvery:         syncEvery,
    		syncTimeout:       syncTimeout,
    		logf:              logf,
    	}
    
    	// 读取队列数据
    	err := d.retrieveMetaData()
    	if err != nil && !os.IsNotExist(err) {
    		d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
    	}
    
    	go d.ioLoop()
    	return &d
    }
    
    // 读取队列数据
    func (d *diskQueue) retrieveMetaData() error {
    	var f *os.File
    	var err error
    
    	fileName := d.metaDataFileName()
    	f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
    	if err != nil {
    		return err
    	}
    	defer f.Close()
    
    	//队列写入和读取位置中间有多少条数据,也就是队列的大小
    	var depth int64
    	//读取队列核心数据
    	//当前读取文件是哪个,读取位置是哪里
    	//当前写入的文件是哪个,写入文件位置
    	_, err = fmt.Fscanf(f, "%d
    %d,%d
    %d,%d
    ",
    		&depth,
    		&d.readFileNum, &d.readPos,
    		&d.writeFileNum, &d.writePos)
    	if err != nil {
    		return err
    	}
    	atomic.StoreInt64(&d.depth, depth)
    
    	//下一个读取文件
    	d.nextReadFileNum = d.readFileNum
    	//下一个读取位置
    	d.nextReadPos = d.readPos
    
    	return nil
    }
    

    二、写入队列

    func (d *diskQueue) writeOne(data []byte) error {
    	var err error
    
    	// 当前写入文件是否打开,没有则打开当前写入文件
    	if d.writeFile == nil {
    		curFileName := d.fileName(d.writeFileNum)
    		d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
    		if err != nil {
    			return err
    		}
    
    		d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)
    
    		//如果当前写入位置大于0,则将文件位置移动到写入位置点
    		if d.writePos > 0 {
    			_, err = d.writeFile.Seek(d.writePos, 0)
    			if err != nil {
    				d.writeFile.Close()
    				d.writeFile = nil
    				return err
    			}
    		}
    	}
    
    	dataLen := int32(len(data))
    
    	//判断消息大小是否合法
    	if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
    		return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
    	}
    
    	//将缓冲区清空
    	d.writeBuf.Reset()
    	//将消息大小写入缓冲区
    	err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
    	if err != nil {
    		return err
    	}
    
    	//将消息写入缓冲区
    	_, err = d.writeBuf.Write(data)
    	if err != nil {
    		return err
    	}
    
    	// only write to the file once
    	//将缓冲区关联到文件
    	_, err = d.writeFile.Write(d.writeBuf.Bytes())
    	if err != nil {
    		d.writeFile.Close()
    		d.writeFile = nil
    		return err
    	}
    
    	//计算总大小
    	totalBytes := int64(4 + dataLen)
    	d.writePos += totalBytes
    	//队列消息数量+1
    	atomic.AddInt64(&d.depth, 1)
    
    	//如果写入位置大于了文件最大大小
    	if d.writePos >= d.maxBytesPerFile {
    		//将当前写入文件+1
    		d.writeFileNum++
    		//当前写入位置重置为0
    		d.writePos = 0
    
    		// sync every time we start writing to a new file
    		//将缓存数据写入到磁盘
    		err = d.sync()
    		if err != nil {
    			d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
    		}
    
    		if d.writeFile != nil {
    			d.writeFile.Close()
    			d.writeFile = nil
    		}
    	}
    
    	return err
    }
    

    读取队列

    func (d *diskQueue) readOne() ([]byte, error) {
    	var err error
    	var msgSize int32
    
    	// 当前读取文件是否打开,没有则打开当前文件
    	if d.readFile == nil {
    		// 打开读取的文件(也就是当前队列头所在的文件),如果文件大小到达上线maxBytesPerFile,readFileNum加一
    		curFileName := d.fileName(d.readFileNum)
    		d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
    		if err != nil {
    			return nil, err
    		}
    
    		d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)
    
    		//当前队列头在当前读取文件的位置
    		if d.readPos > 0 {
    			_, err = d.readFile.Seek(d.readPos, 0)
    			if err != nil {
    				d.readFile.Close()
    				d.readFile = nil
    				return nil, err
    			}
    		}
    
    		d.reader = bufio.NewReader(d.readFile)
    	}
    
    	// 先读取出消息的大小
    	err = binary.Read(d.reader, binary.BigEndian, &msgSize)
    	if err != nil {
    		d.readFile.Close()
    		d.readFile = nil
    		return nil, err
    	}
    
    	//判断消息大小是否合法
    	if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
    		// this file is corrupt and we have no reasonable guarantee on
    		// where a new message should begin
    		d.readFile.Close()
    		d.readFile = nil
    		return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
    	}
    
    	//读取消息
    	readBuf := make([]byte, msgSize)
    	_, err = io.ReadFull(d.reader, readBuf)
    	if err != nil {
    		d.readFile.Close()
    		d.readFile = nil
    		return nil, err
    	}
    
    	totalBytes := int64(4 + msgSize)
    
    	//将下一个要读取的位置往后移
    	d.nextReadPos = d.readPos + totalBytes
    	d.nextReadFileNum = d.readFileNum
    
    	//判断下一个读取的位置是不是超过了文件大小
    	if d.nextReadPos > d.maxBytesPerFile {
    		if d.readFile != nil {
    			d.readFile.Close()
    			d.readFile = nil
    		}
    		//如果超过了,则当前队列头文件要往后移,且读取位置设置为0
    		d.nextReadFileNum++
    		d.nextReadPos = 0
    	}
    
    	return readBuf, nil
    }
    
    // 刷新缓存到磁盘
    func (d *diskQueue) sync() error {
    	if d.writeFile != nil {
    		// 将缓冲区的数据从内存中拷贝刷新到硬盘中保存
    		err := d.writeFile.Sync()
    		if err != nil {
    			d.writeFile.Close()
    			d.writeFile = nil
    			return err
    		}
    	}
    
    	//保存元数据
    	err := d.persistMetaData()
    	if err != nil {
    		return err
    	}
    
    	d.needSync = false
    	return nil
    }
    

    五、ioLoop循环

    这个函数是一个“守护”协程,
    暴露的d.writeChan和d.readChan
    如果外部有网writeChan里写数据在这里处理
    同时,这里的消息也会通过d.readChan将消息不断的从队列中往外推

    func (d *diskQueue) ioLoop() {
    	var dataRead []byte
    	var err error
    	var count int64
    	var r chan []byte
    
    	//开启一个timer,syncTimeout在系统配置里,默认是2秒一次
    	syncTicker := time.NewTicker(d.syncTimeout)
    
    	for {
    		// SyncEvery是系统配置,默认值是2500
    		// 也就是如果现在这段代码中的count的值如果到了2500,则将缓存中的数据保存到磁盘
    		if count == d.syncEvery {
    			d.needSync = true
    		}
    
    		// needSync这个字段如果为true,则将缓存中的数据保存到磁盘
    		if d.needSync {
    			err = d.sync()
    			if err != nil {
    				d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
    			}
    			//同步缓存到磁盘成功,将count重置为0
    			count = 0
    		}
    
    		//如果队列头和尾中间还有数据,则从头部读取数据
    		if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
    			if d.nextReadPos == d.readPos {
    				dataRead, err = d.readOne()
    				if err != nil {
    					d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
    						d.name, d.readPos, d.fileName(d.readFileNum), err)
    					d.handleReadError()
    					continue
    				}
    			}
    			//这里读取的数据放到readChan这个通道里
    			r = d.readChan
    		} else {
    			r = nil
    		}
    
    		select {
    		// the Go channel spec dictates that nil channel operations (read or write)
    		// in a select are skipped, we set r to d.readChan only when there is data to read
    		//看上面的英文注释,如果r为空,则这个分支会被跳过,这是golang的一个特性
    		//将读取的消息,丢到d.readChan里面,r.readChan向外部暴露
    		case r <- dataRead:
    			count++
    			// moveForward sets needSync flag if a file is removed
    			//moveForward()会删除已经没用的file,这个文件中的数据已经全部被读取了,不在队列头和尾之间了
    			d.moveForward()
    		case <-d.emptyChan:
    			d.emptyResponseChan <- d.deleteAllFiles()
    			count = 0
    		case dataWrite := <-d.writeChan:
    			//如果d.writeChan有写入数据,则将消息数据写入到队列
    			count++
    			d.writeResponseChan <- d.writeOne(dataWrite)
    		case <-syncTicker.C:
    			//这里相当于两秒钟同步一次缓存到磁盘
    			if count == 0 {
    				// avoid sync when there's no activity
    				continue
    			}
    			d.needSync = true
    		case <-d.exitChan:
    			goto exit
    		}
    	}
    
    exit:
    	d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
    	syncTicker.Stop()
    	d.exitSyncChan <- 1
    }
    
  • 相关阅读:
    代码优化
    使用python的Flask实现一个RESTful API服务器端
    数据结构与算法之排序
    Ubuntu 13.04/12.10安装Oracle 11gR2图文教程(转)
    Linux 下开启ssh服务(转)
    PLSQL Developer 9.如何设置查询返回所有纪录(转)
    linux下安装oracle11g 64位最简客户端(转)
    Linux下关于解决JavaSwing中文乱码的情况(转)
    servlet(jsp)中的重定向和转发
    在用TabbarController中出现navigationController 嵌套报错
  • 原文地址:https://www.cnblogs.com/werben/p/14517781.html
Copyright © 2011-2022 走看看