cmtkThreadParameterArray.txx

Go to the documentation of this file.
00001 /*
00002 //
00003 //  Copyright 1997-2009 Torsten Rohlfing
00004 //  Copyright 2004-2009 SRI International
00005 //
00006 //  This file is part of the Computational Morphometry Toolkit.
00007 //
00008 //  http://www.nitrc.org/projects/cmtk/
00009 //
00010 //  The Computational Morphometry Toolkit is free software: you can
00011 //  redistribute it and/or modify it under the terms of the GNU General Public
00012 //  License as published by the Free Software Foundation, either version 3 of
00013 //  the License, or (at your option) any later version.
00014 //
00015 //  The Computational Morphometry Toolkit is distributed in the hope that it
00016 //  will be useful, but WITHOUT ANY WARRANTY; without even the implied
00017 //  warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018 //  GNU General Public License for more details.
00019 //
00020 //  You should have received a copy of the GNU General Public License along
00021 //  with the Computational Morphometry Toolkit.  If not, see
00022 //  <http://www.gnu.org/licenses/>.
00023 //
00024 //  $Revision: 2398 $
00025 //
00026 //  $LastChangedDate: 2010-10-05 14:54:37 -0700 (Tue, 05 Oct 2010) $
00027 //
00028 //  $LastChangedBy: torstenrohlfing $
00029 //
00030 */
00031 
00032 #include <System/cmtkConsole.h>
00033 
00034 namespace
00035 cmtk
00036 {
00037 
00040 
00041 template<class TClass,class TParam>
00042 void
00043 ThreadParameterArray<TClass,TParam>
00044 ::RunInParallelAsynchronous
00045 ( ThreadFunction threadCall )
00046 {
00047 #ifdef _OPENMP
00048   const int nThreadsOMP = std::max<int>( 1, 1+GetNumberOfThreads()-this->m_NumberOfThreads );
00049   omp_set_num_threads( nThreadsOMP );
00050 #endif
00051 
00052 #ifdef CMTK_USE_THREADS
00053 
00054 #ifndef _MSC_VER
00055   pthread_attr_t attr;
00056   pthread_attr_init(&attr);
00057   pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
00058 #endif
00059       
00060   for ( size_t threadIdx = 0; threadIdx < this->m_NumberOfThreads; ++threadIdx ) 
00061     {
00062     this->m_Ptr[threadIdx].ThisThreadIndex = threadIdx;
00063 #ifdef _MSC_VER
00064     this->m_Ptr[threadIdx].m_Handle = CreateThread( NULL /*default security attributes*/, 0 /*use default stack size*/, (LPTHREAD_START_ROUTINE) threadCall, 
00065                                                     &this->m_Ptr[threadIdx], 0 /*use default creation flags*/, &this->m_Ptr[threadIdx].m_ThreadID );
00066     
00067     if (threadHandles[threadIdx] == NULL) 
00068       {
00069       fprintf( stderr, "Creation of thread #%d failed.\n", (int)threadIdx );
00070       exit( 1 );
00071       }
00072 #else
00073     int status = pthread_create( &this->m_Ptr[threadIdx].m_ThreadID, &attr, threadCall, &this->m_Ptr[threadIdx] );
00074     if ( status ) 
00075       {
00076       fprintf( stderr, "Creation of thread #%d failed with status %d.\n", (int)threadIdx, (int)status );
00077       exit( 1 );
00078       }
00079 #endif    
00080     }
00081   
00082 #ifndef _MSC_VER
00083   pthread_attr_destroy(&attr);
00084 #endif
00085 
00086   this->m_AsynchronousThreadsRunning = true;
00087 #else
00088   StdErr << "ERROR: cannot run asynchronous threads when built without SMP support; please file a bug report.\n";
00089 #endif
00090 }
00091 
00092 
00093 template<class TClass,class TParam>
00094 void
00095 ThreadParameterArray<TClass,TParam>
00096 ::JoinAsynchronousThreads()
00097 {
00098 #ifdef CMTK_USE_THREADS  
00099   for ( size_t threadIdx = 0; threadIdx < this->m_NumberOfThreads; ++threadIdx ) 
00100     {
00101 #ifdef _MSC_VER
00102     WaitForSingleObject( this->m_Ptr[threadIdx].m_Handle, INFINITE /*no timeout*/ );
00103 #else
00104     void *resultThread;
00105     if ( this->m_Ptr[threadIdx].m_ThreadID ) 
00106       {
00107       pthread_join( this->m_Ptr[threadIdx].m_ThreadID, &resultThread );
00108       this->m_Ptr[threadIdx].m_ThreadID = 0;
00109       }
00110 #endif
00111     }
00112   
00113 #ifdef _OPENMP
00114   omp_set_num_threads( GetNumberOfThreads() );
00115 #endif
00116 
00117   this->m_AsynchronousThreadsRunning = false;
00118 #else
00119   StdErr << "ERROR: cannot run asynchronous threads when built without SMP support; please file a bug report.\n";
00120 #endif
00121 }
00122 
00123 template<class TClass,class TParam>
00124 void
00125 ThreadParameterArray<TClass,TParam>
00126 ::CancelAsynchronousThreads()
00127 {
00128 #ifdef CMTK_USE_THREADS  
00129   for ( size_t threadIdx = 0; threadIdx < this->m_NumberOfThreads; ++threadIdx ) 
00130     {
00131 #ifdef _MSC_VER
00132     DWORD resultThread;
00133     TerminateThread( this->m_Ptr[threadIdx].m_Handle, resultThread );
00134 #else
00135     if ( this->m_Ptr[threadIdx].m_ThreadID ) 
00136       {
00137       pthread_cancel( this->m_Ptr[threadIdx].m_ThreadID );
00138       pthread_join( this->m_Ptr[threadIdx].m_ThreadID, NULL );
00139       this->m_Ptr[threadIdx].m_ThreadID = 0;
00140       }
00141 #endif
00142     }
00143   
00144 #ifdef _OPENMP
00145   omp_set_num_threads( GetNumberOfThreads() );
00146 #endif
00147 
00148   this->m_AsynchronousThreadsRunning = false;
00149 #else
00150   StdErr << "ERROR: cannot run asynchronous threads when built without SMP support; please file a bug report.\n";
00151 #endif
00152 }
00153 
00154 template<class TClass,class TParam>
00155 void
00156 ThreadParameterArray<TClass,TParam>
00157 ::RunInParallelFIFO
00158 ( ThreadFunction threadCall, const size_t numberOfThreadsTotal, const size_t firstThreadIdx )
00159 {
00160 #ifdef _OPENMP
00161   const int nThreadsOMP = std::max<int>( 1, 1+GetNumberOfThreads()-this->m_NumberOfThreads );
00162   omp_set_num_threads( nThreadsOMP );
00163 #endif
00164 
00165 #if !defined(CMTK_BUILD_SMP)
00166   // we're not actually using SMP, so simply run everything "by hand".
00167   for ( size_t threadIdx = 0; threadIdx < numberOfThreadsTotal; ++threadIdx ) 
00168     {
00169     this->m_Ptr[0].ThisThreadIndex = firstThreadIdx + threadIdx;
00170     threadCall( this->m_Ptr );
00171     }
00172 #else
00173   if ( this->m_NumberOfThreads == 1 )
00174     {
00175     for ( size_t threadIdx = 0; threadIdx < numberOfThreadsTotal; ++threadIdx ) 
00176       {
00177       this->m_Ptr[0].ThisThreadIndex = firstThreadIdx + threadIdx;
00178       threadCall( this->m_Ptr );
00179       }
00180     }
00181   else
00182     {
00183 #ifdef _MSC_VER
00184 #else
00185     pthread_attr_t attr;
00186     pthread_attr_init(&attr);
00187     pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
00188 #endif // #ifdef _MSC_VER
00189       
00190     /* Initialization phase -- start first batch of parallel threads. */
00191     size_t threadIdx = 0;
00192     for ( ; (threadIdx < this->m_NumberOfThreads) && (threadIdx < numberOfThreadsTotal); ++threadIdx ) 
00193       {
00194       this->m_Ptr[threadIdx].ThisThreadIndex = firstThreadIdx + threadIdx;
00195 #ifdef _MSC_VER
00196       this->m_Ptr[threadIdx].m_Handle = CreateThread( NULL /*default security attributes*/, 0 /*use default stack size*/, (LPTHREAD_START_ROUTINE) threadCall, 
00197                                                       &this->m_Ptr[threadIdx], 0 /*use default creation flags*/, &this->m_Ptr[threadIdx].m_ThreadID );
00198       if ( this->m_Ptr[threadIdx].m_Handle == NULL )
00199         {
00200         fprintf( stderr, "Creation of thread #%d failed.\n", (int)threadIdx );
00201         exit( 1 );
00202         }
00203 #else
00204       int status = pthread_create( &this->m_Ptr[threadIdx].m_ThreadID, &attr, threadCall, &this->m_Ptr[threadIdx] );    
00205       if ( status ) 
00206         {
00207         fprintf( stderr, "Creation of thread #%d failed with status %d.\n", (int)threadIdx, (int)status );
00208         exit( 1 );
00209         }
00210 #endif
00211       }
00212       
00213     /* Sustained phase -- start new thread whenever oldest one completes. */
00214     size_t nextThreadToJoin = 0;
00215     while ( threadIdx < numberOfThreadsTotal )
00216       {
00217 #ifndef _MSC_VER
00218       void *resultThread;
00219 #endif
00220       if ( this->m_Ptr[threadIdx].m_ThreadID ) 
00221         {
00222 #ifdef _MSC_VER
00223 #else
00224         pthread_join( this->m_Ptr[threadIdx].m_ThreadID, &resultThread );
00225 #endif
00226         }
00227         
00228       this->m_Ptr[nextThreadToJoin].ThisThreadIndex = firstThreadIdx + threadIdx;
00229         
00230 #ifdef _MSC_VER
00231       this->m_Ptr[nextThreadToJoin].m_Handle = CreateThread( NULL /*default security attributes*/, 0 /*use default stack size*/, (LPTHREAD_START_ROUTINE) threadCall, 
00232                                                              &this->m_Ptr[nextThreadToJoin],0 /*use default creation flags*/, &this->m_Ptr[nextThreadToJoin].m_ThreadID );
00233       if ( this->m_Ptr[nextThreadToJoin].m_Handle == NULL) 
00234         {
00235         fprintf( stderr, "Creation of thread #%d failed.\n", (int)threadIdx );
00236         exit( 1 );
00237         }
00238 #else
00239       int status = pthread_create( &this->m_Ptr[nextThreadToJoin].m_ThreadID, &attr, threadCall, &this->m_Ptr[nextThreadToJoin] );
00240       if ( status ) 
00241         {
00242         fprintf( stderr, "Creation of thread #%d failed with status %d.\n", (int)threadIdx, (int)status );
00243         exit( 1 );
00244         }
00245 #endif  
00246 
00247       ++threadIdx;
00248       nextThreadToJoin = (nextThreadToJoin + 1) % this->m_NumberOfThreads;
00249       }
00250 
00251     /* Cleanup phase -- Collect remaining thread results. */
00252     for ( size_t threadIdx = 0; (threadIdx < this->m_NumberOfThreads) && (threadIdx < numberOfThreadsTotal); ++threadIdx ) 
00253       {
00254 #ifndef _MSC_VER
00255       void *resultThread;
00256 #endif
00257       if ( this->m_Ptr[nextThreadToJoin].m_ThreadID ) 
00258         {
00259 #ifdef _MSC_VER
00260 #else
00261         pthread_join( this->m_Ptr[nextThreadToJoin].m_ThreadID, &resultThread );
00262 #endif
00263         }
00264         
00265       nextThreadToJoin = (nextThreadToJoin + 1) % this->m_NumberOfThreads;
00266       }
00267       
00268 #ifdef _MSC_VER
00269 #else
00270     pthread_attr_destroy(&attr);
00271 #endif
00272     }
00273 #endif // #if !defined(CMTK_BUILD_SMP)
00274 
00275 #ifdef _OPENMP
00276   omp_set_num_threads( GetNumberOfThreads() );
00277 #endif
00278 }
00279 
00280 } // namespace cmtk
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines