00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include <System/cmtkConsole.h>
00033
00034 namespace
00035 cmtk
00036 {
00037
00040
00041 template<class TClass,class TParam>
00042 void
00043 ThreadParameterArray<TClass,TParam>
00044 ::RunInParallelAsynchronous
00045 ( ThreadFunction threadCall )
00046 {
00047 #ifdef _OPENMP
00048 const int nThreadsOMP = std::max<int>( 1, 1+GetNumberOfThreads()-this->m_NumberOfThreads );
00049 omp_set_num_threads( nThreadsOMP );
00050 #endif
00051
00052 #ifdef CMTK_USE_THREADS
00053
00054 #ifndef _MSC_VER
00055 pthread_attr_t attr;
00056 pthread_attr_init(&attr);
00057 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
00058 #endif
00059
00060 for ( size_t threadIdx = 0; threadIdx < this->m_NumberOfThreads; ++threadIdx )
00061 {
00062 this->m_Ptr[threadIdx].ThisThreadIndex = threadIdx;
00063 #ifdef _MSC_VER
00064 this->m_Ptr[threadIdx].m_Handle = CreateThread( NULL , 0 , (LPTHREAD_START_ROUTINE) threadCall,
00065 &this->m_Ptr[threadIdx], 0 , &this->m_Ptr[threadIdx].m_ThreadID );
00066
00067 if (threadHandles[threadIdx] == NULL)
00068 {
00069 fprintf( stderr, "Creation of thread #%d failed.\n", (int)threadIdx );
00070 exit( 1 );
00071 }
00072 #else
00073 int status = pthread_create( &this->m_Ptr[threadIdx].m_ThreadID, &attr, threadCall, &this->m_Ptr[threadIdx] );
00074 if ( status )
00075 {
00076 fprintf( stderr, "Creation of thread #%d failed with status %d.\n", (int)threadIdx, (int)status );
00077 exit( 1 );
00078 }
00079 #endif
00080 }
00081
00082 #ifndef _MSC_VER
00083 pthread_attr_destroy(&attr);
00084 #endif
00085
00086 this->m_AsynchronousThreadsRunning = true;
00087 #else
00088 StdErr << "ERROR: cannot run asynchronous threads when built without SMP support; please file a bug report.\n";
00089 #endif
00090 }
00091
00092
00093 template<class TClass,class TParam>
00094 void
00095 ThreadParameterArray<TClass,TParam>
00096 ::JoinAsynchronousThreads()
00097 {
00098 #ifdef CMTK_USE_THREADS
00099 for ( size_t threadIdx = 0; threadIdx < this->m_NumberOfThreads; ++threadIdx )
00100 {
00101 #ifdef _MSC_VER
00102 WaitForSingleObject( this->m_Ptr[threadIdx].m_Handle, INFINITE );
00103 #else
00104 void *resultThread;
00105 if ( this->m_Ptr[threadIdx].m_ThreadID )
00106 {
00107 pthread_join( this->m_Ptr[threadIdx].m_ThreadID, &resultThread );
00108 this->m_Ptr[threadIdx].m_ThreadID = 0;
00109 }
00110 #endif
00111 }
00112
00113 #ifdef _OPENMP
00114 omp_set_num_threads( GetNumberOfThreads() );
00115 #endif
00116
00117 this->m_AsynchronousThreadsRunning = false;
00118 #else
00119 StdErr << "ERROR: cannot run asynchronous threads when built without SMP support; please file a bug report.\n";
00120 #endif
00121 }
00122
00123 template<class TClass,class TParam>
00124 void
00125 ThreadParameterArray<TClass,TParam>
00126 ::CancelAsynchronousThreads()
00127 {
00128 #ifdef CMTK_USE_THREADS
00129 for ( size_t threadIdx = 0; threadIdx < this->m_NumberOfThreads; ++threadIdx )
00130 {
00131 #ifdef _MSC_VER
00132 DWORD resultThread;
00133 TerminateThread( this->m_Ptr[threadIdx].m_Handle, resultThread );
00134 #else
00135 if ( this->m_Ptr[threadIdx].m_ThreadID )
00136 {
00137 pthread_cancel( this->m_Ptr[threadIdx].m_ThreadID );
00138 pthread_join( this->m_Ptr[threadIdx].m_ThreadID, NULL );
00139 this->m_Ptr[threadIdx].m_ThreadID = 0;
00140 }
00141 #endif
00142 }
00143
00144 #ifdef _OPENMP
00145 omp_set_num_threads( GetNumberOfThreads() );
00146 #endif
00147
00148 this->m_AsynchronousThreadsRunning = false;
00149 #else
00150 StdErr << "ERROR: cannot run asynchronous threads when built without SMP support; please file a bug report.\n";
00151 #endif
00152 }
00153
00154 template<class TClass,class TParam>
00155 void
00156 ThreadParameterArray<TClass,TParam>
00157 ::RunInParallelFIFO
00158 ( ThreadFunction threadCall, const size_t numberOfThreadsTotal, const size_t firstThreadIdx )
00159 {
00160 #ifdef _OPENMP
00161 const int nThreadsOMP = std::max<int>( 1, 1+GetNumberOfThreads()-this->m_NumberOfThreads );
00162 omp_set_num_threads( nThreadsOMP );
00163 #endif
00164
00165 #if !defined(CMTK_BUILD_SMP)
00166
00167 for ( size_t threadIdx = 0; threadIdx < numberOfThreadsTotal; ++threadIdx )
00168 {
00169 this->m_Ptr[0].ThisThreadIndex = firstThreadIdx + threadIdx;
00170 threadCall( this->m_Ptr );
00171 }
00172 #else
00173 if ( this->m_NumberOfThreads == 1 )
00174 {
00175 for ( size_t threadIdx = 0; threadIdx < numberOfThreadsTotal; ++threadIdx )
00176 {
00177 this->m_Ptr[0].ThisThreadIndex = firstThreadIdx + threadIdx;
00178 threadCall( this->m_Ptr );
00179 }
00180 }
00181 else
00182 {
00183 #ifdef _MSC_VER
00184 #else
00185 pthread_attr_t attr;
00186 pthread_attr_init(&attr);
00187 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
00188 #endif // #ifdef _MSC_VER
00189
00190
00191 size_t threadIdx = 0;
00192 for ( ; (threadIdx < this->m_NumberOfThreads) && (threadIdx < numberOfThreadsTotal); ++threadIdx )
00193 {
00194 this->m_Ptr[threadIdx].ThisThreadIndex = firstThreadIdx + threadIdx;
00195 #ifdef _MSC_VER
00196 this->m_Ptr[threadIdx].m_Handle = CreateThread( NULL , 0 , (LPTHREAD_START_ROUTINE) threadCall,
00197 &this->m_Ptr[threadIdx], 0 , &this->m_Ptr[threadIdx].m_ThreadID );
00198 if ( this->m_Ptr[threadIdx].m_Handle == NULL )
00199 {
00200 fprintf( stderr, "Creation of thread #%d failed.\n", (int)threadIdx );
00201 exit( 1 );
00202 }
00203 #else
00204 int status = pthread_create( &this->m_Ptr[threadIdx].m_ThreadID, &attr, threadCall, &this->m_Ptr[threadIdx] );
00205 if ( status )
00206 {
00207 fprintf( stderr, "Creation of thread #%d failed with status %d.\n", (int)threadIdx, (int)status );
00208 exit( 1 );
00209 }
00210 #endif
00211 }
00212
00213
00214 size_t nextThreadToJoin = 0;
00215 while ( threadIdx < numberOfThreadsTotal )
00216 {
00217 #ifndef _MSC_VER
00218 void *resultThread;
00219 #endif
00220 if ( this->m_Ptr[threadIdx].m_ThreadID )
00221 {
00222 #ifdef _MSC_VER
00223 #else
00224 pthread_join( this->m_Ptr[threadIdx].m_ThreadID, &resultThread );
00225 #endif
00226 }
00227
00228 this->m_Ptr[nextThreadToJoin].ThisThreadIndex = firstThreadIdx + threadIdx;
00229
00230 #ifdef _MSC_VER
00231 this->m_Ptr[nextThreadToJoin].m_Handle = CreateThread( NULL , 0 , (LPTHREAD_START_ROUTINE) threadCall,
00232 &this->m_Ptr[nextThreadToJoin],0 , &this->m_Ptr[nextThreadToJoin].m_ThreadID );
00233 if ( this->m_Ptr[nextThreadToJoin].m_Handle == NULL)
00234 {
00235 fprintf( stderr, "Creation of thread #%d failed.\n", (int)threadIdx );
00236 exit( 1 );
00237 }
00238 #else
00239 int status = pthread_create( &this->m_Ptr[nextThreadToJoin].m_ThreadID, &attr, threadCall, &this->m_Ptr[nextThreadToJoin] );
00240 if ( status )
00241 {
00242 fprintf( stderr, "Creation of thread #%d failed with status %d.\n", (int)threadIdx, (int)status );
00243 exit( 1 );
00244 }
00245 #endif
00246
00247 ++threadIdx;
00248 nextThreadToJoin = (nextThreadToJoin + 1) % this->m_NumberOfThreads;
00249 }
00250
00251
00252 for ( size_t threadIdx = 0; (threadIdx < this->m_NumberOfThreads) && (threadIdx < numberOfThreadsTotal); ++threadIdx )
00253 {
00254 #ifndef _MSC_VER
00255 void *resultThread;
00256 #endif
00257 if ( this->m_Ptr[nextThreadToJoin].m_ThreadID )
00258 {
00259 #ifdef _MSC_VER
00260 #else
00261 pthread_join( this->m_Ptr[nextThreadToJoin].m_ThreadID, &resultThread );
00262 #endif
00263 }
00264
00265 nextThreadToJoin = (nextThreadToJoin + 1) % this->m_NumberOfThreads;
00266 }
00267
00268 #ifdef _MSC_VER
00269 #else
00270 pthread_attr_destroy(&attr);
00271 #endif
00272 }
00273 #endif // #if !defined(CMTK_BUILD_SMP)
00274
00275 #ifdef _OPENMP
00276 omp_set_num_threads( GetNumberOfThreads() );
00277 #endif
00278 }
00279
00280 }