cmtkThreadPool.cxx

Go to the documentation of this file.
00001 /*
00002 //
00003 //  Copyright 1997-2009 Torsten Rohlfing
00004 //
00005 //  Copyright 2004-2010 SRI International
00006 //
00007 //  This file is part of the Computational Morphometry Toolkit.
00008 //
00009 //  http://www.nitrc.org/projects/cmtk/
00010 //
00011 //  The Computational Morphometry Toolkit is free software: you can
00012 //  redistribute it and/or modify it under the terms of the GNU General Public
00013 //  License as published by the Free Software Foundation, either version 3 of
00014 //  the License, or (at your option) any later version.
00015 //
00016 //  The Computational Morphometry Toolkit is distributed in the hope that it
00017 //  will be useful, but WITHOUT ANY WARRANTY; without even the implied
00018 //  warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019 //  GNU General Public License for more details.
00020 //
00021 //  You should have received a copy of the GNU General Public License along
00022 //  with the Computational Morphometry Toolkit.  If not, see
00023 //  <http://www.gnu.org/licenses/>.
00024 //
00025 //  $Revision: 2398 $
00026 //
00027 //  $LastChangedDate: 2010-10-05 14:54:37 -0700 (Tue, 05 Oct 2010) $
00028 //
00029 //  $LastChangedBy: torstenrohlfing $
00030 //
00031 */
00032 
00033 #include "cmtkThreadPool.h"
00034 
00035 #include <System/cmtkThreads.h>
00036 #include <System/cmtkMutexLock.h>
00037 #include <System/cmtkConsole.h>
00038 
00039 namespace
00040 cmtk
00041 {
00042 
00043 ThreadPool::ThreadPool( const size_t nThreads )
00044   : m_NumberOfTasks( 0 ),
00045     m_NextTaskIndex( 0 ),
00046     m_TaskFunction( NULL ),
00047     m_ThreadsRunning( false ),
00048     m_ContinueThreads( true )
00049 {
00050   if ( ! nThreads )
00051     this->m_NumberOfThreads = cmtk::Threads::GetNumberOfThreads();
00052   else
00053     this->m_NumberOfThreads = nThreads;
00054 
00055 #ifdef CMTK_BUILD_SMP  
00056   this->m_ThreadID.resize( this->m_NumberOfThreads, 0 );
00057 #ifdef _MSC_VER
00058   this->m_ThreadHandles.resize( this->m_NumberOfThreads, 0 );
00059 #endif
00060 #endif
00061   this->m_ThreadArgs.resize( this->m_NumberOfThreads );
00062 }
00063 
00064 void
00065 ThreadPool::StartThreads()
00066 {
00067   if ( !this->m_ThreadsRunning )
00068     {
00069 #ifdef CMTK_BUILD_SMP  
00070 #ifdef CMTK_USE_THREADS
00071     pthread_attr_t attr;
00072     pthread_attr_init(&attr);
00073     pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
00074     
00075     for ( size_t idx = 0; idx < this->m_NumberOfThreads; ++idx ) 
00076       {
00077       // set thread function arguments
00078       this->m_ThreadArgs[idx].m_Pool = this;
00079       this->m_ThreadArgs[idx].m_Index = idx;
00080 
00081       // start thread
00082       const int status = pthread_create( &this->m_ThreadID[idx], &attr, cmtkThreadPoolThreadFunction, static_cast<CMTK_THREAD_ARG_TYPE>( &this->m_ThreadArgs[idx] ) );
00083       
00084       if ( status ) 
00085         {
00086         StdErr.printf( "Creation of pooled thread #%d failed with status %d.\n", idx, status );
00087         exit( 1 );
00088         }
00089       }
00090     
00091     pthread_attr_destroy(&attr);
00092 #elif defined(_MSC_VER)
00093     for ( size_t idx = 0; idx < this->m_NumberOfThreads; ++idx ) 
00094       {
00095       // set thread function arguments
00096       this->m_ThreadArgs[idx].m_Pool = this;
00097       this->m_ThreadArgs[idx].m_Index = idx;
00098 
00099       // nothing happened yet, so set status to OK
00100       int status = 0;
00101 
00102       // start thread
00103       this->m_ThreadHandles[idx] = CreateThread( NULL /*default security attributes*/, 0/*use default stack size*/, (LPTHREAD_START_ROUTINE) cmtkThreadPoolThreadFunction, 
00104                                                  static_cast<CMTK_THREAD_ARG_TYPE>(  &this->m_ThreadArgs[idx] ),  0/*use default creation flags*/, &this->m_ThreadID[idx] );
00105       if ( this->m_ThreadHandles[idx] == NULL ) 
00106         {
00107         status = -1;
00108         }
00109       
00110       if ( status ) 
00111         {
00112         StdErr.printf( "Creation of pooled thread #%d failed with status %d.\n", idx, status );
00113         exit( 1 );
00114         }
00115       }
00116 #endif // #ifdef CMTK_USE_THREADS
00117 #endif // #ifdef CMTK_BUILD_SMP
00118     this->m_ThreadsRunning = true;
00119     }
00120 }
00121 
00122 ThreadPool::~ThreadPool()
00123 {
00124   this->EndThreads();
00125 }
00126 
00127 
00128 void
00129 ThreadPool::EndThreads()
00130 {
00131   if ( this->m_ThreadsRunning )
00132     {
00133 #ifdef CMTK_USE_THREADS
00134     // set flag to terminate threads and post one semaphore per actual thread
00135     this->m_ContinueThreads = false;
00136     this->m_TaskWaitingSemaphore.Post( this->m_NumberOfThreads );
00137     for ( size_t idx = 0; idx < this->m_NumberOfThreads; ++idx ) 
00138       {
00139       if ( this->m_ThreadID[idx] ) 
00140         {
00141         pthread_join( this->m_ThreadID[idx], NULL );
00142         this->m_ThreadID[idx] = 0;
00143         }
00144       }
00145 #elif defined(_MSC_VER)
00146     for ( size_t idx = 0; idx < this->m_NumberOfThreads; ++idx ) 
00147       {
00148       DWORD resultThread = 0;
00149       TerminateThread( this->m_ThreadHandles[idx], resultThread );
00150       }
00151 #endif
00152     this->m_ThreadsRunning = false;
00153     }
00154 }
00155 
00156 void
00157 ThreadPool::ThreadFunction( const size_t threadIdx )
00158 {
00159 #ifdef _OPENMP
00160   // Disable OpenMP inside thread
00161   omp_set_num_threads( 1 );
00162 #endif
00163 
00164 #ifdef CMTK_BUILD_SMP
00165   // wait for task waiting
00166   this->m_TaskWaitingSemaphore.Wait();
00167   while ( this->m_ContinueThreads )
00168     {
00169     // lock, get, increment next task index
00170     this->m_NextTaskIndexLock.Lock();
00171     const size_t taskIdx = this->m_NextTaskIndex;
00172     ++this->m_NextTaskIndex;
00173     this->m_NextTaskIndexLock.Unlock();
00174     
00175     // call task function
00176     this->m_TaskFunction( this->m_TaskParameters[taskIdx], taskIdx, this->m_NumberOfTasks, threadIdx, this->m_NumberOfThreads ); 
00177     
00178     // post "task done, thread waiting"
00179     this->m_ThreadWaitingSemaphore.Post();
00180 
00181     // wait for task waiting
00182     this->m_TaskWaitingSemaphore.Wait();
00183     }
00184 #endif // #ifdef CMTK_BUILD_SMP
00185 }
00186 
00187 ThreadPool& 
00188 ThreadPool::GetGlobalThreadPool()
00189 {
00190   static ThreadPool globalThreadPool;
00191   return globalThreadPool;
00192 }
00193 
00194 }
00195 
00196 CMTK_THREAD_RETURN_TYPE
00197 cmtkThreadPoolThreadFunction( CMTK_THREAD_ARG_TYPE arg )
00198 {
00199   static_cast<cmtk::ThreadPool::ThreadPoolArg*>( arg )->m_Pool->ThreadFunction( static_cast<cmtk::ThreadPool::ThreadPoolArg*>( arg )->m_Index );
00200   return CMTK_THREAD_RETURN_VALUE;
00201 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines