cmtkSplineWarpCongealingFunctionalMPI.txx

Go to the documentation of this file.
00001 /*
00002 //
00003 //  Copyright 1997-2010 Torsten Rohlfing
00004 //
00005 //  Copyright 2004-2011 SRI International
00006 //
00007 //  This file is part of the Computational Morphometry Toolkit.
00008 //
00009 //  http://www.nitrc.org/projects/cmtk/
00010 //
00011 //  The Computational Morphometry Toolkit is free software: you can
00012 //  redistribute it and/or modify it under the terms of the GNU General Public
00013 //  License as published by the Free Software Foundation, either version 3 of
00014 //  the License, or (at your option) any later version.
00015 //
00016 //  The Computational Morphometry Toolkit is distributed in the hope that it
00017 //  will be useful, but WITHOUT ANY WARRANTY; without even the implied
00018 //  warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019 //  GNU General Public License for more details.
00020 //
00021 //  You should have received a copy of the GNU General Public License along
00022 //  with the Computational Morphometry Toolkit.  If not, see
00023 //  <http://www.gnu.org/licenses/>.
00024 //
00025 //  $Revision: 2731 $
00026 //
00027 //  $LastChangedDate: 2011-01-13 16:22:47 -0800 (Thu, 13 Jan 2011) $
00028 //
00029 //  $LastChangedBy: torstenrohlfing $
00030 //
00031 */
00032 
00033 #ifdef CMTK_BUILD_MPI
00034 
00035 namespace
00036 cmtk
00037 {
00038 
00041 
00042 SplineWarpCongealingFunctional::ReturnType
00043 SplineWarpCongealingFunctional
00044 ::EvaluateWithGradient
00045 ( CoordinateVector& v, CoordinateVector& g, const Types::Coordinate step )
00046 {
00047   const Self::ReturnType baseValue = this->EvaluateAt( v );
00048 
00049   const size_t numberOfXforms = this->m_XformVector.size();
00050   const size_t numberOfControlPoints = this->m_ParametersPerXform / 3;
00051   // use more chunks than nodes to get more fine-grained parallelism
00052   const size_t controlPointsPerNode = 1 + static_cast<size_t>( numberOfControlPoints / (4 * this->m_SizeMPI) );
00053   // assign less work per chunk to root node to leave time to check for incoming results and
00054   // keep worker nodes maximally busy
00055   const size_t controlPointsRootNode = 1 + controlPointsPerNode / 4;
00056 
00057   MPI::Status status;
00058 
00059   size_t resultsToReceive = 0;
00060   std::vector<size_t> taskByNode( this->m_SizeMPI, -1 );
00061 
00062   const size_t msgBufferSize = 3 * sizeof( Types::Coordinate ) * controlPointsPerNode * numberOfXforms;
00063   std::vector<char> msgBuffer( msgBufferSize );
00064   char* msgBufferPtr = &msgBuffer[0];
00065 
00066   // Allocate thread storage
00067   const size_t numberOfThreads = Threads::GetNumberOfThreads();
00068   if ( this->m_StaticThreadStorage.size() != numberOfThreads )
00069     {
00070     this->m_StaticThreadStorage.resize( numberOfThreads );
00071     for ( size_t thread = 0; thread < numberOfThreads; ++thread )
00072       {
00073       this->m_StaticThreadStorage[thread].Initialize( this );
00074       }
00075     }
00076   else
00077     {
00078     for ( size_t thread = 0; thread < numberOfThreads; ++thread )
00079       {
00080       this->m_StaticThreadStorage[thread].m_NeedToCopyXformParameters = true;
00081       }
00082     }
00083 
00084   // Create thread parameter array and start threads asynchronously
00085   ThreadParameterArray<Self,EvaluateLocalGradientThreadParameters> threadParams( this, numberOfThreads );
00086   threadParams.RunInParallelAsynchronous( EvaluateLocalGradientThreadFunc );
00087   for ( size_t thread = 0; thread < numberOfThreads; ++thread )
00088     {
00089     threadParams[thread].m_Step = step;
00090     threadParams[thread].m_Gradient = (Types::Coordinate*)msgBufferPtr;
00091     }
00092   
00093   bool allJobsSent = false;
00094   size_t firstIndexToCompute = 0;
00095   while ( (firstIndexToCompute < numberOfControlPoints) && ! allJobsSent )
00096     {
00097     if ( this->m_RankMPI == 0 )
00098       {      
00099       // send out job instructions or termination signal
00100       for ( int dest = 1; (dest < this->m_SizeMPI) && !allJobsSent; ++dest )
00101         {
00102         if ( taskByNode[dest] == (size_t)-1 )
00103           {
00104           if ( firstIndexToCompute >= numberOfControlPoints )
00105             {
00106             allJobsSent = true;
00107             }
00108           else
00109             {
00110             taskByNode[dest] = firstIndexToCompute;
00111             MPI::COMM_WORLD.Ssend( &firstIndexToCompute, sizeof( firstIndexToCompute ), MPI::CHAR, dest, MESSAGE_TAG_COMPUTE );
00112             ++resultsToReceive;
00113 
00114             firstIndexToCompute += controlPointsPerNode;
00115             }
00116           }
00117         }
00118       }
00119     else
00120       {
00121       // receive job instructions      
00122       MPI::COMM_WORLD.Recv( msgBufferPtr, msgBufferSize, MPI::CHAR, 0 /*root*/ , MPI::ANY_TAG, status );
00123       if ( status.Get_tag() == MESSAGE_TAG_FINISHED )
00124         {
00125         allJobsSent = true;
00126         }
00127       else
00128         {
00129         int position = 0;
00130         MPI::CHAR.Unpack( msgBufferPtr, msgBufferSize, &firstIndexToCompute, sizeof( firstIndexToCompute ), position, MPI::COMM_WORLD );
00131         }
00132       }
00133     
00134     if ( (firstIndexToCompute < numberOfControlPoints ) && ! allJobsSent )
00135       {
00136       const size_t cpsToCompute = std::min( this->m_RankMPI ? controlPointsPerNode : controlPointsRootNode, (int)numberOfControlPoints - firstIndexToCompute );
00137       // determine last control point processed
00138       this->m_ControlPointIndexNext = firstIndexToCompute;
00139       this->m_ControlPointIndexLast = firstIndexToCompute + cpsToCompute;
00140 
00141       for ( size_t thread = 0; thread < numberOfThreads; ++thread )
00142         {
00143         threadParams[thread].m_FirstIndexToCompute = firstIndexToCompute;
00144         // Post "work ready" semaphore for each thread
00145         this->m_ThreadWorkSemaphore.Post();
00146         }
00147 
00148       // Wait for "work done" semaphore from each thread
00149       for ( size_t nThreads = 0; nThreads < numberOfThreads; ++nThreads )
00150         {
00151         this->m_ThreadReadySemaphore.Wait();
00152         }
00153       
00154       // on root node, simply sort results into gradient fields
00155       if ( this->m_RankMPI == 0 )
00156         {
00157         // sort computed values into gradient vector
00158         this->ReorderGradientComponents( g.Elements, (Types::Coordinate*)msgBufferPtr, firstIndexToCompute, this->m_ControlPointIndexLast );
00159         firstIndexToCompute += cpsToCompute;
00160         }
00161       else
00162         {
00163         // send results
00164         MPI::COMM_WORLD.Ssend( msgBufferPtr, 3 * numberOfXforms * cpsToCompute * sizeof( Types::Coordinate ), MPI::CHAR, 0, MESSAGE_TAG_RESULTS );
00165         }
00166       }
00167     
00168     // are we still waiting for results from worker nodes?
00169     // are the worker nodes still trying to send data?
00170     if ( this->m_RankMPI == 0 )
00171       {
00172       // receive results:
00173       // is a result ready to be received?
00174       while ( resultsToReceive && MPI::COMM_WORLD.Iprobe( MPI::ANY_SOURCE, MESSAGE_TAG_RESULTS, status ) )
00175         {
00176         // receive result
00177         MPI::COMM_WORLD.Recv( msgBufferPtr, msgBufferSize, MPI::CHAR, status.Get_source(), MESSAGE_TAG_RESULTS, status );
00178                 
00179         // what control point did we send this node for processing?
00180         const int fromNode = status.Get_source();
00181         const int receivedJob = taskByNode[fromNode];
00182         
00183         --resultsToReceive;
00184         taskByNode[fromNode] = -1;
00185         
00186         // determine last control point processed
00187         const size_t maxCpIndex = std::min<size_t>( receivedJob + controlPointsPerNode, numberOfControlPoints );
00188         this->ReorderGradientComponents( g.Elements, (Types::Coordinate*)msgBufferPtr, receivedJob, maxCpIndex );
00189         }
00190       }
00191     }
00192 
00193   // collect remaining results from worker nodes
00194   while ( resultsToReceive )
00195     {
00196     // receive result
00197     MPI::COMM_WORLD.Recv( msgBufferPtr, msgBufferSize, MPI::CHAR, MPI::ANY_SOURCE, MPI::ANY_TAG, status );
00198     
00199     // what control point did we send this node for processing?
00200     const int fromNode = status.Get_source();
00201     const int receivedJob = taskByNode[fromNode];
00202 
00203     --resultsToReceive;
00204     taskByNode[fromNode] = -1;
00205     
00206     // determine last control point processed
00207     const size_t maxCpIndex = std::min<size_t>( receivedJob + controlPointsPerNode, numberOfControlPoints );
00208     this->ReorderGradientComponents( g.Elements, (Types::Coordinate*)msgBufferPtr, receivedJob, maxCpIndex );
00209     }  
00210  
00211   if ( this->m_RankMPI == 0 )
00212     { 
00213     // tell all worker nodes to finish their computation loops
00214     for ( int workerNode = 1; workerNode < this->m_SizeMPI; ++workerNode )
00215       {
00216       MPI::COMM_WORLD.Ssend( msgBufferPtr, 1, MPI::CHAR, workerNode, MESSAGE_TAG_FINISHED );
00217       }
00218     }
00219   
00220   // Distribute gradient estimate
00221   MPI::COMM_WORLD.Bcast( g.Elements, g.Dim * sizeof( g.Elements[0] ), MPI::CHAR, 0 /*root*/ );
00222   
00223   if ( this->m_PartialGradientMode )
00224     {
00225     const Types::Coordinate gthresh = g.MaxNorm() * this->m_PartialGradientThreshold;
00226     for ( size_t param = 0; param < g.Dim; ++param )
00227       {
00228       if ( fabs( g[param] ) < gthresh )
00229         {       
00230         g[param] = this->m_ParamStepArray[param] = 0.0;
00231         }
00232       }
00233     }
00234 
00235   if ( this->m_ForceZeroSum )
00236     {
00237     this->ForceZeroSumGradient( g );
00238     }
00239 
00240   return baseValue;
00241 }
00242 
00243 void
00244 SplineWarpCongealingFunctional
00245 ::ReorderGradientComponents
00246 ( Types::Coordinate *const dst, const Types::Coordinate* src, const size_t fromCpIdx, const size_t toCpIdx )
00247 {
00248   const size_t numberOfParameters = this->ParamVectorDim();
00249   
00250   // sort computed values into gradient vector
00251   size_t currentParameter = 0;
00252   for ( size_t cpIndex = fromCpIdx; cpIndex < toCpIdx; ++cpIndex )
00253     {
00254     for ( size_t cparam = 3*cpIndex; cparam < numberOfParameters; cparam += this->m_ParametersPerXform )
00255       {
00256       for ( size_t dim = 0; dim < 3; ++dim, ++currentParameter )
00257         {
00258         dst[cparam+dim] = src[currentParameter];
00259         }
00260       }
00261     }
00262 }
00263 
00264 CMTK_THREAD_RETURN_TYPE
00265 SplineWarpCongealingFunctional
00266 ::EvaluateLocalGradientThreadFunc
00267 ( void* args )
00268 {
00269   EvaluateLocalGradientThreadParameters* threadParameters = static_cast<EvaluateLocalGradientThreadParameters*>( args );
00270   
00271   Self* This = threadParameters->thisObject;
00272   const Self* ThisConst = This;
00273   const size_t threadID = threadParameters->ThisThreadIndex;
00274 
00275   const size_t numberOfXforms = ThisConst->m_XformVector.size();
00276   const size_t parametersPerXform = ThisConst->m_ParametersPerXform;
00277   const size_t paramVectorDim = ThisConst->ParamVectorDim();
00278   
00279   const byte paddingValue = ThisConst->m_PaddingValue;
00280   const size_t imagesFrom = ThisConst->m_ActiveImagesFrom;
00281   const size_t imagesTo = ThisConst->m_ActiveImagesTo;
00282   const size_t numberOfImages = imagesTo - imagesFrom;
00283   const size_t numberOfImagesIncludingTemplate = ThisConst->m_UseTemplateData ? numberOfImages+1 : numberOfImages;
00284 
00285   Self::StaticThreadStorage* threadStorage = &(This->m_StaticThreadStorage[threadID]);
00286   if ( threadStorage->m_NeedToCopyXformParameters )
00287     {
00288     for ( size_t xi = 0; xi < numberOfXforms; ++xi )
00289       {
00290       threadStorage->m_Xforms[xi]->CopyParamVector( This->GetXformByIndex(xi) );
00291       threadStorage->m_NeedToCopyXformParameters = false;
00292       }
00293     }
00294 
00295   const UniformVolume* templateGrid = ThisConst->m_TemplateGrid;
00296   const size_t numberOfControlPoints = ThisConst->m_ParametersPerXform / 3;
00297   
00298   size_t cpIndex;
00299   
00300   while ( true )
00301     {
00302     This->m_ThreadWorkSemaphore.Wait();
00303     
00304     This->m_ControlPointIndexLock.Lock();
00305     while ( (cpIndex = This->m_ControlPointIndexNext) < This->m_ControlPointIndexLast )
00306       {
00307       ++This->m_ControlPointIndexNext;
00308       This->m_ControlPointIndexLock.Unlock();
00309       
00310       if ( !(cpIndex % 1000) )
00311         {
00312         std::cerr << cpIndex << " / " << numberOfControlPoints << "\r";
00313         }    
00314       
00315       std::vector<DataGrid::RegionType>::const_iterator voi = ThisConst->m_VolumeOfInfluenceArray.begin() + cpIndex;
00316       const size_t pixelsPerLineVOI = (voi->To()[0]-voi->From()[0]);
00317       
00318       std::fill( threadStorage->m_FPlus.begin(), threadStorage->m_FPlus.end(), 0 );
00319       std::fill( threadStorage->m_FMinus.begin(), threadStorage->m_FMinus.end(), 0 );
00320       std::fill( threadStorage->m_CountByParameterPlus.begin(), threadStorage->m_CountByParameterPlus.end(), 0 );
00321       std::fill( threadStorage->m_CountByParameterMinus.begin(), threadStorage->m_CountByParameterMinus.end(), 0 );
00322       
00323       for ( int z = voi->From()[2]; (z < voi->To()[2]); ++z ) 
00324         {
00325         for ( int y = voi->From()[1]; (y < voi->To()[1]); ++y )
00326           {
00327           // evaluate baseline entropies for this row
00328           const size_t rowofs = templateGrid->GetOffsetFromIndex( voi->From()[0], y, z );
00329           
00330           size_t ofs = rowofs;
00331           for ( size_t x = 0; x < pixelsPerLineVOI; ++x, ++ofs )
00332             {
00333             threadStorage->m_Histogram[x].Reset();
00334             threadStorage->m_Count[x] = 0;
00335           
00336             const size_t kernelIdx = ThisConst->m_StandardDeviationByPixel[ofs];
00337             const size_t kernelRadius = ThisConst->m_HistogramKernelRadius[kernelIdx];
00338             const HistogramBinType* kernel = ThisConst->m_HistogramKernel[kernelIdx];
00339           
00340             if ( ThisConst->m_UseTemplateData )
00341               {
00342               const byte templateValue = ThisConst->m_TemplateData[ofs];
00343               if ( templateValue != paddingValue )
00344                 {
00345                 threadStorage->m_Histogram[x].AddWeightedSymmetricKernel( templateValue, kernelRadius, kernel );
00346                 ++threadStorage->m_Count[x];
00347                 }
00348               }
00349             
00350             for ( size_t img = imagesFrom; img < imagesTo; ++img )
00351               { 
00352               const byte dataThisPixel = ThisConst->m_Data[img][ofs];
00353               if ( dataThisPixel != paddingValue )
00354                 {
00355                 threadStorage->m_Histogram[x].AddWeightedSymmetricKernel( dataThisPixel, kernelRadius, kernel );
00356                 ++threadStorage->m_Count[x];
00357                 }
00358               }
00359             }
00360         
00361           size_t cparam = 3 * cpIndex;
00362           size_t currentParameter = 0;
00363           for ( size_t imageIdx = 0; imageIdx < numberOfXforms; ++imageIdx, cparam += parametersPerXform )
00364             {
00365             SplineWarpXform::SmartPtr xform = threadStorage->m_Xforms[imageIdx];
00366             const UniformVolume* target = ThisConst->m_ImageVector[imageIdx];
00367             const byte* dataPtr = static_cast<const byte*>( target->GetData()->GetDataPtr() );
00368           
00369             for ( size_t dim = 0; dim < 3; ++dim, ++currentParameter )
00370               {
00371               const size_t cdparam = cparam + dim;
00372               const size_t xfparam = 3 * cpIndex + dim;
00373               const Types::Coordinate pStep = ThisConst->m_ParamStepArray[cdparam] * threadParameters->m_Step;
00374             
00375               if ( pStep > 0 )
00376                 {
00377                 const Types::Coordinate v0 = xform->GetParameter( xfparam );
00378                 for ( int delta = 0; delta < 2; ++delta )
00379                   {
00380                   Types::Coordinate vTest = v0 + (2*delta-1) * pStep;
00381                   xform->SetParameter( xfparam, vTest );
00382                 
00383                   xform->GetTransformedGridRow( pixelsPerLineVOI, &(threadStorage->m_VectorList[0]), voi->From()[0], y, z );
00384                 
00385                   size_t ofs = rowofs;
00386                   for ( size_t x = 0; x < pixelsPerLineVOI; ++x, ++ofs )
00387                     {
00388                     if ( threadStorage->m_Count[x] == numberOfImages ) // full count?
00389                       {
00390                       const byte baselineData = ThisConst->m_Data[imageIdx][ofs];
00391                       HistogramType& workingHistogram = threadStorage->m_Histogram[x];
00392                     
00393                       const size_t kernelIdx = ThisConst->m_StandardDeviationByPixel[ofs];
00394                       const size_t kernelRadius = ThisConst->m_HistogramKernelRadius[kernelIdx];
00395                       const HistogramBinType* kernel = ThisConst->m_HistogramKernel[kernelIdx];
00396                     
00397                       byte value;
00398                       if ( (baselineData != paddingValue) && target->ProbeData( value, dataPtr, threadStorage->m_VectorList[x] ) )
00399                         {
00400                         if ( value != paddingValue )
00401                           {
00402                           if ( delta )
00403                             ++threadStorage->m_CountByParameterPlus[currentParameter];
00404                           else
00405                             ++threadStorage->m_CountByParameterMinus[currentParameter];
00406                         
00407                           if ( value != baselineData )
00408                             {
00409                             float fd = 0;
00410                             float invsamples = 1.0 / workingHistogram.SampleCount();
00411                           
00412                             const size_t idxMin = std::min( value, baselineData ) - kernelRadius;
00413                             const size_t idxMax = std::max( value, baselineData ) + kernelRadius + 1;
00414                           
00415                             for ( size_t idx = idxMin; idx < idxMax; ++idx )
00416                               {
00417                               const size_t idxV = abs(idx - value);
00418                               const float kernelIn = ( idxV < kernelRadius ) ? kernel[idxV] : 0;
00419                             
00420                               const size_t idxB = abs(idx - baselineData);
00421                               const float kernelOut = ( idxB < kernelRadius ) ? kernel[idxB] : 0;
00422                             
00423                               if ( (kernelIn > 0) || (kernelOut > 0) )
00424                                 {
00425                                 fd += MathUtil::plogp( invsamples * (workingHistogram[idx] - kernelOut + kernelIn) ) - MathUtil::plogp( invsamples * workingHistogram[idx] );
00426                                 }
00427                               }
00428                           
00429                             if ( delta ) 
00430                               threadStorage->m_FPlus[currentParameter] += fd;
00431                             else
00432                               threadStorage->m_FMinus[currentParameter] += fd;
00433                             }
00434                           }
00435                         }
00436                       }
00437                     }
00438                   }
00439                 xform->SetParameter( xfparam, v0 );
00440                 }
00441               }
00442             }
00443           }
00444         }
00445    
00446       // approximate gradient from upper and lower function evaluations  
00447       size_t currentParameterIdxLinear = 3 * numberOfXforms * (cpIndex - threadParameters->m_FirstIndexToCompute );
00448       size_t imageDimIdx = 0;
00449       for ( size_t cparam = 3*cpIndex; cparam < paramVectorDim; cparam += parametersPerXform )
00450         {
00451         for ( size_t dim = 0; dim < 3; ++dim, ++imageDimIdx, ++currentParameterIdxLinear )
00452           {
00453           threadParameters->m_Gradient[currentParameterIdxLinear] = 0.0;
00454           assert( imageDimIdx < 3 * numberOfXforms );
00455           if ( threadStorage->m_CountByParameterPlus[imageDimIdx] && threadStorage->m_CountByParameterMinus[imageDimIdx] )
00456             {
00457             double upper = threadStorage->m_FPlus[imageDimIdx] / threadStorage->m_CountByParameterPlus[imageDimIdx];
00458             double lower = threadStorage->m_FMinus[imageDimIdx] / threadStorage->m_CountByParameterMinus[imageDimIdx];
00459           
00460             if ( (ThisConst->m_JacobianConstraintWeight > 0) || (ThisConst->m_BendingEnergyWeight > 0) )
00461               {
00462               const size_t cdparam = cparam+dim;
00463               const size_t warpParam = cdparam % parametersPerXform;
00464               const size_t imageIdx = cdparam / parametersPerXform;
00465               const SplineWarpXform* xform = ThisConst->GetXformByIndex(imageIdx);
00466               const Types::Coordinate pStep = ThisConst->m_ParamStepArray[cdparam] * threadParameters->m_Step;
00467             
00468               if ( ThisConst->m_JacobianConstraintWeight > 0 )
00469                 {
00470                 double upperJ, lowerJ;
00471                 xform->GetJacobianConstraintDerivative( upperJ, lowerJ, warpParam, voi[warpParam], pStep );
00472               
00473                 upper -= ThisConst->m_JacobianConstraintWeight * upperJ;
00474                 lower -= ThisConst->m_JacobianConstraintWeight * lowerJ;
00475                 }
00476             
00477               if ( ThisConst->m_BendingEnergyWeight > 0 )
00478                 {
00479                 double upperE, lowerE;
00480                 xform->GetGridEnergyDerivative( upperE, lowerE, warpParam, pStep );           
00481                 upper -= ThisConst->m_BendingEnergyWeight * upperE;
00482                 lower -= ThisConst->m_BendingEnergyWeight * lowerE;
00483                 }
00484               }
00485 
00486             if ( (upper > 0) || (lower > 0))
00487               {
00488               threadParameters->m_Gradient[currentParameterIdxLinear] = upper - lower;
00489               }
00490             }
00491           }
00492         }
00493 
00494       This->m_ControlPointIndexLock.Lock();
00495       }
00496     This->m_ControlPointIndexLock.Unlock();
00497 
00498     This->m_ThreadReadySemaphore.Post();
00499     }
00500   
00501   return CMTK_THREAD_RETURN_VALUE;
00502 }
00503 
00504 } // namespace cmtk
00505 
00506 #endif // #ifdef CMTK_BUILD_MPI
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines