POOMA Tutorials
Managing Threads Explicitly

The program shown in this appendix is the most complicated to appear in these tutorials. Building on tutorial 4, it sums the values in an array, taking the layout of that array into account. Unlike the multi-patch and layout examples of tutorial 4, however, this program explicitly spawns threads to perform the accumulation on each patch of the array.

Much of this code should seem familiar---the specialized accumulateWithLoop() functions, for example, have already been discussed. The novelty lies in the classes ResultHolder and ArrayAccumulator, the templated function spawn_accumulate(), and the specialized accumulation function accumulate(). These are all discussed briefly after the code (which is in examples/Patches/Threaded/Accumulate.h in the release) is presented.

#ifndef ACCUMULATE_H
#define ACCUMULATE_H

#include <pthread.h>

template<int D, class T, class E> class ConstArray;
template<int D> class UniformGridLayout;

//----------------------------------------------------------------------
// The guts of the accumulation algorithm.
// Specialized here for dimension 1, 2 and 3.
// Can't call these 'accumulate' because it would be ambiguous.
//----------------------------------------------------------------------

template<class T, class E>
inline T accumulateWithLoop(
const ConstArray<1, T, E> & x
){
T sum = 0;
int f0 = x.first(0);
int l0 = x.last(0);
for (int i0=f0;i0<=l0; ++i0)
sum += x(i0);
return sum;
}

template<class T, class E>
inline T accumulateWithLoop(
const ConstArray<2, T, E> & x
){
T sum = 0;
int f0 = x.first(0);
int f1 = x.first(1);
int l0 = x.last(0);
int l1 = x.last(1);
for (int i1=f1; i1<=l1; ++i1)
{
for (int i0=f0;i0<=l0; ++i0)
{
sum += x(i0, i1);
}
}
return sum;
}

template<class T, class E>
inline T accumulateWithLoop(
const ConstArray<3, T, E> & x
){
T sum = 0;
int f0 = x.first(0);
int f1 = x.first(1);
int f2 = x.first(2);
int l0 = x.last(0);
int l1 = x.last(1);
int l2 = x.last(2);
for (int i2=f2; i2<=l2; ++i2)
{
for (int i1=f1; i1<=l1; ++i1)
{
for (int i0=f0;i0<=l0; ++i0)
{
sum += x(i0, i1);
}
}
}
return sum;
}

//----------------------------------------------------------------------
// The user interface for accumulate.
// Bricks just call the dimension specialized versions.
//----------------------------------------------------------------------

template<int D, class T>
T accumulate(
const ConstArray<D, T, Brick> & x
){
return accumulateWithLoop(x);
}

template<int D1, class T, int D2, bool S>
T accumulate(
const ConstArray<D1, T, BrickView<D2, S>> & x
){
return accumulateWithLoop(x);
}

//----------------------------------------------------------------------
// class ResultHolder<T>
//
// A class which holds the result of a calculation in such
// a way that you don't have to worry about how it got it.
// That is handled in subclasses.
//----------------------------------------------------------------------

template<class T>
class ResultHolder
{
public:
ResultHolder()
{}

virtual ~ResultHolder()
{}

const T& get()
{
return result;
}

protected:
T result;
};

//----------------------------------------------------------------------
// class ArrayAccumulator<T, ArrayType>
//
// A specific type of calculation that returns using a ResultHolder.
// This holds an array of arbitrary type and accumulates the sum
// into the result.
//----------------------------------------------------------------------

template<class T, class ArrayType>
class ArrayAccumulator : public ResultHolder<T>
{
public:
// Remember my type.
typedef ArrayAccumulator<T, ArrayType> This_t;

// Let the member data destroy itself.
virtual ~ArrayAccumulator()
{}

// A static function that will be run in a thread.
// The data passed in is an object of type This_t.
static void *threadAccumulate(
void * x
){
This_t *y = static_cast<This_t*>(x);
y->result = accumulate(y->array);
return x;
}

// Construct with a const ref to an array.
// Just remember the array.
ArrayAccumulator(
const ArrayType & a
) : array(a)
{}

private:
// Store the array by value since the one passed in could be
// a temporary.
ArrayType array;
};

