I find a paper about GNURADIO CORE and think it is a good reference. I will use Chinese to introduce it.
我会用中文主要介绍一下这个GNURADIO CORE WORK主要讲什么东西。
Contents
1 How the GNU Radio scheduler is
called and what it does
1
2 How a thread of each block works
5
内容
1. GNU Radio 调度器怎样调用 和
怎么工作
2.
每个模块的线程怎么工作
1 How the GNU Radio scheduler is called and what it
does
As we become quite familiar with GNU Radio Python codes now,
it is essential to figure out
what's going on behind the plain Python codes; that is, how
GNU radio core works. A typical
example of GNU Radio 3 is as follows. (There is an old
dial-tone example using flow_graph(),
which is obsolete! )
上面的话没太多意思,就是让我们从dial-tone开始去探索GNU
Radio怎么工作
Besides some routine connections for blocks, the GNU Radio
running thread starts at
my_top_block().run()
We are going to figure out what's going on after this
code. Since Python and C++ classes are 1-1
corresponding via SWIG, We have to go deeper and
to find the run() function in C++
class gr_top_block.
这里告诉你,PYTHON和C++通过SWIG对应起来,我们要了解 python里
run()这个函数,就要深入C++去了解。In
file: gr top block.cc
void
gr_top_block::run()
{
start();
wait();
}
Then, go to start();
先找到run(),看看代码,发现start(),去看start()
In file: gr
top block.cc
void
gr_top_block::start()
{
d_impl->start();
}
d_impl is a member that
points to the class gr_top_block_impl with the following
codes:
看到start() 其实是调用了d_impl->start(),去class
gr_top_block_impl 再去看
In file: gr
top block impl.cc
void
gr_top_block_impl::start()
{
gruel::scoped_lock
l(d_mutex);
if (d_state != IDLE)
throw
std::runtime_error("top_block::start: top block already running or
wait() not called after previous stop()");
if (d_lock_count
> 0)
throw
std::runtime_error("top_block::start: can't start with flow graph
locked");
// Create new flat flow
graph by flattening hierarchy
d_ffg =
d_owner->flatten();
// Validate new simple
flow graph and wire it up
d_ffg->validate();
d_ffg->setup_connections();
d_scheduler =
make_scheduler(d_ffg);
d_state = RUNNING;
}
The codes do some sanity
check and then create the GNU Radio Scheduler by
calling
d_scheduler =
make_scheduler(d_ffg);
Let go to function make_scheduler()
as follows
上面的代码通过调用 d_scheduler = make_scheduler(d_ffg);
很严谨的检查并且创建了GNU
Radio的调度器。让我们去make_scheduler(d_ffg)看看
In file : gr top block
impl.cc
static gr_scheduler_sptr
make_scheduler(gr_flat_flowgraph_sptr ffg)
{
static scheduler_maker
factory = 0;
if (factory == 0){
char *v
= getenv("GR_SCHEDULER");
if
(!v)
factory = scheduler_table[0].f; // use
default
else
{
for (size_t i = 0; i <
sizeof(scheduler_table)/sizeof(scheduler_table[0]); i++){
if (strcmp(v, scheduler_table[i].name) == 0){
factory = scheduler_table[i].f;
break;
}
}
if (factory == 0){
std::cerr << "warning: Invalid
GR_SCHEDULER environment variable value \""
<< v
<< "\". Using \""
<< scheduler_table[0].name
<< "\"\n";
factory = scheduler_table[0].f;
}
}
}
return factory(ffg);
}
In the above program, what
is the variable static scheduler_maker
factory
we can look into the file and find,
在上面的程序中,scheduler_maker factory
是什么玩意?我们在文件中找找
typedef gr_scheduler_sptr
(*scheduler_maker) (gr_flat_flowgraph_sptr ffg);
Well, factory is a function pointer!
Where does it points to? We can find
factory
是一个函数指针,指向哪里?找找
factory =
schedulertable[i].f
OK. Let us find what is in
scheduler_table:
好了,让我们继续找找什么是scheduler_table
static struct scheduler_table {
const char
*name;
scheduler_maker f;
} scheduler_table[] = {
{ "TPB",
gr_scheduler_tpb::make }, // first entry is default
{ "STS",
gr_scheduler_sts::make }
};
Great! It points to a member
function, make, in a scheduler's class. Then the
program
is very
easy to understand: it checks whether there exists a Linux
environment variable:
GR_SCHEDULER. If no, use the
default scheduler; otherwise, use user's choice. And we do
not
have so
many choices on the scheduler:
好耶!它指向一个在scheduler类里边的成员函数,make。简单理解:检查是否存在Linux
环境变量:GR_SCHEDULER。如果没有,就是用默认的调度器;否则,是用用户选择的。其实我们也没多少选择不是么,就下边两个:
1. TPB (default): multi-threaded
scheduler.
2. STS: single-threaded
scheduler.
By default, gr_scheduler_tpb::make will be
called.
In file: gr scheduler_tpb.cc
gr_scheduler_sptr
gr_scheduler_tpb::make(gr_flat_flowgraph_sptr ffg)
{
return
gr_scheduler_sptr(new gr_scheduler_tpb(ffg));
}
The constructor of
gr_scheduler_tpb is called.
In file: gr
scheduler tpb.cc
gr_scheduler_tpb::gr_scheduler_tpb(gr_flat_flowgraph_sptr
ffg)
: gr_scheduler(ffg)
{
// Get a topologically
sorted vector of all the blocks in use.
// Being topologically
sorted probably isn't going to matter, but
// there's a non-zero
chance it might help...
gr_basic_block_vector_t
used_blocks = ffg->calc_used_blocks();
used_blocks =
ffg->topological_sort(used_blocks);
gr_block_vector_t blocks =
gr_flat_flowgraph::make_block_vector(used_blocks);
// Ensure that the done
flag is clear on all blocks
for (size_t i = 0; i
< blocks.size(); i++){
blocks[i]->detail()->set_done(false);
}
// Fire off a thead for
each block
for (size_t i = 0; i
< blocks.size(); i++){
std::stringstream name;
name
<< "thread-per-block["
<< i <<
"]: " << blocks[i];
d_threads.create_thread(
gruel::thread_body_wrapper<tpb_container>(tpb_container(blocks[i]),
name.str()));
}
}
Nothing strange here, the
only thing needs to mention is
d_threads.create_thread(
gruel::thread_body_wrapper<tpb_container>(tpb_container(blocks[i]),
name.str()));
thread_body_wrapper wraps
the main thread with the block name. Then, the
thread
begins from thread_body_wrapper(). Let us
see part of the program to find what happens.
thread_body_wrapper
把每个block的主要线程都包住了。然后,线程从thread_body_wrapper()开始。让我们看看到底这部分程序发生了什么。
In file: thread_body_wrapper.h
namespace
gruel
{
void
mask_signals();
template
<class F>
class
thread_body_wrapper
{
F d_f;
std::string d_name;
public:
explicit thread_body_wrapper(F f, const
std::string &name="")
: d_f(f), d_name(name)
{}
void operator()()
{
mask_signals();
try {
d_f();
}
catch(boost::thread_interrupted const
&)
{
}
catch(std::exception const
&e)
{
std::cerr
<< "thread["
<< d_name
<< "]: "
<<
e.what() << std::endl;
}
catch(...)
{
std::cerr
<< "thread["
<< d_name
<< "]: "
<<
"caught unrecognized exception\n";
}
}
};
}
See the overloading of
operate(), actually, d_f() is called, which is explicitly linked
to
tpb_container(). So go to the code of tpb_container:
看到重载operate(),实际上调用了d_f(),这个函数清晰的链接到tpb_container()。
In file:
gr_scheduler_tpb.cc
class tpb_container
{
gr_block_sptr
d_block;
public:
tpb_container(gr_block_sptr
block) : d_block(block) {}
void operator()()
{
gr_tpb_thread_body body(d_block);
}
};
Well. the overloading of
operate() just constructs another class,
gr_tpb_thread_body,
with the
block pointer. From here, the scheduler's work is
done.
Let's briefly summarize what
is going on with the GNU Radio scheduler.
1. Analyze
used blocks in gr_top_block.
2. Default
scheduler is TPB, which creates multi-threads for
blocks.
3. The
scheduler creates one concurrent thread for each
block.
4. For each
block, the thread's entry is gr_tpb_thread_body
body(d_block).
总结GNU RADIO
调度器作用:
1. 分析在 gr_top_block中每个block
1. 分析在 gr_top_block中每个block
2.默认调度器是TPB,用来创造每个blocks里边的多线程
3.调度器为每个block创建线程
4.对于每个block,
线程入口都在gr_tpb_thread_body 里边
2 How a thread of each block
works
As we discussed, the TPB scheduler generates
a thread for each block whose entry starts from
the
constructor of class gr_tpb_thread_body. Let us go over the
constructor:
正如我们讨论的,TPB调度器为每个block产生一个线程,这些线程的入口来自一个gr_tpb_thread_body类的构造器。
In file:
gr_tpb_thread_body.cc
gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr
block)
:
d_exec(block)
{
// std::cerr
<< "gr_tpb_thread_body: "
<< block
<< std::endl;
gr_block_detail *d =
block->detail().get();
gr_block_executor::state
s;
pmt_t msg;
//Here starts
the main loop of the thread.
//开始线程的循环
while (1){
//First, the
thread processes all signals
//首先,线程处理所有信号
boost::this_thread::interruption_point();
// handle any queued up messages
while ((msg =
d->d_tpb.delete_head_nowait()))
block->handle_msg(msg);
d->d_tpb.clear_changed();
switch(s){
case gr_block_executor::READY: // Tell neighbors
we made progress.
d->d_tpb.notify_neighbors(d);
break;
case gr_block_executor::READY_NO_OUTPUT: //
Notify upstream only
d->d_tpb.notify_upstream(d);
break;
case gr_block_executor::DONE: // Game over.
d->d_tpb.notify_neighbors(d);
return;
case gr_block_executor::BLKD_IN: // Wait for
input.
{
gruel::scoped_lock
guard(d->d_tpb.mutex);
while
(!d->d_tpb.input_changed){
// wait for input
or message
while(!d->d_tpb.input_changed
&&
d->d_tpb.empty_p())
d->d_tpb.input_cond.wait(guard);
// handle all
pending messages
while ((msg =
d->d_tpb.delete_head_nowait_already_holding_mutex())){
guard.unlock(); // release lock while processing
msg
block->handle_msg(msg);
guard.lock();
}
}
}
break;
case gr_block_executor::BLKD_OUT: // Wait for
output buffer space.
{
gruel::scoped_lock
guard(d->d_tpb.mutex);
while
(!d->d_tpb.output_changed){
// wait for output
room or message
while(!d->d_tpb.output_changed
&&
d->d_tpb.empty_p())
d->d_tpb.output_cond.wait(guard);
// handle all
pending messages
while ((msg =
d->d_tpb.delete_head_nowait_already_holding_mutex())){
guard.unlock(); // release lock while processing
msg
block->handle_msg(msg);
guard.lock();
}
}
}
break;
default:
assert(0);
}
}
}
So far so good. We can see
run_one_iteration() is the key in the whole thread,
it
includes the major functionality of the
block. Go to its source. Woo! it is a little
long.
很好,我们看到run_one_iteration(),它是整个线程的核心,它包括block的主要功能。去看看那些源代码吧!哇!!有一点点长噢。
In file:
gr_block_executor.cc
gr_block_executor::state
gr_block_executor::run_one_iteration()
{
...
// Do the actual work of the block int n =
m->general_work (noutput_items, d_ninput_items,
d_input_items, d_output_items); LOG(*d_log
<< " general_work: noutput_items = "
<< noutput_items
<< " result = "
<< n <<
std::endl);
...
}
Overall,
the code first checks
1. whether there
exists
sufficient
data
space for output. No → return
BLKD_OUT
2. whether there
are
sufficient
input data available,
No →
return
BLKD_IN
总体上,这些代码首先检查:
1.
是否存在足够的输出数据空间
2.
是否存在足够的可靠输入数据
If there
are sufficient input data and sufficient output space, the code
runs the actual work
of the block: general_work().
So up to now, we can know how each thread in
the GNU radio core works. Let us briefly
如果这里有足够的输入数据和足够的输出空间,代码就会执行block里边的work咯:
gnerral_work()
现在,我们可以知道在GNU
Radio里的每个线程都怎么工作了噢,所以短暂的总结一下:
summarize
1. A thread for each block has a while (1)
loop.
2. The loop processes signals and run the key function
run_one_iteration().
3. run_one_iteration() checks if there are sufficient
data for input and sufficient available
space for output for the
block.
4. If yes, call general_work() to run the main
functionality of the block. Otherwise,
return
BLKD_OUT, BLKD_IN, or
others.
1. 每个block的线程都有一个while(1)循环
2. 循环处理信号并且运行主要函数 run_one_iteration()
3. run_one_iteration() 检查
是否有足够的数据输入和为这个block输出足够的可靠的空间
4. 如果都可以,调用general_work() 去跑这个block主要的功能。否则,返回BLKD_OUT,
BLKD_IN,或者其他。