This chapter is intended for advanced users who need to program parallel applications (MPI), write new filters or synchronizers, add new functionality to the FlowVR daemon, etc.
The fluid application shows the following features:
To test the application, check that MPI is installed and run:
make fluid
This application enables a user to disturb a fluid flow with the mouse (share/flowvr/examples/fluid). We distinguish three parts:
The simulation is parallelized by splitting the grid of cells into blocks
distributed among the MPI processes (the fluid simulation
metamodule spawns one FlowVR module for each MPI process). For
instance, a grid is split into 4 blocks on 4 processes, into 8
blocks on 8 processes, etc. To perform the necessary computations, the state of
the cells on the block borders must be exchanged between neighbors at
each iteration. These communications are managed by MPI
(share/flowvr/examples/fluid/modules/src/TurbulentBase.cpp). Because the mouse position
is required for the simulation, it must be broadcast to each
process. The FlowVR network takes care of this. After each
iteration, the result must be sent to the visualization module for
rendering. The results are distributed among the MPI
processes, but the visualization module accepts only a complete
result. For the sake of modularity, the visualization module should not
be aware of the details of the simulation's computation.
Symmetrically, the simulation should not be aware of how its
result is used. We therefore delegate the conversion of the
simulation output to a format acceptable for the visualization to
the FlowVR network level, using a tree of merging filters.
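As an illustration of the block splitting described above, here is a minimal sketch that assigns contiguous strips of grid rows to processes. It assumes a one-dimensional strip decomposition; the text does not say how TurbulentBase.cpp actually cuts the grid, and the names RowRange and blockForRank are hypothetical.

```cpp
#include <cassert>

// Half-open range [begin, end) of grid rows owned by one process.
struct RowRange {
    int begin;
    int end;
};

// Split `rows` grid rows into `nprocs` contiguous strips.
// Assumption: 1D strips; the first (rows % nprocs) ranks get one extra row.
RowRange blockForRank(int rows, int nprocs, int rank) {
    assert(nprocs > 0 && rank >= 0 && rank < nprocs);
    const int base = rows / nprocs;   // rows every rank receives
    const int extra = rows % nprocs;  // leftover rows to spread out
    const int begin = rank * base + (rank < extra ? rank : extra);
    const int size = base + (rank < extra ? 1 : 0);
    return RowRange{begin, begin + size};
}
```

With such a split, each process only needs the rows adjacent to its own range from its neighbors at every iteration, which is the border exchange handled by MPI.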
Notice that in the previous schematic figures we depict the mouse event
capture as a standalone module (for the sake of clarity). This is
not the case in the implementation: the visualization and mouse event
capture are part of the same module. The actual application
implementation is detailed in the following section.
To better understand the details of the algorithm used for the fluid simulation, refer to www-id.imag.fr/~raffin/papers/ID/ieeevr01.pdf or to J. Stam's article "Stable Fluids", published at SIGGRAPH 99.
The main files:
This metamodule is an MPI program, i.e. a parallel program. If OpenMPI is present, the build produces a single-process version (share/flowvr/examples/fluid/bin/fluid) and an MPI version (share/flowvr/examples/fluid/bin/fluidmpi). The parallel code lives in the same source as the sequential code, but is compiled only if USE_MPI is defined.
Each process has a unique ID, its rank, available through the MPI_Comm_rank function call. Each process uses this rank to register as a FlowVR module during the initialization step. The rank also ensures that each module spawned from this metamodule has a unique FlowVR ID: the rank is appended to the name returned by FLOWVR_MODNAME when calling the initModule() function.
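The following sketch shows how a single source file can serve both builds and how a rank can be turned into a distinct name. It is not the code of the fluid module: processRank and moduleNameForRank are hypothetical helpers, the "/" separator is an assumption, and the MPI branch assumes MPI_Init has already been called.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#ifdef USE_MPI
#include <mpi.h>
#endif

// Rank of the calling process: the MPI rank when built with USE_MPI,
// 0 in the sequential build.
int processRank() {
#ifdef USE_MPI
    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);  // requires a prior MPI_Init
    return rank;
#else
    return 0;
#endif
}

// Hypothetical helper: append the rank to a base name so that every
// process registers under a distinct identifier.
std::string moduleNameForRank(const std::string& baseName, int rank) {
    assert(rank >= 0);
    std::ostringstream name;
    name << baseName << '/' << rank;  // separator chosen for illustration
    return name.str();
}
```

Compiled without USE_MPI, the same file builds with no MPI dependency, which mirrors the way the sequential and MPI executables share their source.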
The module component (share/flowvr/examples/fluid/include/fluid/components/modulefluid.comp.h) is rather classical. We use the same module for the sequential code and the MPI one: the module ports are the same.
We have two different metamodule components. The first is for the sequential version and uses the flowvr-run-ssh launching command (share/flowvr/examples/fluid/include/fluid/components/metamodulefluid.comp.h); for that purpose, it inherits from the MetaModuleFlowvrRunSSH metamodule class. The second is for the MPI code and uses the mpi-run launching command (share/flowvr/examples/fluid/include/fluid/components/metamodulefluid.comp.h); it inherits from the MetaModuleOpenMPI metamodule class. Note that because mpirun launchers are not standardized across implementations, a new metamodule should be developed if an MPI implementation other than OpenMPI is used.
These metamodules take the "nx" and "ny" parameters and pass their values to the command line. Each module reads these values for its internal configuration. Using parameters makes it easy to change these values from the flowvr command line without recompiling the application.
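On the module side, reading such values could look like the sketch below. The argument positions (argv[1] for nx, argv[2] for ny) and the names GridSize and parseGridSize are assumptions made for illustration, not the fluid module's actual command-line layout.

```cpp
#include <cassert>
#include <cstdlib>

// Grid dimensions used to configure the simulation.
struct GridSize {
    int nx;
    int ny;
};

// Read nx and ny from argv[1] and argv[2] (assumed positions).
// The defaults are kept when the arguments are missing or not positive.
GridSize parseGridSize(int argc, const char* const argv[], GridSize defaults) {
    assert(defaults.nx > 0 && defaults.ny > 0);
    GridSize size = defaults;
    if (argc > 2) {
        const int nx = std::atoi(argv[1]);
        const int ny = std::atoi(argv[2]);
        if (nx > 0 && ny > 0) {
            size.nx = nx;
            size.ny = ny;
        }
    }
    return size;
}
```

Falling back to defaults keeps the module usable when it is started by hand without parameters.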
See share/flowvr/examples/fluid/src/gldens.cpp for the module code.
This metamodule relies on Glut. Therefore the loop structure is
hidden behind the callback style imposed by Glut. This metamodule can
spawn several visualization modules (the same code launched
several times). Only one of these modules opens a position output port
that provides the mouse position. This module is selected through
the fourth argument of the gldens executable.
The module gldens uses a stamp N to read the size of the received matrices. It uses a class GridInputPort to define this user stamp (see share/flowvr/examples/fluid/src/gldens.cpp):
/// Input Port for 2D Grids with stamp "N" for size
class GridInputPort : public flowvr::InputPort
{
public:
  GridInputPort(const char* name="grid")
  : InputPort(name),
    stampN("N",flowvr::TypeArray::create(2,flowvr::TypeInt::create()))
  {
    stamps->add(&stampN);
  }
  flowvr::StampInfo stampN;
};

GridInputPort portDensity("density");
GridInputPort portVelocity("velocity");
Note that you are not required to define a new class to simply add a stamp. For example, the same module gldens defines a stamp B on the output position port to send the state of the mouse's buttons using the following code:
OutputPort portPosition("position");
StampInfo stampButtons("B",TypeInt::create());
// .... //
portPosition.stamps->add(&stampButtons);
To write the value of a stamp in a new message, call the write method of the StampsWrite class. This method takes as arguments the description of the stamp to write and its new value. In our example this leads to:
MessageWrite mpos;
// .... //
mpos.stamps.write(stampButtons,mouse_down);
Similarly, a stamp is read using the read method, which returns true if the operation succeeded and false otherwise. If the stamp is not present in the received message, the read fails. You can then either ignore the error and use a default value, or stop the module and warn the user. Our gldens example module uses default dimensions if the stamp is not present. The code to read the stamp N in this case is:
int nx,ny;
if (m.stamps.read(portDensity.stampN[0],nx) && m.stamps.read(portDensity.stampN[1],ny))
{
  // Store the given dimension
}
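The "read, and fall back to a default on failure" pattern can be tried outside FlowVR with the stand-in below. The Stamps type is a plain std::map and readStamp and readStampOr are hypothetical helpers; only the convention of reporting success through a boolean return value is taken from the text.

```cpp
#include <cassert>
#include <map>
#include <string>

// Stand-in for the stamps of a message (not the FlowVR type).
typedef std::map<std::string, int> Stamps;

// Mimics a read() that reports success through its return value.
// On failure the output argument is left untouched.
bool readStamp(const Stamps& stamps, const std::string& name, int& value) {
    Stamps::const_iterator it = stamps.find(name);
    if (it == stamps.end()) return false;
    value = it->second;
    return true;
}

// Return the stamp value, or `fallback` when the stamp is absent.
int readStampOr(const Stamps& stamps, const std::string& name, int fallback) {
    int value = fallback;
    readStamp(stamps, name, value);
    return value;
}
```

A module that prefers to stop rather than guess would test the return value of readStamp directly and report the missing stamp to the user.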
The module component (share/flowvr/examples/fluid/include/fluid/components/modulegldens.comp.h) and the metamodule component (share/flowvr/examples/fluid/include/fluid/components/metamodulegldens.comp.h) are rather classical (they use flowvr-run-ssh as the launcher).
The sequential simulation is associated with a simple application network with direct 1-to-1 communications between both metamodules (share/flowvr/examples/fluid/include/fluid/components/fluid.comp.h and share/flowvr/examples/fluid/src/fluid.comp.cpp).
To start it locally on your machine:
flowvr -L -x Fluid
The MPI simulation is associated with a more advanced network. Because the fluid metamodule spawns several modules, collective communications (Com1ToN and Com1ToN) are used (share/flowvr/examples/fluid/include/fluid/components/fluidmpi.comp.h and share/flowvr/examples/fluid/src/fluidmpi.comp.cpp).
To start it locally on your machine, you must rely on the fluidmpi.csv file (share/flowvr/examples/fluid/fluidmpi.csv), which requests 4 instances to be launched on the local host. If you have a quad-core processor, you should notice some performance gain compared to the sequential version:
flowvr -x FluidMPI
The daemon is the main component of the FlowVR architecture. It can be viewed as a collection of objects and plugins. Some of these objects are automatically loaded (like the commander), others need a control command (filters). The objects loaded by the FlowVR daemon can be passive objects (routing table, filters) or threads (regulator, net).
When the daemon starts it:
Each daemon owns a single routing table shared by all the FlowVR objects running on the node. This object contains a list of routes. A route is specified by a source and an action. Several actions can be linked to the same source, so the number of routes for a source equals the number of actions to take when a message comes from this source. A lock protects the execution of control commands such as adding or removing a route.
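The structure described above (several actions per source, updates protected by a lock) can be sketched as follows. This is not the daemon's routing table: RoutingTableSketch is a hypothetical class, and sources and actions are reduced to strings for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Simplified routing table: each route maps one source to one action.
// Sources and actions are plain strings here (an assumption).
class RoutingTableSketch {
public:
    // Add one route; a source may own several routes.
    void addRoute(const std::string& source, const std::string& action) {
        std::lock_guard<std::mutex> guard(lock_);
        routes_.insert(std::make_pair(source, action));
    }

    // Remove every route of a source; returns how many were removed.
    std::size_t removeRoutes(const std::string& source) {
        std::lock_guard<std::mutex> guard(lock_);
        return routes_.erase(source);
    }

    // Actions to take when a message arrives from `source`.
    std::vector<std::string> actionsFor(const std::string& source) const {
        std::lock_guard<std::mutex> guard(lock_);
        std::vector<std::string> actions;
        auto range = routes_.equal_range(source);
        for (auto it = range.first; it != range.second; ++it)
            actions.push_back(it->second);
        return actions;
    }

private:
    mutable std::mutex lock_;  // protects routes_ during control commands
    std::multimap<std::string, std::string> routes_;
};
```

A multimap makes the "number of routes equals number of actions" property explicit: looking up a source returns exactly the actions registered for it.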
The possible actions when a message must be processed are:
Main functions of the routing table:
See the section on programming filters and synchronizers.
The regulator can be viewed as a "box" encapsulating the modules of the application. It is the object that interacts with the other components of the FlowVR daemon. By hiding message handling and access to the shared memory area, and by managing the internal loop of modules, the regulator allows modules to be "blind" and therefore simpler to develop.
Main characteristics :
The Net is the communication object that allows message passing between different nodes running a FlowVR daemon. This object is not loaded automatically when starting a daemon. In the current release the default Net object is based on the TCP protocol (see flowvrd/src/plugins/sys/flowvr.plugins.NetTCP). The implementation relies on two threads: one for receiving messages and the other for sending messages.
Another implementation of the Net object can be provided. To specify the Net object loaded when starting a daemon:
flowvrd -net classnameobject
The controller is a special module. Its role is to launch and control the application. There is only one controller for each FlowVR application. The controller directly accesses each FlowVR daemon involved in the application (so these nodes must be visible to the controller). The controller does not use the routing table. It sends control commands (to start a module, add a connection, pause or stop the application) and receives status results. The controller is the bridge between the user and the application.
The interface between the controller and the daemon is the commander. There is exactly one commander per daemon. The commander receives commands from the controller, processes them and sends the results back to the controller. The commander can reach any FlowVR object present on the same node.
The command language is the protocol used for communications between the controller and the daemons. It follows an XML syntax. Complete descriptions can be found in the DTDs.
Commands processed by the controller :
NOTE: the action tag also contains a messagetype variable. The combination of the two values given in this command determines the type of the connection (stamps only or full).
Two types of message can be sent by the commander to the controller :
A module is not a plugin loaded by the daemon; it runs in a separate process. It has four different states: 'not launched', 'launched', 'paused' and 'started'.
An application is deployed in four stages:
flowvr uses flowvr-telnet to start an application. This is not visible from the user's point of view, but the flowvr-telnet command is available and can be used directly. flowvr-telnet is a simple interface to the controller. Once it is launched in a shell, the user can directly write commands for the controller following the expected syntax. Special commands are also available: pause, start and stop.
flowvr-telnet can be launched in two different ways: batch or interactive.
When the application is launched in batch mode, stages 1 to 4 are performed automatically, so the user can directly write commands for the controller following the expected syntax.
To launch the application in this mode, type the following command:
flowvr-telnet application_name.run.xml application_name.cmd.xml
If the application is launched in interactive mode, the user controls every stage of the launch. To launch the application in interactive mode, use the -I (interactive) flag:
flowvr-telnet -I application_name.run.xml application_name.cmd.xml
Thanks to this representation it is possible to apply commands to specific objects, so commands can be applied to a subtree.
There are other commands available that can provide information on the application progress :
Filters and synchronizers are objects that alter and control the dataflow on the connections between the modules of a FlowVR application. They are plugins loaded by the daemon. The main difference with modules is that filters and synchronizers run in the same process as the FlowVR daemon, so there is no context switch when using them. Another important difference is that filters have full access to the message queues, while modules only have access to the last available message. These objects are designed to control the dataflow and the synchronization locally. For example, a filter can compress or decompress data, merge or scatter it, or select the next message to send to the receiving modules. A synchronizer is a special case of filter: all its inputs and outputs carry stamps only. It can be associated with a filter to implement a synchronization rule.
Filters and synchronizers are directly loaded by the FlowVR daemon. They do not have the same level of abstraction provided by the regulators for the modules. They manage messages like a regulator. They use input and output message queues.
A message queue is a buffer of messages that can be viewed as a FIFO queue. A filter or a synchronizer owns as many input message queues as input ports and as many output message queues as output ports. It therefore owns a vector of input message queues and a vector of output message queues.
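The layout described above (one FIFO per port, grouped in two vectors) can be pictured with the sketch below. QueueSketch and FilterQueuesSketch are hypothetical stand-ins with string messages; they do not reproduce the FlowVR message queue API.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// A message queue viewed as a FIFO buffer (messages are strings here).
class QueueSketch {
public:
    void push(const std::string& msg) { messages_.push_back(msg); }
    bool empty() const { return messages_.empty(); }
    std::size_t size() const { return messages_.size(); }
    // Oldest message still in the queue.
    const std::string& front() const {
        assert(!empty());
        return messages_.front();
    }
    // Discard the oldest message.
    void eraseFront() {
        assert(!empty());
        messages_.pop_front();
    }

private:
    std::deque<std::string> messages_;
};

// One queue per port: a vector of input queues and a vector of output queues.
struct FilterQueuesSketch {
    std::vector<QueueSketch> inputs;
    std::vector<QueueSketch> outputs;
    FilterQueuesSketch(std::size_t nbInputs, std::size_t nbOutputs)
        : inputs(nbInputs), outputs(nbOutputs) {}
};
```

Indexing the vectors by port number is what lets a filter address each queue through a constant such as a port identifier.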
Methods to call for the vector of input message queues :
Main methods on the input message queues used by filters and synchronizers :
Methods to call for the vector of output message queues:
Main methods of the output message queues used for filters and synchronizers:
In terms of code, filters and synchronizers are the same. Consequently, a single API is provided for both. From here on only filters are mentioned, but everything that holds for filters also holds for synchronizers.
Two kinds of filters can be written: basic or threaded. In the first case, the activation of the filter is driven by the input message queues. In the second case, the filter must learn about new messages through signal notification and wait actively on its input message queues.
The next section intends to show code examples for each case (threaded or basic).
The following code is an excerpt of the filter named FilterIt. It shows a simple use of the different methods provided by the input and output message queue API. Note that this filter needs to wait for the stamp specification to finish its initialization.
/// Constructor.
FilterIt::FilterIt(std::string objID)
  : Filter(objID), numout(0), traceout(TypedTrace<int>("out"))
{
}
COMMENTS :
flowvr::plugd::Result FilterIt::init(flowvr::xml::DOMElement* xmlRoot, flowvr::plugd::Dispatcher* dispatcher)
{
  flowvr::plugd::Result result = Filter::init(xmlRoot, dispatcher);
  if (result.error()) return result;

  // initialization of the input message queues
  initInputs(NBPORTS);
  inputs[IDPORT_IN]->setName("in");
  inputs[IDPORT_ORDER]->setName("order");

  // initialization of the OMQ "out"
  initOutputs(1);
  outputs[0]->setName("out");
  outputs[0]->msgtype = Message::FULL;

  outputTraces.push_back(&traceout);

  return result;
}
COMMENTS :
void FilterIt::newMessageNotification(int mqid, int msgnum, const Message& msg, Dispatcher* dispatcher)
{
#ifdef DEBUG
  if (mqid == IDPORT_IN)
    std::cout << objectID() << ": new input " << msgnum << " queue size " << inputs[mqid]->size() << std::endl;
  else
    std::cout << objectID() << ": new order " << msgnum << " queue size " << inputs[mqid]->size() << std::endl;
#endif
  if (started)
    sendPendingOrders(dispatcher);
}
void FilterIt::newStampListSpecification(int mqid, const Message& msg, Dispatcher* dispatcher)
{
  if (started && mqid == IDPORT_IN)
  {
    // forward specification to out port
#ifdef DEBUG
    std::cout << objectID() << ": forwarding stamps specification" << std::endl;
#endif
    // forwarding stamplist specification to the OMQ
    outputs[0]->stamps=inputs[mqid]->getStampList();
    outputs[0]->newStampSpecification(dispatcher);
    inputs[IDPORT_IN]->setFront(0);
    sendPendingOrders(dispatcher);
  }
}
COMMENTS :
void FilterIt::sendPendingOrders(plugd::Dispatcher* dispatcher)
{ // MAIN FILTER FUNCTION
  if (!inputs[IDPORT_IN]->stampsReceived()) return; // still waiting for stamps specification

  int it,itin;
  for (;;)
  {
    Message msg;
    int num;
    if (!inputs[IDPORT_ORDER]->frontMsg().valid())
    {
#ifdef DEBUG
      std::cout << objectID() << ": waiting orders" << std::endl;
#endif
      return;
    }

    msg = inputs[IDPORT_ORDER]->frontMsg();

    msg.stamps.read(inputs[IDPORT_ORDER]->getStampList().num,num);
    msg.stamps.read(inputs[IDPORT_ORDER]->getStampList().it,it);
    bool scratch = (it<0);
    it = (it<0?-it:it)-10;
    itin = -10;
    while(!inputs[IDPORT_IN]->empty()
          && inputs[IDPORT_IN]->frontMsg().valid()
          && inputs[IDPORT_IN]->frontMsg().stamps.read(inputs[IDPORT_IN]->getStampList().it,itin)
          && itin<it)
      inputs[IDPORT_IN]->eraseFront();

    if (scratch)
    {
      inputs[IDPORT_ORDER]->eraseFront();
      //std::cout << objectID() << "scratched up to " << it << std::endl;
      continue; // do not send any message, just clean the queue
    }
    if (itin>=it)
      msg = inputs[IDPORT_IN]->frontMsg();
    else
    {
#ifdef DEBUG
      std::cout << objectID() << ": waiting message " << it << std::endl;
#endif
      return; // missing message
    }
    inputs[IDPORT_ORDER]->eraseFront();
    num = numout++;
#ifdef DEBUG
    std::cout << objectID() << ": sending message " << num << std::endl;
#endif
    MessagePut newmsg;
    newmsg.stamps.clone(msg.stamps,&inputs[IDPORT_IN]->getStampList());
    newmsg.data=msg.data;

    traceout.write(it);
    outputs[0]->put(newmsg,dispatcher,num);
  }
}
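The queue-cleaning step at the heart of sendPendingOrders can be studied in isolation. The sketch below is a simplified stand-in, not FlowVR code: StampedMsg keeps only an iteration stamp, and selectMessage is a hypothetical function that drops every message older than the requested iteration and reports whether a suitable message is at the front.

```cpp
#include <cassert>
#include <deque>

// Stand-in for a queued message: only its iteration stamp is kept.
struct StampedMsg {
    int it;
};

// Drop queued messages whose iteration is lower than `wanted`.
// Returns true and copies the front message to `out` if a message with
// iteration >= wanted is available; the selected message stays queued.
bool selectMessage(std::deque<StampedMsg>& queue, int wanted, StampedMsg& out) {
    while (!queue.empty() && queue.front().it < wanted)
        queue.pop_front();
    if (queue.empty()) return false;  // requested message not received yet
    out = queue.front();
    return true;
}
```

Returning false corresponds to the "waiting message" case of the filter: the order stays pending until a later notification brings the missing message.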
COMMENTS :
flowvr::plugd::GenClass<FilterIt> FilterItClass("flowvr.plugins.FilterIt", // name
"", // description
&flowvr::plugins::FilterClass
);
Class* FilterIt::getClass()
{
  return &FilterItClass;
}
COMMENTS :
The following example is a threaded synchronizer that controls the working rate of a module by limiting its frequency. When threaded, filters and synchronizers are similar to modules in that they rely on a wait/get/put loop.
MaxFrequencySynchronizor::MaxFrequencySynchronizor(std::string objID)
  : ThreadedSynchronizor(objID), running(false), freqHz(1.0f)
{
  threadDispatcher=NULL;
}
COMMENTS :
flowvr::plugd::Result MaxFrequencySynchronizor::init(flowvr::xml::DOMElement* xmlRoot, flowvr::plugd::Dispatcher* dispatcher)
{
  flowvr::plugd::Result result = Synchronizor::init(xmlRoot, dispatcher);
  if (result.error()) return result;
  threadDispatcher = dispatcher->threadCopy();

  freqHz = 1.0f;
  // freq parameter reading ...
  xml::DOMNodeList* lfreq = xmlRoot->getElementsByTagName("freq");
  if (lfreq->getLength()<1)
    return Result(flowvr::plugd::Result::ERROR,"No freq parameter");
  string fr = lfreq->item(0)->getTextContent();
  freqHz = atof(fr.c_str());
  delete lfreq;
  if ( freqHz < 0.001f || freqHz > 100000000.0f )
    // incorrect value for frequency in hertz
    return Result(flowvr::plugd::Result::ERROR,"Incorrect frequency parameter");

  initInputs(NBPORTS);
  inputs[IDPORT_ENDIT]->setName("endIt");

  // only one output message queue for this filter
  initOutputs(1);
  outputs[0]->setName("out");
  outputs[0]->msgtype=Message::STAMPS;

  // give the stamp list to the output message queue
  outputs[0]->newStampSpecification(dispatcher);

  running = true;
  start();

  return result;
}
COMMENTS :
int MaxFrequencySynchronizor::run()
{
  while ( running ) // stopping condition ?
  {
    // time of this sending
    gettimeofday(&timeLastSignalSend, &tz);
    MessageWrite msg;
    outputs[0]->put(msg,threadDispatcher);
    ipc::ScopedMTLock locker(globalLock,"MaxFrequencySynchronizor.run");
    // wait for a new message on the input message queue "endIt"
    wait(IDPORT_ENDIT);
    if (!running)
      break;
    // waiting time evaluation
    long long waitTime = 0;
    long long diffTime = 0;
    struct timeval tv;
    gettimeofday(&tv, &tz);
    diffTime=(tv.tv_sec-timeLastSignalSend.tv_sec) * 1000000L + tv.tv_usec-timeLastSignalSend.tv_usec;
    waitTime = (long long) ( 1000000.0f / freqHz ) - diffTime;
    if ( waitTime > 0 )
    {
#ifdef DEBUG
      std::cout << name() << ": waiting for " << waitTime << " usecs." << std::endl;
#endif
      usleep(waitTime);
    }
    inputs[IDPORT_ENDIT]->eraseFront();
  }
#ifdef DEBUG
  std::cout << objectID() << ": MaxFrequencySynchronizor thread now stopped. " << std::endl;
#endif
  threadDispatcher->close();
  return 0;
}
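The waiting-time evaluation in run() reduces to a small piece of arithmetic: the period in microseconds (1000000 / freqHz) minus the time already spent since the last signal. The sketch below isolates it; remainingWaitUsec is a hypothetical helper, and clamping the result to zero stands in for the "waitTime > 0" test of the original loop.

```cpp
#include <cassert>

// Microseconds left to wait so that two successive signals are at least
// one period (1 / freqHz seconds) apart. Returns 0 when the iteration
// already took longer than one period.
long long remainingWaitUsec(double freqHz, long long elapsedUsec) {
    assert(freqHz > 0.0);
    const long long periodUsec = static_cast<long long>(1000000.0 / freqHz);
    const long long waitUsec = periodUsec - elapsedUsec;
    return waitUsec > 0 ? waitUsec : 0;
}
```

For example, at 10 Hz the period is 100000 microseconds, so an iteration that took 30000 microseconds leaves 70000 microseconds to sleep, while an iteration slower than the period is not delayed at all.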
COMMENTS :
avanel 2009-12-10