Thursday, October 11, 2012

Gnuradio scheduler : Part-1 How the GNU Radio scheduler is called and what it does

Today I read an interesting and informative pdf about how does the core of gnuradio works. Its based on gnuradio version 3.3.0

* Please donwload gnuradio 3.3.0 from here
 http://gnuradio.org/releases/gnuradio/gnuradio-3.3.0.tar.gz

The author is Zhuo Lu

Lets take an example for dial_tone.py


dial_tone.py
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

from gnuradio import gr
from gnuradio import audio
from gnuradio.eng_option import eng_option
from optparse import OptionParser

class my_top_block(gr.top_block):

    def __init__(self):
        gr.top_block.__init__(self)

        parser = OptionParser(option_class=eng_option)
        parser.add_option("-O", "--audio-output", type="string", default="",
                          help="pcm output device name.  E.g., hw:0,0 or /dev/dsp")
        parser.add_option("-r", "--sample-rate", type="eng_float", default=48000,
                          help="set sample rate to RATE (48000)")
        (options, args) = parser.parse_args ()
        if len(args) != 0:
            parser.print_help()
            raise SystemExit, 1

        sample_rate = int(options.sample_rate)
        ampl = 0.1

        src0 = gr.sig_source_f (sample_rate, gr.GR_SIN_WAVE, 350, ampl)
        src1 = gr.sig_source_f (sample_rate, gr.GR_SIN_WAVE, 440, ampl)
        dst = audio.sink (sample_rate, options.audio_output)
        self.connect (src0, (dst, 0))
        self.connect (src1, (dst, 1))


if __name__ == '__main__':
    try:
        my_top_block().run()
    except KeyboardInterrupt:
        pass

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Besides some running connection for the blocks, gnuradio running threads start at my_top_block.run() only.

Lets see what happens afterwards

** Python and C++ classes have one to one correspondence  via SWIG.

run() is defined in C++ class gr_top_block. To find this go to

/gnuradio-3.3.0/gnuradio-core/src/lib/runtime

There you will find gr_top_block.cc

There you can find the definition for run() as follows :

void
gr_top_block::run()
{
  start();
  wait();
}


Also the definition of start() is

void
gr_top_block::start()
{
  d_impl->start();
}




d_impl is a member that points to the class gr_top_block_impl. This class can be found in  /gnuradio-3.3.0/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc

* -> is a structure dereference operator. a->b means member b of the object    pointed to by a
*  while . is a structure reference operator. a.b means member b of object a


So here it means start() method as defined in the object pointed by d_impl i.e. gr_top_block_impl.cc (see line 45 of gr_top_block.cc)


Lets open the file gr_top_block_impl.cc and see the definition of start()

void
gr_top_block_impl::start()
{
  gruel::scoped_lock    l(d_mutex);

  if (d_state != IDLE)
    throw std::runtime_error("top_block::start: top block already running or  wait() not called after previous stop()");

  if (d_lock_count > 0)
    throw std::runtime_error("top_block::start: can't start with flow graph locked");

  // Create new flat flow graph by flattening hierarchy
  d_ffg = d_owner->flatten();

  // Validate new simple flow graph and wire it up
  d_ffg->validate();
  d_ffg->setup_connections();

  d_scheduler = make_scheduler(d_ffg);
  d_state = RUNNING;
}


The codes do some sanity check and then create the GNU Radio Scheduler by calling

d scheduler = make_scheduler ( d_ffg ) ;





Now lets go to the definition of make_scheduler(), it is also defined in gr_top_block_impl.cc as follows

static gr_scheduler_sptr make_scheduler(gr_flat_flowgraph_sptr ffg)
{
  static scheduler_maker  factory = 0;

  if (factory == 0){
    char *v = getenv("GR_SCHEDULER");
    if (!v)
      factory = scheduler_table[0].f;    // use default
    else {
      for (size_t i = 0; i < sizeof(scheduler_table)/sizeof(scheduler_table[0]); i++){
    if (strcmp(v, scheduler_table[i].name) == 0){
      factory = scheduler_table[i].f;
      break;
    }
      }
      if (factory == 0){
    std::cerr << "warning: Invalid GR_SCHEDULER environment variable value \""
          << v << "\".  Using \"" << scheduler_table[0].name << "\"\n";
    factory = scheduler_table[0].f;
      }
    }
  }
  return factory(ffg);
}


