V. Developer Manual

This chapter is intended for advanced users who need to program parallel applications (MPI), write new filters or synchronizers, add new functionality to the FlowVR daemon, etc.


20. Advanced Example : the Fluid Application

The fluid application shows the following features:

To test the application, check that MPI is installed and run:



make fluid


Figure: The Fluid Application
Image flowvr-fluid1

This application lets a user disturb a fluid flow with the mouse (share/flowvr/examples/fluid). We distinguish three parts:

Figure: The Fluid Application with the simulation parallelized on 4 modules
Image flowvr-fluid2

The simulation is parallelized by splitting the grid of cells into blocks distributed among the MPI processes (the fluid simulation metamodule spawns one FlowVR module per MPI process). For instance, a $2^k \times 2^k$ grid is split into 4 blocks of $2^{k-1} \times 2^{k-1}$ cells on 4 processes, into 16 blocks of $2^{k-2} \times 2^{k-2}$ cells on 16 processes, etc. To perform the necessary computations, the state of the cells on the block borders must be exchanged between neighbors at each iteration. These communications are managed by MPI (share/flowvr/examples/fluid/modules/src/TurbulentBase.cpp).

The simulation needs the mouse position, so it must be broadcast to every process; the FlowVR network takes care of this. After each iteration, the result must be sent to the visualization module for rendering. The results are distributed among the MPI processes, whereas the visualization module accepts only a complete result. For the sake of modularity, the visualization module should not be aware of how the simulation is computed. Symmetrically, the simulation should not be aware of how its result is used. We therefore delegate the conversion of the simulation output into a format acceptable to the visualization to the FlowVR network, using a tree of merging filters.
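The block decomposition described above can be sketched as follows. This is only an illustration under stated assumptions: the Block structure and the blockForRank function are hypothetical names that do not come from the FlowVR sources, the processes are assumed to form a square side x side arrangement numbered row by row, and the grid size is assumed to be divisible by side.

```cpp
#include <cassert>

// Hypothetical helper (not taken from the FlowVR sources): the square block
// of an n x n grid that is owned by one MPI rank.
struct Block {
    int x0;    // column index of the first cell of the block
    int y0;    // row index of the first cell of the block
    int size;  // number of cells along each side of the block
};

// Assumes side * side processes, numbered row by row, and n divisible by side.
Block blockForRank(int rank, int n, int side) {
    assert(side > 0 && n % side == 0);
    assert(rank >= 0 && rank < side * side);
    const int size = n / side;
    return Block{(rank % side) * size, (rank / side) * size, size};
}
```

With an 8 x 8 grid on 4 processes (side = 2), rank 3 would own the 4 x 4 block whose first cell is at column 4, row 4. Only the cells on the edges of such a block need to be exchanged with the neighboring ranks.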

Note that in the two schematic figures above the mouse event capture is depicted as a standalone module for the sake of clarity. This is not the case in the implementation: the visualization and the mouse event capture are part of the same module. The actual implementation is detailed in the following section.

To better understand the details of the algorithm used for the fluid simulation, refer to www-id.imag.fr/~raffin/papers/ID/ieeevr01.pdf or to the J. Stam article, "Stable Fluids", published at SIGGRAPH 99.

20.1 The Modules

20.1.1 The Simulation Module Code

The main files:

This metamodule is an MPI program, i.e. a parallel program. If OpenMPI is present, the build produces a single-process version (share/flowvr/examples/fluid/bin/fluid) and an MPI version (share/flowvr/examples/fluid/bin/fluidmpi). The parallel code lives in the same source files as the sequential code, but it is compiled only if USE_MPI is defined.

Each process has a unique id, its rank, available through the MPI_Comm_rank function call. Each process uses this rank to register as a FlowVR module during the initialization step. The rank is also used to ensure that each module spawned from this metamodule has a unique FlowVR id: the rank is appended to the name returned by FLOWVR_MODNAME when calling the initModule() function.
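The following sketch illustrates the two ideas of this section: code that is compiled only when USE_MPI is defined, and a per-process name derived from the rank. The helper names processRank and moduleNameForRank are hypothetical, and the "/" separator is an assumption made for the example; the way FlowVR actually builds the module id is not shown here.

```cpp
#include <cassert>
#include <string>
#ifdef USE_MPI
#include <mpi.h>
#endif

// Rank of the calling process: the MPI rank when built with USE_MPI
// (MPI_Init must have been called before), 0 in the single-process build.
int processRank() {
#ifdef USE_MPI
    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    return rank;
#else
    return 0;
#endif
}

// Hypothetical helper: appends the rank to a base name so that every
// process gets a distinct name. The "/" separator is an assumption.
std::string moduleNameForRank(const std::string& base, int rank) {
    assert(rank >= 0);
    return base + "/" + std::to_string(rank);
}
```

A process would typically call moduleNameForRank(baseName, processRank()) once during initialization, before registering with FlowVR.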

20.1.2 The Simulation MetaModule and Module Components

The module component (share/flowvr/examples/fluid/include/fluid/components/modulefluid.comp.h) is fairly standard. We use the same module for the sequential code and for the MPI code: the module ports are the same.

We have two different metamodule components. The first is for the sequential version and uses the flowvr-run-ssh launching command (share/flowvr/examples/fluid/include/fluid/components/metamodulefluid.comp.h). For that purpose, it inherits from the MetaModuleFlowvrRunSSH metamodule class. The second is for the MPI code and uses the mpi-run launching command (share/flowvr/examples/fluid/include/fluid/components/metamodulefluid.comp.h). It inherits from the MetaModuleOpenMPI metamodule class. Note that because mpirun launchers are not standard across implementations, a new metamodule must be developed if an MPI implementation other than OpenMPI is used.

These metamodules read the "nx" and "ny" parameters and pass their values on the command line. Each module reads these values for its internal configuration. Using parameters makes it easy to change these values from the flowvr command line without recompiling the application.
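On the module side, reading such values from the command line could look like the sketch below. It is only an illustration: the GridSize structure, the parseGridSize function and the argument positions (nx first, ny second) are assumptions, not the actual fluid module code.

```cpp
#include <cassert>
#include <cstdlib>

// Hypothetical container for the grid dimensions.
struct GridSize {
    int nx;
    int ny;
};

// Reads nx from argv[1] and ny from argv[2] (assumed positions). Missing or
// non-positive values fall back to the provided defaults.
GridSize parseGridSize(int argc, const char* const argv[], GridSize defaults) {
    assert(defaults.nx > 0 && defaults.ny > 0);
    GridSize g = defaults;
    if (argc > 1) g.nx = std::atoi(argv[1]);
    if (argc > 2) g.ny = std::atoi(argv[2]);
    if (g.nx <= 0) g.nx = defaults.nx;
    if (g.ny <= 0) g.ny = defaults.ny;
    return g;
}
```

Because the values arrive as ordinary arguments, changing the grid size only requires a different command line, not a rebuild.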

20.1.3 The Mouse Event Capture and Visualization Code

See share/flowvr/examples/fluid/src/gldens.cpp for the module code.

This metamodule relies on Glut, so the loop structure is hidden behind the callback style imposed by Glut. The metamodule can spawn several visualization modules (the same code launched several times). Only one of these modules opens a position output port that provides the mouse position. This module is chosen by passing 1 as the fourth argument of the gldens executable.

The module gldens uses a stamp N to read the size of the received matrices. It uses a class GridInputPort to define this user stamp (see share/flowvr/examples/fluid/src/gldens.cpp):



/// Input Port for 2D Grids with stamp "N" for size
class GridInputPort : public flowvr::InputPort
{
public:
  GridInputPort(const char* name="grid")
    : InputPort(name),
      stampN("N",flowvr::TypeArray::create(2,flowvr::TypeInt::create()))
  {
    stamps->add(&stampN);
  }
  flowvr::StampInfo stampN;
};
GridInputPort portDensity("density");
GridInputPort portVelocity("velocity");


Note that you are not required to define a new class to simply add a stamp. For example, the same module gldens defines a stamp B on the output position port to send the state of the mouse's buttons using the following code:



OutputPort portPosition("position");
StampInfo stampButtons("B",TypeInt::create());
// .... //
  portPosition.stamps->add(&stampButtons);


To write the values of a stamp in a new message you must call the write method of the StampsWrite class. This method needs as arguments the description of the stamp to write and its new value. In our example this leads to:



MessageWrite mpos;
    // .... //
    mpos.stamps.write(stampButtons,mouse_down);


Similarly, a stamp is read using the read method. This method returns true if the operation succeeded and false otherwise. In particular, if the stamp is not present in the received message, the read fails. You can then either ignore the error and use a default value instead, or stop the execution of the module and warn the user. Our gldens example module uses the default dimensions if the stamp is not present. The code to read the stamp N in this case is:



int nx,ny;
  if (m.stamps.read(portDensity.stampN[0],nx) && m.stamps.read(portDensity.stampN[1],ny))
  {
    // Store the given dimension
  }
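The "read, and fall back to a default on failure" pattern can be tried outside FlowVR with a toy model. In the sketch below, ToyStamps and readOr are hypothetical stand-ins written for this example; they are not the FlowVR stamp classes and only mimic the boolean contract of the read method described above.

```cpp
#include <cassert>
#include <map>
#include <string>

// Toy stand-in for the stamps of a received message (NOT a FlowVR class).
struct ToyStamps {
    std::map<std::string, int> values;

    // Same contract as described in the text: returns true and sets `out`
    // when the stamp is present, returns false and leaves `out` untouched
    // when it is missing.
    bool read(const std::string& name, int& out) const {
        std::map<std::string, int>::const_iterator it = values.find(name);
        if (it == values.end()) return false;
        out = it->second;
        return true;
    }
};

// Reads a stamp and falls back to a default value when it is missing.
int readOr(const ToyStamps& stamps, const std::string& name, int fallback) {
    int value = fallback;
    stamps.read(name, value);
    return value;
}
```

A module that prefers to stop on a missing stamp would test the boolean returned by read directly instead of going through a helper such as readOr.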
  


20.1.4 The Mouse Event Capture and Visualization Modules and Metamodules Components

The module component (share/flowvr/examples/fluid/include/fluid/components/modulegldens.comp.h) and the metamodule component (share/flowvr/examples/fluid/include/fluid/components/metamodulegldens.comp.h) are fairly standard and use flowvr-run-ssh as the launcher.

20.2 The Application

20.2.1 Fluid

The sequential simulation is associated with a simple application network with direct 1-to-1 communications between the two metamodules (share/flowvr/examples/fluid/include/fluid/components/fluid.comp.h and share/flowvr/examples/fluid/src/fluid.comp.cpp).

To start it locally on your machine:



flowvr -L -x Fluid


20.2.2 FluidMPI

The MPI simulation is associated with a more advanced network. Because the fluid metamodule spawns several modules, collective communications (such as Com1ToN) are used (share/flowvr/examples/fluid/include/fluid/components/fluidmpi.comp.h and share/flowvr/examples/fluid/src/fluidmpi.comp.cpp).

To start it locally on your machine you must rely on the fluidmpi.csv file (share/flowvr/examples/fluid/fluidmpi.csv), which requests 4 instances to be launched on the local host. If you have a quad-core processor you should notice some performance gain compared to the sequential version:



flowvr -x FluidMPI


21. FlowVR Architecture Overview


21.1 The Daemon

The daemon is the main component of the FlowVR architecture. It can be viewed as a collection of objects and plugins. Some of these objects are automatically loaded (like the commander), others need a control command (filters). The objects loaded by the FlowVR daemon can be passive objects (routing table, filters) or threads (regulator, net).

When the daemon starts it:

21.1.1 Message Handling

21.1.2 Routing Table

Each daemon owns a unique routing table common to all the FlowVR objects running on the node. This object contains a list of routes. A route is specified by a source and an action. Several actions can be linked to the same source: the number of routes for a source is equal to the number of actions to take when a message comes from this source. A lock protects the execution of control commands such as adding or removing a route.
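The structure described above (routes keyed by source, several actions per source, updates protected by a lock) can be modeled in a few lines. The sketch below is a toy written for illustration: ToyRoutingTable and its methods are hypothetical names, actions are reduced to plain strings, and nothing here reflects the actual daemon classes.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Toy model of a routing table (NOT the FlowVR implementation): one route
// per (source, action) pair, with actions reduced to strings.
class ToyRoutingTable {
public:
    // Adds one route; several routes may share the same source.
    void addRoute(const std::string& source, const std::string& action) {
        std::lock_guard<std::mutex> guard(lock_);
        routes_.emplace(source, action);
    }

    // Removes every route of `source` and returns how many were removed.
    std::size_t removeRoutes(const std::string& source) {
        std::lock_guard<std::mutex> guard(lock_);
        return routes_.erase(source);
    }

    // Returns a copy of the actions to take for a message from `source`.
    std::vector<std::string> actionsFor(const std::string& source) const {
        std::lock_guard<std::mutex> guard(lock_);
        std::vector<std::string> actions;
        auto range = routes_.equal_range(source);
        for (auto it = range.first; it != range.second; ++it)
            actions.push_back(it->second);
        return actions;
    }

private:
    mutable std::mutex lock_;
    std::multimap<std::string, std::string> routes_;
};
```

Returning a copy from actionsFor keeps the lock held only for the lookup, so a control command that adds or removes a route cannot invalidate what the caller is iterating over.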

The different actions possible when a message must be parsed are :

Main functions of the routing table :

21.1.3 Filters and Synchronizers

See chapter 22, Programming Filters and Synchronizers.

21.1.4 Regulator

The regulator can be viewed as a "box" encapsulating the modules of the application. It is the object that interacts with the other components of the FlowVR daemon. By hiding message handling and access to the shared memory area, and by managing the internal loop of modules, the regulator allows modules to be "blind" and therefore simpler to develop.

Main characteristics :

21.1.5 Net

The Net is the communication object that allows messages to be passed between different nodes running a FlowVR daemon. This object is not loaded automatically when a daemon starts. In the current release the default Net object is based on the TCP protocol (see flowvrd/src/plugins/sys/flowvr.plugins.NetTCP). The implementation relies on two threads: one for receiving messages and one for sending messages.

Other implementations of the Net object are possible. To specify the Net object loaded when starting a daemon:



flowvrd -net classnameobject


21.2 The Controller

The controller is a special module. Its role is to launch and control the application. There is only one controller for each FlowVR application. The controller directly accesses each FlowVR daemon involved in the application (so these nodes must be visible to the controller). The controller does not use the routing table. It sends control commands (start a module, add a connection, pause or stop the application) and receives status results. The controller is the bridge between the user and the application.

21.3 The Controller and Daemon Interactions

The interface between the controller and the daemon is the commander. There is exactly one commander per daemon. The commander receives commands from the controller, processes them and sends the results back to the controller. The commander can reach any FlowVR object present on the same node.


21.4 The Command Language

The command language is the protocol used for communications between the controller and the daemons. It follows an XML syntax. Complete descriptions can be found in the DTDs.

Commands processed by the controller :

NOTE: the action tag also contains a messagetype variable. The combination of the two values given in this command determines the type of the connection (stamps only or full).

Two types of message can be sent by the commander to the controller :

21.5 Application Deployment

21.5.1 Component States

Depending on whether the component is a module or a daemon plugin (filter or synchronizer), it passes through a different sequence of states when the application is launched.

21.5.1.1 Modules

A module is not a plugin loaded by the daemon: it runs in a separate process. It has four different states: 'not launched', 'launched', 'paused' and 'started'.

Figure: The different states of a module
Image metamodule_states

21.5.1.2 Plugin states

A plugin, which is loaded by the daemon and runs in a thread, has three different states: 'launched', 'paused' and 'started'.

Figure: The different states of a plugin
Image module_states

21.5.2 The 4 Deployment Stages

An application is deployed in four stages:

  1. Starting Modules: the launching commands are retrieved from the .run.xml file and executed. This starts the metamodules (and their associated modules). Each module can go through the different states: 'not launched', 'launched', 'paused' and 'started'.

  2. Plugin Loading: the daemon receives plugin loading orders from the controller through the <addobject> commands (stored in the .cmd.xml files). Each plugin can go through the three states: 'launched', 'paused' and 'started'.

  3. Route Creation: routes, which represent connections between components, are added to the routing table of the daemon. The daemon receives route creation orders from the controller through the <addroute> commands (stored in the .cmd.xml files).

    Figure: The different states of a route
    Image route_states

  4. Start Commands: the controller sends start commands to the daemons. Each daemon sends start orders to its modules and plugins in the order defined by their priorities (from the highest to the lowest).
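The ordering rule of the last stage can be illustrated with a small sketch. The Component structure and the startOrder function are hypothetical names introduced for this example; how the daemon actually stores priorities, and how it orders components of equal priority, is not specified here (the sketch simply keeps their original order).

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical description of a module or plugin to start.
struct Component {
    std::string id;
    int priority;
};

// Returns the ids in the order they would be started: highest priority
// first. Components with equal priority keep their relative order.
std::vector<std::string> startOrder(std::vector<Component> components) {
    std::stable_sort(components.begin(), components.end(),
                     [](const Component& a, const Component& b) {
                         return a.priority > b.priority;
                     });
    std::vector<std::string> ids;
    for (const Component& c : components) ids.push_back(c.id);
    assert(ids.size() == components.size());
    return ids;
}
```

For example, components with priorities 1, 5 and 3 would be started in the order 5, 3, 1.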

21.6 The Telnet Interface

flowvr uses flowvr-telnet to start an application. This is not visible to the user, but the flowvr-telnet command is available and can be used directly. flowvr-telnet is a simple interface to the controller. Once it is launched in a shell, the user can directly type commands for the controller, following the expected syntax. Special commands are also available: pause, start and stop.

21.6.1 Application launching: Interactive or Batch

flowvr-telnet can be launched in two different ways: batch or interactive.

When the application is launched in batch mode, stages 1 to 4 are performed automatically, and the user can then directly type commands for the controller, following the expected syntax.

To launch the application in this mode, type the following command:



flowvr-telnet application_name.run.xml application_name.cmd.xml


When the application is launched in interactive mode, the user controls every stage of the launch. To launch the application in interactive mode, add the -I (interactive) flag:



flowvr-telnet -I application_name.run.xml application_name.cmd.xml


Thanks to this representation it is possible to apply commands to specific objects, so commands can be applied to a subtree.

There are other commands available that can provide information on the application progress :


22. Programming Filters and Synchronizers

22.1 What for?

Filters and synchronizers are objects that alter and control the dataflow on the connections between the modules of a FlowVR application. They are plugins loaded by the daemon. The main difference with modules is that filters and synchronizers run in the same process as the FlowVR daemon, so there is no context switch when using them. Another important difference is that filters have full access to the message queues, while modules only have access to the last message available. These objects are designed to control the dataflow and the synchronization locally. For example, a filter can compress or decompress data, merge or scatter them, or select the next message to send to the receiving modules. A synchronizer is a special case of filter: all its inputs and outputs carry stamps only. It can be associated with a filter to implement a synchronization rule.

22.2 Input and Output Message Queues

Filters and synchronizers are directly loaded by the FlowVR daemon. They do not have the same level of abstraction provided by the regulators for the modules. They manage messages like a regulator. They use input and output message queues.

A message queue is a buffer of messages that can be viewed as a FIFO queue. A filter or a synchronizer owns as many input message queues as input ports and as many output message queues as output ports. It therefore owns a vector of input message queues and a vector of output message queues.
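The layout described above (one FIFO queue per port, grouped into an input vector and an output vector) can be pictured with a toy model. ToyQueue, ToyFilter and forwardFront are hypothetical names written for this sketch, and messages are reduced to strings; the real FlowVR queue classes offer a richer API.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Toy message queue (NOT a FlowVR class): a FIFO of messages, with
// messages reduced to strings.
typedef std::deque<std::string> ToyQueue;

// Toy filter owning one queue per input port and one per output port.
struct ToyFilter {
    std::vector<ToyQueue> inputs;
    std::vector<ToyQueue> outputs;

    ToyFilter(std::size_t nbInputs, std::size_t nbOutputs)
        : inputs(nbInputs), outputs(nbOutputs) {}

    // Moves the oldest message of input `in` to the back of output `out`.
    // Returns false when the input queue is empty.
    bool forwardFront(std::size_t in, std::size_t out) {
        assert(in < inputs.size() && out < outputs.size());
        if (inputs[in].empty()) return false;
        outputs[out].push_back(inputs[in].front());
        inputs[in].pop_front();
        return true;
    }
};
```

A filter with two input ports and one output port would be created as ToyFilter f(2, 1), after which f.inputs[0] and f.inputs[1] are independent FIFO queues.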

22.2.1 The Input Message Queue API

Methods to call for the vector of input message queues :

Main methods on the input message queues used by filters and synchronizers :

22.2.2 The Output Message Queue API

Methods to call for the vector of output message queues:

Main methods of the output message queues used for filters and synchronizers:

22.3 The Filters and Synchronizers API

In terms of code, filters and synchronizers are the same, so a single API is provided for both. From here on only filters are mentioned, but everything that holds for filters also holds for synchronizers.

Two kinds of filters can be written: basic or threaded. In the first case, the activation of the filter is controlled by the input message queues. In the second case, the filter must learn about new messages through signal notification and actively wait on its input message queues.

The next section intends to show code examples for each case (threaded or basic).

22.3.1 Basic Filter Code Example

The following code is part of the filter named FilterIt. It shows a simple use of the different methods provided by the input and output message queue API. Note that this filter must wait for the stamp specification before it can finish its initialization.



/// Constructor.
FilterIt::FilterIt(std::string objID)
  : Filter(objID), numout(0), traceout(TypedTrace<int>("out"))
{
}

COMMENTS :



flowvr::plugd::Result FilterIt::init(flowvr::xml::DOMElement* xmlRoot, flowvr::plugd::Dispatcher* dispatcher)
{
  flowvr::plugd::Result result = Filter::init(xmlRoot, dispatcher);
  if (result.error()) return result;

  // initialization of the input message queues
  initInputs(NBPORTS);
  inputs[IDPORT_IN]->setName("in");
  inputs[IDPORT_ORDER]->setName("order");

  // initialization of the OMQ "out"
  initOutputs(1);
  outputs[0]->setName("out");
  outputs[0]->msgtype = Message::FULL;

  outputTraces.push_back(&traceout);

  return result;
}

COMMENTS :



void FilterIt::newMessageNotification(int mqid, int msgnum, const Message& msg, Dispatcher* dispatcher)
{
#ifdef DEBUG
  if (mqid == IDPORT_IN)
    std::cout << objectID() << ": new input " << msgnum << " queue size " << inputs[mqid]->size() << std::endl;
  else
    std::cout << objectID() << ": new order " << msgnum << " queue size " << inputs[mqid]->size() << std::endl;
#endif
  if (started)
    sendPendingOrders(dispatcher);
}

void FilterIt::newStampListSpecification(int mqid, const Message& msg, Dispatcher* dispatcher)
{
  if (started && mqid == IDPORT_IN)
  { // forward specification to out port
#ifdef DEBUG
    std::cout << objectID() << ": forwarding stamps specification" << std::endl;
#endif

    // forwarding stamplist specification to the OMQ
    outputs[0]->stamps=inputs[mqid]->getStampList();
    outputs[0]->newStampSpecification(dispatcher);

    inputs[IDPORT_IN]->setFront(0);
    sendPendingOrders(dispatcher);
  }
}

COMMENTS :



void FilterIt::sendPendingOrders(plugd::Dispatcher* dispatcher)
{ // MAIN FILTER FUNCTION

  if (!inputs[IDPORT_IN]->stampsReceived()) return; // still waiting for stamps specification

  int it,itin;
  for (;;)
  {
    Message msg;
    int num;
    if (!inputs[IDPORT_ORDER]->frontMsg().valid())
    {
#ifdef DEBUG
      std::cout << objectID() << ": waiting orders" << std::endl;
#endif
      return;
    }

    msg = inputs[IDPORT_ORDER]->frontMsg();

    msg.stamps.read(inputs[IDPORT_ORDER]->getStampList().num,num);

    msg.stamps.read(inputs[IDPORT_ORDER]->getStampList().it,it);
    bool scratch = (it<0);
    it = (it<0?-it:it)-10;
    itin = -10;
    while(!inputs[IDPORT_IN]->empty()
          && inputs[IDPORT_IN]->frontMsg().valid()
          && inputs[IDPORT_IN]->frontMsg().stamps.read(inputs[IDPORT_IN]->getStampList().it,itin)
          && itin<it)
      inputs[IDPORT_IN]->eraseFront();

    if (scratch)
    {
      inputs[IDPORT_ORDER]->eraseFront();
      //std::cout << objectID() << "scratched up to " << it << std::endl;
      continue; // do not send any message, just clean the queue
    }
    if (itin>=it)
    {
      msg = inputs[IDPORT_IN]->frontMsg();
    }
    else
    {
#ifdef DEBUG
      std::cout << objectID() << ": waiting message " << it << std::endl;
#endif
      return; // missing message
    }
    inputs[IDPORT_ORDER]->eraseFront();
    num = numout++;
#ifdef DEBUG
    std::cout << objectID() << ": sending message " << num << std::endl;
#endif
    MessagePut newmsg;
    newmsg.stamps.clone(msg.stamps,&inputs[IDPORT_IN]->getStampList());
    newmsg.data=msg.data;

    traceout.write(it);
    outputs[0]->put(newmsg,dispatcher,num);
  }
}
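The selection logic of sendPendingOrders can be isolated from the FlowVR classes to make it easier to follow. The sketch below is a simplified model, not the filter itself: the Order structure and the decodeOrder and selectMessage functions are hypothetical names, and each pending input message is reduced to its iteration number. It reproduces two steps of the listing: decoding the order stamp (a negative value means "scratch", and the offset of 10 is removed), then discarding input messages older than the requested iteration.

```cpp
#include <cassert>
#include <deque>

// Decoded form of the "it" stamp carried by an order (hypothetical type).
struct Order {
    bool scratch;  // true: only clean the queue, send nothing
    int it;        // requested iteration
};

// Mirrors the listing: a negative stamp requests a scratch, and the
// iteration is the absolute value of the stamp minus 10.
Order decodeOrder(int stamp) {
    const bool scratch = stamp < 0;
    return Order{scratch, (scratch ? -stamp : stamp) - 10};
}

// Drops pending iterations older than `wanted`. Returns true and sets
// `found` when a message with iteration >= wanted is at the front; that
// message stays in the queue, as in the listing. Returns false when the
// expected message has not arrived yet.
bool selectMessage(std::deque<int>& pending, int wanted, int& found) {
    while (!pending.empty() && pending.front() < wanted)
        pending.pop_front();
    if (pending.empty()) return false;
    found = pending.front();
    assert(found >= wanted);
    return true;
}
```

In this model, an order stamp of 15 applied to pending iterations 3, 5 and 8 would drop iteration 3 and select iteration 5.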

COMMENTS :



flowvr::plugd::GenClass<FilterIt> FilterItClass("flowvr.plugins.FilterIt", // name
                                                "", // description
                                                &flowvr::plugins::FilterClass
                                                );

Class* FilterIt::getClass()
{
  return &FilterItClass;
}

COMMENTS :

22.3.2 Threaded Synchronizer Code Example

The following example is a threaded synchronizer that controls the working rate of a module by bounding its frequency. When threaded, filters and synchronizers are similar to modules in that they rely on a wait/get/put loop.



MaxFrequencySynchronizor::MaxFrequencySynchronizor(std::string objID)
  : ThreadedSynchronizor(objID), running(false), freqHz(1.0f)
{
  threadDispatcher=NULL;
}


COMMENTS :



flowvr::plugd::Result MaxFrequencySynchronizor::init(flowvr::xml::DOMElement* xmlRoot, flowvr::plugd::Dispatcher* dispatcher)
{
  flowvr::plugd::Result result = Synchronizor::init(xmlRoot, dispatcher);
  if (result.error()) return result;

  threadDispatcher = dispatcher->threadCopy();

  freqHz = 1.0f;
  // freq parameter reading ...
  xml::DOMNodeList* lfreq = xmlRoot->getElementsByTagName("freq");
  if (lfreq->getLength()<1) return Result(flowvr::plugd::Result::ERROR,"No freq parameter");
  string fr = lfreq->item(0)->getTextContent();
  freqHz = atof(fr.c_str());
  delete lfreq;
  if ( freqHz < 0.001f || freqHz > 100000000.0f ) // incorrect value for frequency in hertz
    return Result(flowvr::plugd::Result::ERROR,"Incorrect frequency parameter");

  initInputs(NBPORTS);
  inputs[IDPORT_ENDIT]->setName("endIt");

  // only one output message queue for this filter
  initOutputs(1);
  outputs[0]->setName("out");
  outputs[0]->msgtype=Message::STAMPS;

  // give the stamp list to the output message queue
  outputs[0]->newStampSpecification(dispatcher);

  running = true;

  start();

  return result;
}

COMMENTS :



int MaxFrequencySynchronizor::run()
{
  while ( running )  // stopping condition ?
  {
    // time of this sending
    gettimeofday(&timeLastSignalSend, &tz);
    MessageWrite msg;
    outputs[0]->put(msg,threadDispatcher);

    ipc::ScopedMTLock locker(globalLock,"MaxFrequencySynchronizor.run");

    // wait a new message on the input message queue "endit"
    wait(IDPORT_ENDIT);

    if (!running) break;

    // waiting time evaluation
    long long waitTime = 0;
    long long diffTime = 0;

    struct timeval tv;
    gettimeofday(&tv, &tz);
    diffTime=(tv.tv_sec-timeLastSignalSend.tv_sec) * 1000000L + tv.tv_usec-timeLastSignalSend.tv_usec;
    waitTime = (long long) ( 1000000.0f / freqHz ) - diffTime;
    if ( waitTime > 0 )
    {
#ifdef DEBUG
      std::cout << name() << ": waiting for " << waitTime << " usecs." << std::endl;
#endif
      usleep(waitTime);
    }

    inputs[IDPORT_ENDIT]->eraseFront();
  }

#ifdef DEBUG
  std::cout << objectID() << ": MaxFrequencySynchronizor thread now stopped. " << std::endl;
#endif

  threadDispatcher->close();

  return 0;
}
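The waiting-time arithmetic used in run() can be checked in isolation. The function below is a hypothetical helper written for this illustration, not part of the synchronizer: given the maximum frequency and the time already spent since the last signal, it returns how long the thread would still have to sleep, or 0 when the iteration already took longer than one period.

```cpp
#include <cassert>

// Microseconds left to wait so that signals are sent at most freqHz times
// per second, given the microseconds elapsed since the last signal.
long long remainingWaitUs(double freqHz, long long elapsedUs) {
    assert(freqHz > 0.0);
    const long long periodUs = static_cast<long long>(1000000.0 / freqHz);
    const long long wait = periodUs - elapsedUs;
    return wait > 0 ? wait : 0;
}
```

At 50 Hz the period is 20000 microseconds, so an iteration that took 5000 microseconds leaves 15000 microseconds to sleep, while an iteration that took longer than the period leaves nothing to wait.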

COMMENTS :

avanel 2009-12-10