//----------------------------------------------------------------------
// void spawn_thread(pthread_id, ArrayType)
//
// Spawns a thread that runs an ArrayAccumultor.
//----------------------------------------------------------------------

template<class ArrayType>
inline void
spawn_accumulate(
pthread_t & id,
const ArrayType & a
){
// Typedefs to make the thread create more clear.
typedef typename ArrayType::Element_t T;
typedef ArrayAccumulator<T, ArrayType> Accumulator_t;

// Spawn a thread:
// Store the id through the reference that is passed in.
// The function to call is threadAccumulate
// The thread data is an ArrayAccumulator using the passed in array.
pthread_create(&id, NULL, Accumulator_t::threadAccumulate,
new Accumulator_t(a));
}

//----------------------------------------------------------------------
// Multipatch version.
// Loop over patches and accumulate each patch.
//----------------------------------------------------------------------

template<int D, class T>
T accumulate(
const ConstArray<D, T, MultiPatch<UniformTag,Brick>> & x
){
// Get the GridLayout from the array.
const GridLayout<2>& layout = x.message(GetGridLayoutTag<2>());

// Find the number of patches. We'll have one thread per patch.
int patches = layout.size();

// An array of thread ids.
pthread_t *ids = new pthread_t[patches];

// Loop over patches.
typename GridLayout<2>::iterator i= x.message(GetGridLayoutTag<2>()).begin();
typename GridLayout<2>::iterator e= x.message(GetGridLayoutTag<2>()).end();
int c=0;
while (i!=e)
{
// Spawn a thread for each patch.
// cout << "spawn" << endl;
spawn_accumulate(ids[c], x(*i));
++i;
++c;
}

// Wait for all the threads to finish.
// Get the sum from each, and accumulate that
// in this thread.
T sum = 0;
for (int j=0; j<c; ++j)
{
// Wait for a given thread to finish.
// cout << "join" << endl;
void * v;
pthread_join(ids[j], &v);

// Get the result of the sum for that thread.
// We don't need to know the array type for this.
ResultHolder<T>* s = static_cast<ResultHolder<T>*>(v);
cout << s->get() << endl;
sum += s->get();

// Delete the data structure passed to the thread.
delete s;
}

// Return the full sum.
return sum;
}

//----------------------------------------------------------------------
// General engine version.
// If we don't know anything about the engine, at least get the right answer.
//----------------------------------------------------------------------

template<int D, class T, class E>
T accumulate(
const ConstArray<D, T, E> & x
){
return accumulateWithLoop(x);
}

#endif

We will not explain this code in detail, but rather will try to give an overview of the main issues it raises and addresses. First, the pthreads library requires programs to pass a void* data pointer when creating a thread, but the thing you pass to the other subroutines is a temporary (in this case, x(*i)). The program must therefore build an object (in this case an ArrayAccumulator) to store the array by value. While this must be built on the heap, not the stack, the Array object is still of course just a handle on the real data.

Second, since the program constructs an object to pass to the thread, it must destroy that object appropriately. In this case pthread_join() returns (via an argument) the pointer that was passed to it; the main accumulate() function picks up this pointer, and deletes the object it points to after casting it appropriately.

There is always the question of how the thread will return information to the rest of the code. In this case, since it is passing the ArrayAccumulator back through pthread_join, the ArrayAccumulator has the result of the sum for that thread.

ArrayAccumulator needs to know the exact type of x(*i) in order to do the accumulation, but it would be bad practice to make the subroutine that loops over the patches only work for one type of array. Instead, the program uses a function called spawn_accumulate(), which is templated on the actual array type.

The program has now handled the problem of generating the threads without knowing the type of x(*i), but it still needs to receive the ArrayAccumulator, and that also has the type. The return data of type T is therefore split into the base class ResultHolder, which only knows the type T. The thing passed back from pthread_join is a pointer to that; since its destructor is virtual, it can safely be deleted.

The result is verbose, but not any more so than most multi-threaded programs. The biggest complication is having to introduce the ArrayAccumulator class in order to put the array being summed over on the heap instead of the stack.

Copyright © Los Alamos National Laboratory 1998-2000