In the above definition we have a strange variable

"static scheduler_maker  factory" which has been initialized to 0

* A variable declared static in a function retains its state between calls to that function.

Its definition is given in the very beginning of the file as

 typedef gr_scheduler_sptr (*scheduler_maker)(gr_flat_flowgraph_sptr ffg)

So it seems that factory is a function pointer and where does it point to ?

Well its written that

factory = scheduler_table[i].f

Lets see what is inside scheduler_table. Its actually an array.   See in the begining of the file gr_top_block_impl.cc

static struct scheduler_table {
  const char            *name;
  scheduler_maker    f;
} scheduler_table[] = {
  { "TPB",    gr_scheduler_tpb::make },    // first entry is default
  { "STS",    gr_scheduler_sts::make }
};
 



It seems that it points to the member function "make" in the scheduler's class.

So it checks whether there exists a Linux environment variable:
GR_SCHEDULER.

If no, use the default scheduler(TPB); otherwise, use user’s choice. And we do not have so many choices on the scheduler:



1. TPB (default): multi-threaded scheduler.
2. STS: single-threaded scheduler.

So by default, gr_scheduler_tpb::make will be called.





Now lets see what is inside gr_scheduler_tpb.cc You can find it in
/gnuradio-3.3.0/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc

In the following 2 lines the constructor of the gr_scheduler_tpb is called

gr_scheduler_tpb::make(gr_flat_flowgraph_sptr ffg)
{
  return gr_scheduler_sptr(new gr_scheduler_tpb(ffg));
}


The next few line are

gr_scheduler_tpb::gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg)
  : gr_scheduler(ffg)
{
  // Get a topologically sorted vector of all the blocks in use.
  // Being topologically sorted probably isn't going to matter, but
  // there's a non-zero chance it might help...

  gr_basic_block_vector_t used_blocks = ffg->calc_used_blocks();
  used_blocks = ffg->topological_sort(used_blocks);
  gr_block_vector_t blocks = gr_flat_flowgraph::make_block_vector(used_blocks);

  // Ensure that the done flag is clear on all blocks

  for (size_t i = 0; i < blocks.size(); i++){
    blocks[i]->detail()->set_done(false);
  }

  // Fire off a thead for each block

  for (size_t i = 0; i < blocks.size(); i++){
    std::stringstream name;
    name << "thread-per-block[" << i << "]: " << blocks[i];
    d_threads.create_thread(
      gruel::thread_body_wrapper(tpb_container(blocks[i]), name.str()));
  }
}


Last two lines are important here

d_threads.create_thread(
      gruel::thread_body_wrapper(tpb_container(blocks[i]), name.str()));



thread_body_wrapper wraps the main thread with the block name. Then, the thread begins from thread_body_wrapper().


Lets see inside thread_body_wrapper.h
you can locate it in gnuradio-3.3.0/gruel/src/include/gruel and the .cc file is in
gnuradio-3.3.0/gruel/src/lib


  template
  class thread_body_wrapper
  {
    F         d_f;
    std::string d_name;

  public:

    explicit thread_body_wrapper(F f, const std::string &name="")
      : d_f(f), d_name(name) {}

    void operator()()
    {
      mask_signals();

      try {
    d_f();
      }


So operator() has been overloaded here and d_f() is called actually and it is explicitly linked to tpb_container class . You can see this in the gr_scheduler_tpb.cc file in the definition of the class tpb_container

Lets see the code of tpb_container class :

class tpb_container
{
  gr_block_sptr    d_block;
 
public:
  tpb_container(gr_block_sptr block) : d_block(block) {}

  void operator()()
  {
    gr_tpb_thread_body    body(d_block);
  }
};


So the overloading of operate() just constructs another class "gr_tpb_thread_body"


From here the schedulers work is done.

So in brief the gnuradio scheduler does the following :

1. Analyze used blocks in gr_top_block
2. Default scheduler is TPB which creates multi-threads for blocks
3. The scheduler creates one concurrent thread for each block
4. For each block the thread's entry is gr_tpb_thread_body body(d_block)



























































No comments:

Post a Comment