Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include "cmtkThreadPool.h"
00034
00035 #include <System/cmtkThreads.h>
00036 #include <System/cmtkMutexLock.h>
00037 #include <System/cmtkConsole.h>
00038
00039 namespace
00040 cmtk
00041 {
00042
00043 ThreadPool::ThreadPool( const size_t nThreads )
00044 : m_NumberOfTasks( 0 ),
00045 m_NextTaskIndex( 0 ),
00046 m_TaskFunction( NULL ),
00047 m_ThreadsRunning( false ),
00048 m_ContinueThreads( true )
00049 {
00050 if ( ! nThreads )
00051 this->m_NumberOfThreads = cmtk::Threads::GetNumberOfThreads();
00052 else
00053 this->m_NumberOfThreads = nThreads;
00054
00055 #ifdef CMTK_BUILD_SMP
00056 this->m_ThreadID.resize( this->m_NumberOfThreads, 0 );
00057 #ifdef _MSC_VER
00058 this->m_ThreadHandles.resize( this->m_NumberOfThreads, 0 );
00059 #endif
00060 #endif
00061 this->m_ThreadArgs.resize( this->m_NumberOfThreads );
00062 }
00063
00064 void
00065 ThreadPool::StartThreads()
00066 {
00067 if ( !this->m_ThreadsRunning )
00068 {
00069 #ifdef CMTK_BUILD_SMP
00070 #ifdef CMTK_USE_THREADS
00071 pthread_attr_t attr;
00072 pthread_attr_init(&attr);
00073 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
00074
00075 for ( size_t idx = 0; idx < this->m_NumberOfThreads; ++idx )
00076 {
00077
00078 this->m_ThreadArgs[idx].m_Pool = this;
00079 this->m_ThreadArgs[idx].m_Index = idx;
00080
00081
00082 const int status = pthread_create( &this->m_ThreadID[idx], &attr, cmtkThreadPoolThreadFunction, static_cast<CMTK_THREAD_ARG_TYPE>( &this->m_ThreadArgs[idx] ) );
00083
00084 if ( status )
00085 {
00086 StdErr.printf( "Creation of pooled thread #%d failed with status %d.\n", idx, status );
00087 exit( 1 );
00088 }
00089 }
00090
00091 pthread_attr_destroy(&attr);
00092 #elif defined(_MSC_VER)
00093 for ( size_t idx = 0; idx < this->m_NumberOfThreads; ++idx )
00094 {
00095
00096 this->m_ThreadArgs[idx].m_Pool = this;
00097 this->m_ThreadArgs[idx].m_Index = idx;
00098
00099
00100 int status = 0;
00101
00102
00103 this->m_ThreadHandles[idx] = CreateThread( NULL , 0, (LPTHREAD_START_ROUTINE) cmtkThreadPoolThreadFunction,
00104 static_cast<CMTK_THREAD_ARG_TYPE>( &this->m_ThreadArgs[idx] ), 0, &this->m_ThreadID[idx] );
00105 if ( this->m_ThreadHandles[idx] == NULL )
00106 {
00107 status = -1;
00108 }
00109
00110 if ( status )
00111 {
00112 StdErr.printf( "Creation of pooled thread #%d failed with status %d.\n", idx, status );
00113 exit( 1 );
00114 }
00115 }
00116 #endif // #ifdef CMTK_USE_THREADS
00117 #endif // #ifdef CMTK_BUILD_SMP
00118 this->m_ThreadsRunning = true;
00119 }
00120 }
00121
00122 ThreadPool::~ThreadPool()
00123 {
00124 this->EndThreads();
00125 }
00126
00127
00128 void
00129 ThreadPool::EndThreads()
00130 {
00131 if ( this->m_ThreadsRunning )
00132 {
00133 #ifdef CMTK_USE_THREADS
00134
00135 this->m_ContinueThreads = false;
00136 this->m_TaskWaitingSemaphore.Post( this->m_NumberOfThreads );
00137 for ( size_t idx = 0; idx < this->m_NumberOfThreads; ++idx )
00138 {
00139 if ( this->m_ThreadID[idx] )
00140 {
00141 pthread_join( this->m_ThreadID[idx], NULL );
00142 this->m_ThreadID[idx] = 0;
00143 }
00144 }
00145 #elif defined(_MSC_VER)
00146 for ( size_t idx = 0; idx < this->m_NumberOfThreads; ++idx )
00147 {
00148 DWORD resultThread = 0;
00149 TerminateThread( this->m_ThreadHandles[idx], resultThread );
00150 }
00151 #endif
00152 this->m_ThreadsRunning = false;
00153 }
00154 }
00155
00156 void
00157 ThreadPool::ThreadFunction( const size_t threadIdx )
00158 {
00159 #ifdef _OPENMP
00160
00161 omp_set_num_threads( 1 );
00162 #endif
00163
00164 #ifdef CMTK_BUILD_SMP
00165
00166 this->m_TaskWaitingSemaphore.Wait();
00167 while ( this->m_ContinueThreads )
00168 {
00169
00170 this->m_NextTaskIndexLock.Lock();
00171 const size_t taskIdx = this->m_NextTaskIndex;
00172 ++this->m_NextTaskIndex;
00173 this->m_NextTaskIndexLock.Unlock();
00174
00175
00176 this->m_TaskFunction( this->m_TaskParameters[taskIdx], taskIdx, this->m_NumberOfTasks, threadIdx, this->m_NumberOfThreads );
00177
00178
00179 this->m_ThreadWaitingSemaphore.Post();
00180
00181
00182 this->m_TaskWaitingSemaphore.Wait();
00183 }
00184 #endif // #ifdef CMTK_BUILD_SMP
00185 }
00186
00187 ThreadPool&
00188 ThreadPool::GetGlobalThreadPool()
00189 {
00190 static ThreadPool globalThreadPool;
00191 return globalThreadPool;
00192 }
00193
00194 }
00195
00196 CMTK_THREAD_RETURN_TYPE
00197 cmtkThreadPoolThreadFunction( CMTK_THREAD_ARG_TYPE arg )
00198 {
00199 static_cast<cmtk::ThreadPool::ThreadPoolArg*>( arg )->m_Pool->ThreadFunction( static_cast<cmtk::ThreadPool::ThreadPoolArg*>( arg )->m_Index );
00200 return CMTK_THREAD_RETURN_VALUE;
00201 }