本文主要介绍fio是如何运行的,并且以单线程、单job为例
fio的入口在fio.c中的main函数,下面列出了main函数,此处只出示了一些调用的关键函数
1 int main(int argc, char *argv[], char *envp[]) 2 { 3 parse_options(argc, argv); 4 fio_backend(); 5 }
在main函数中主要调用了两个关键函数,parse_options,顾名思义,就是分析options,也就是fio的参数,而fio_backend()函数则是fio进程的入口
fio_backend()函数在backend.c文件中
1 int fio_backend(void) 2 { 3 ...... 4 run_threads(); 5 ...... 6 }
在fio_backend()函数中,初始化一些数据结构之后,调用了run_threads()(backend.c)函数,该函数是fio用来创建譬如I/O, verify线程等。
1 /* 2 * Main function for kicking off and reaping jobs, as needed. 3 */ 4 static void run_threads(void) 5 { 6 ...... 7 todo = thread_number; 8 ...... 9 while (todo) { 10 if (td->o.use_thread) { 11 ...... 12 ret = pthread_create(&td->thread, NULL,thread_main, td); 13 ret = pthread_detach(td->thread); 14 ...... 15 } 16 ...... 17 }
在这个函数中,创建了thread_main线程(backend.c),这个线程功能是,生成I/O,发送并完成I/O,记录数据等。
1 /* 2 * Entry point for the thread based jobs. The process based jobs end up 3 * here as well, after a little setup. 4 */ 5 static void *thread_main(void *data) 6 { 7 ........ 8 /* 9 * May alter parameters that init_io_u() will use, so we need to 10 * do this first. 11 * 下面两个函数的主要功能是生成读写的参数,offset,len 12 */ 13 if (init_iolog(td)) 14 goto err; 15 16 if (init_io_u(td)) 17 goto err; 18 ...... 19 while (keep_running(td)) { 20 do_io(td); 21 do_verify(td, verify_bytes);//如果需要verification的话 22 } 23 }
如果不考虑verify的话,下面主要看do_io(backend.c)函数。
1 /* 2 * Main IO worker function. It retrieves io_u's to process and queues 3 * and reaps them, checking for rate and errors along the way. 4 * 5 * Returns number of bytes written and trimmed. 6 */ 7 static uint64_t do_io(struct thread_data *td) 8 { 9 ...... 10 //下面是do_io的主循环,判断条件是,io_log里有生成的pos信息,而且已经iuuse的数据小于总数据, 11 while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) || 12 (!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) || 13 td->o.time_based) { 14 ...... 15 io_u = get_io_u(td);// io_u,是一个io unit,是根据参数生成的io unit 16 ...... 17 ret = td_io_queue(td, io_u); //将io_u提交到队列中 18 switch (ret) { 19 case FIO_Q_COMPLETED: //处理错误,以及同步的操作 20 ...... 21 case FIO_Q_QUEUED://成功入队 22 bytes_issued += io_u->xfer_buflen; 23 case FIO_Q_BUSY: //队伍满了,重新入队 24 ....... 25 /* 26 * See if we need to complete some commands. Note that we 27 * can get BUSY even without IO queued, if the system is 28 * resource starved. 29 */ 30 full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth); 31 if (full || !td->o.iodepth_batch_complete) { 32 min_evts = min(td->o.iodepth_batch_complete,td->cur_depth); 33 /* 34 * if the queue is full, we MUST reap at least 1 event 35 */ 36 if (full && !min_evts) 37 min_evts = 1; 38 do { 39 ret = io_u_queued_complete(td, min_evts, bytes_done); 40 } while (full && (td->cur_depth > td->o.iodepth_low)); 41 } 42 }
上面do_io函数的关键入队函数td_io_queue(ioengines.c)
1 int td_io_queue(struct thread_data *td, struct io_u *io_u) 2 { 3 ...... 4 ret = td->io_ops->queue(td, io_u); 5 ...... 6 else if (ret == FIO_Q_QUEUED) { 7 int r; 8 if (ddir_rw(io_u->ddir)) { 9 td->io_u_queued++; 10 td->ts.total_io_u[io_u->ddir]++; 11 } 12 if (td->io_u_queued >= td->o.iodepth_batch) { 13 r = td_io_commit(td); 14 if (r < 0) 15 return r; 16 } 17 } 18 } 19 int td_io_commit(struct thread_data *td) 20 { 21 ...... 22 int ret; 23 if (td->io_ops->commit) { 24 ret = td->io_ops->commit(td); 25 } 26 ...... 27 return 0; 28 }
对于td->iops,它是在各个engines中定义了,以libaio(/engines/libaio.c)为例,调用的函数就是相应engines里对应的函数
1 static struct ioengine_ops ioengine = { 2 .name = "libaio", 3 .version = FIO_IOOPS_VERSION, 4 .init = fio_libaio_init, 5 .prep = fio_libaio_prep, 6 .queue = fio_libaio_queue, 7 .commit = fio_libaio_commit, 8 .cancel = fio_libaio_cancel, 9 .getevents = fio_libaio_getevents, 10 .event = fio_libaio_event, 11 .cleanup = fio_libaio_cleanup, 12 .open_file = generic_open_file, 13 .close_file = generic_close_file, 14 .get_file_size = generic_get_file_size, 15 .options = options, 16 .option_struct_size = sizeof(struct libaio_options), 17 };
对于reap流程里的io_u_queued_complete(io_u.c)函数
1 /* 2 * Called to complete min_events number of io for the async engines. 3 */ 4 int io_u_queued_complete(struct thread_data *td, int min_evts, 5 uint64_t *bytes) 6 { 7 ...... 8 9 ret = td_io_getevents(td, min_evts, td->o.iodepth_batch_complete, tvp); 10 ...... 11 } 12 13 int td_io_getevents(struct thread_data *td, unsigned int min, unsigned int max, 14 struct timespec *t) 15 { 16 int r = 0; 17 18 if (min > 0 && td->io_ops->commit) { 19 r = td->io_ops->commit(td); 20 if (r < 0) 21 goto out; 22 } 23 if (max > td->cur_depth) 24 max = td->cur_depth; 25 if (min > max) 26 max = min; 27 28 r = 0; 29 if (max && td->io_ops->getevents) 30 r = td->io_ops->getevents(td, min, max, t); 31 out: 32 ...... 33 return r; 34 }
这里调用的getevents也是各个engines里定义的函数