00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <mpi.h>
00034
00035 #ifdef DEBUG
00036 # define DEBUG_COMM
00037 #endif
00038
00039
00040
00041
00042 namespace
00043 cmtk
00044 {
00045
00048
00049 SplineWarpGroupwiseRegistrationRMIFunctional::ReturnType
00050 SplineWarpGroupwiseRegistrationRMIFunctional::EvaluateWithGradient
00051 ( CoordinateVector& v, CoordinateVector& g, const Types::Coordinate step )
00052 {
00053 const int tagCompute = 1;
00054 const int tagResults = 2;
00055 const int tagFinished = 3;
00056
00057 #ifdef DEBUG_GRADIENT
00058 g.SetAll( 12345 );
00059 #endif
00060
00061 const size_t numberOfThreads = Threads::GetNumberOfThreads();
00062 const size_t numberOfXforms = this->m_XformVector.size();
00063
00064 const Self::ReturnType baseValue = this->EvaluateAt( v );
00065
00066 if ( this->m_NeedsUpdateInformationByControlPoint )
00067 {
00068 this->UpdateInformationByControlPoint();
00069 }
00070
00071 const size_t safeNumberOfThreads =
00072 std::min( numberOfThreads, this->m_ControlPointScheduleOverlapFreeMaxLength );
00073
00074 const size_t numberOfDOFsPerCell = 3 * numberOfXforms * safeNumberOfThreads;
00075 if ( this->m_ThreadSumOfProductsMatrix.size() < (2 * numberOfDOFsPerCell) )
00076 {
00077 this->m_ThreadSumOfProductsMatrix.resize( 2 * numberOfDOFsPerCell );
00078 }
00079 if ( this->m_ThreadSumsVector.size() < ( 2 * numberOfDOFsPerCell) )
00080 {
00081 this->m_ThreadSumsVector.resize( 2 * numberOfDOFsPerCell );
00082 }
00083
00084 ThreadParameterArray<Self,EvaluateLocalGradientThreadParameters> threadParams( this, safeNumberOfThreads );
00085
00086 const size_t numberOfControlPoints = this->m_ControlPointSchedule.size();
00087
00088 const size_t controlPointsPerNode = 1 + static_cast<size_t>( numberOfControlPoints / (8 * this->m_SizeMPI) );
00089
00090
00091 const size_t controlPointsRootNode = 1 + controlPointsPerNode / 8;
00092
00093 MPI::Status status;
00094
00095 size_t resultsToReceive = 0;
00096 std::vector<size_t> taskByNode( this->m_SizeMPI, -1 );
00097
00098 const size_t msgBufferSize = 3 * sizeof( Types::Coordinate ) * controlPointsPerNode * numberOfXforms;
00099 std::vector<char> msgBuffer( msgBufferSize );
00100 char* msgBufferPtr = &msgBuffer[0];
00101
00102 bool allJobsSent = false;
00103 size_t firstIndexToCompute = 0;
00104 while ( (firstIndexToCompute < numberOfControlPoints) && ! allJobsSent )
00105 {
00106 if ( this->m_RankMPI == 0 )
00107 {
00108
00109 for ( int dest = 1; (dest < this->m_SizeMPI) && !allJobsSent; ++dest )
00110 {
00111 if ( taskByNode[dest] == (size_t)-1 )
00112 {
00113 if ( firstIndexToCompute >= numberOfControlPoints )
00114 {
00115 allJobsSent = true;
00116 }
00117 else
00118 {
00119 taskByNode[dest] = firstIndexToCompute;
00120 #ifdef DEBUG_COMM
00121 std::cerr << this->m_RankMPI << "\t" << "Sending task " << firstIndexToCompute << " to node " << dest << "\n";
00122 #endif
00123 MPI::COMM_WORLD.Ssend( &firstIndexToCompute, sizeof( firstIndexToCompute ), MPI::CHAR, dest, tagCompute );
00124 ++resultsToReceive;
00125 #ifdef DEBUG_COMM
00126 std::cerr << this->m_RankMPI << "\t" << "Sent task " << firstIndexToCompute << " to node " << dest << "\n";
00127 #endif
00128
00129 firstIndexToCompute += controlPointsPerNode;
00130 }
00131 }
00132 }
00133 }
00134 else
00135 {
00136
00137 #ifdef DEBUG_COMM
00138 std::cerr << this->m_RankMPI << "\t" << "Receiving on node " << this->m_RankMPI << "\n";
00139 #endif
00140 MPI::COMM_WORLD.Recv( msgBufferPtr, msgBufferSize, MPI::CHAR, 0 , MPI::ANY_TAG, status );
00141 if ( status.Get_tag() == tagFinished )
00142 {
00143 #ifdef DEBUG_COMM
00144 std::cerr << "Exiting on node " << this->m_RankMPI << "\n";
00145 #endif
00146 allJobsSent = true;
00147 }
00148 else
00149 {
00150 int position = 0;
00151 #ifdef DEBUG_COMM
00152 std::cerr << this->m_RankMPI << "\t" << "Unpacking task on node " << this->m_RankMPI << "\n";
00153 #endif
00154 MPI::CHAR.Unpack( msgBufferPtr, msgBufferSize, &firstIndexToCompute, sizeof( firstIndexToCompute ), position, MPI::COMM_WORLD );
00155 }
00156 }
00157
00158 if ( (firstIndexToCompute < numberOfControlPoints ) && ! allJobsSent )
00159 {
00160 for ( size_t thread = 0; thread < safeNumberOfThreads; ++thread )
00161 {
00162 threadParams[thread].m_ThreadStorageIndex = thread;
00163 threadParams[thread].m_Step = step;
00164 threadParams[thread].m_Gradient = (Types::Coordinate*)msgBufferPtr;
00165 threadParams[thread].m_MetricBaseValue = baseValue;
00166 threadParams[thread].m_FirstIndexToCompute = firstIndexToCompute;
00167 }
00168
00169 const size_t cpsToCompute =
00170 std::min( this->m_RankMPI ? controlPointsPerNode : controlPointsRootNode,
00171 (int)numberOfControlPoints - firstIndexToCompute );
00172 #ifdef DEBUG_COMM
00173 std::cerr << this->m_RankMPI << "\t" << "Computing " << firstIndexToCompute << ":" << cpsToCompute << "\n";
00174 #endif
00175 threadParams.RunInParallelFIFO
00176 ( EvaluateLocalGradientThreadFunc, cpsToCompute, firstIndexToCompute );
00177
00178
00179 if ( this->m_RankMPI == 0 )
00180 {
00181
00182 const size_t maxCpIndex = firstIndexToCompute + cpsToCompute;
00183 #ifdef DEBUG_COMM
00184 StdErr.printf( "%d\tCopying local result %d:%d\n", this->m_RankMPI, firstIndexToCompute, maxCpIndex );
00185 #endif
00186
00187 this->ReorderGradientComponents( g.Elements, (Types::Coordinate*)msgBufferPtr, firstIndexToCompute, maxCpIndex );
00188 firstIndexToCompute += cpsToCompute;
00189 }
00190 else
00191 {
00192
00193 MPI::COMM_WORLD.Ssend( msgBufferPtr, 3 * numberOfXforms * cpsToCompute * sizeof( Types::Coordinate ),
00194 MPI::CHAR, 0, tagResults );
00195 #ifdef DEBUG_COMM
00196 std::cerr << this->m_RankMPI << "\t" << "Sent results of size "
00197 << 3 * numberOfXforms * cpsToCompute * sizeof( Types::Coordinate ) << "\n";
00198 #endif
00199 }
00200 }
00201
00202
00203
00204 if ( this->m_RankMPI == 0 )
00205 {
00206
00207
00208 while ( resultsToReceive && MPI::COMM_WORLD.Iprobe( MPI::ANY_SOURCE, tagResults, status ) )
00209 {
00210
00211 MPI::COMM_WORLD.Recv( msgBufferPtr, msgBufferSize, MPI::CHAR, status.Get_source(), tagResults, status );
00212
00213
00214 const int fromNode = status.Get_source();
00215 const int receivedJob = taskByNode[fromNode];
00216
00217 if ( receivedJob != -1 )
00218 {
00219 --resultsToReceive;
00220 taskByNode[fromNode] = -1;
00221
00222
00223 const size_t maxCpIndex = std::min<size_t>( receivedJob + controlPointsPerNode, numberOfControlPoints );
00224 this->ReorderGradientComponents( g.Elements, (Types::Coordinate*)msgBufferPtr, receivedJob, maxCpIndex );
00225 #ifdef DEBUG_COMM
00226 StdErr.printf( "%d\tReceived result %d:%d of size %d from node %d\n", this->m_RankMPI, receivedJob, maxCpIndex, status.Get_elements( MPI::CHAR ), fromNode );
00227 #endif
00228 }
00229 else
00230 {
00231 StdErr.printf( "WARNNING: Received extraneous result from node %d\n", fromNode );
00232 }
00233 }
00234 }
00235 }
00236
00237
00238 while ( resultsToReceive )
00239 {
00240
00241 MPI::COMM_WORLD.Recv( msgBufferPtr, msgBufferSize, MPI::CHAR, MPI::ANY_SOURCE, MPI::ANY_TAG, status );
00242
00243 #ifdef DEBUG_COMM
00244 StdErr.printf( "%d\tFinalizing - received result from node %d\n", this->m_RankMPI, status.Get_source() );
00245 #endif
00246
00247
00248 const int fromNode = status.Get_source();
00249 const int receivedJob = taskByNode[fromNode];
00250
00251 if ( receivedJob != -1 )
00252 {
00253 --resultsToReceive;
00254 taskByNode[fromNode] = -1;
00255
00256
00257 const size_t maxCpIndex = std::min<size_t>( receivedJob + controlPointsPerNode, numberOfControlPoints );
00258 this->ReorderGradientComponents( g.Elements, (Types::Coordinate*)msgBufferPtr, receivedJob, maxCpIndex );
00259 }
00260 else
00261 {
00262 StdErr.printf( "WARNING: Received extraneous result from node %d\n", fromNode );
00263 }
00264 }
00265
00266 if ( this->m_RankMPI == 0 )
00267 {
00268
00269 #ifdef DEBUG_COMM
00270 StdErr << this->m_RankMPI << "\t" << "Leaving computation loop\n";
00271 #endif
00272 for ( int workerNode = 1; workerNode < this->m_SizeMPI; ++workerNode )
00273 {
00274 MPI::COMM_WORLD.Ssend( msgBufferPtr, 1, MPI::CHAR, workerNode, tagFinished );
00275 }
00276 }
00277
00278 #ifdef DEBUG_COMM
00279 StdErr << this->m_RankMPI << "\t" << "Broadcasting gradient estimate\n";
00280 #endif
00281
00282
00283 MPI::COMM_WORLD.Bcast( g.Elements, g.Dim * sizeof( g.Elements[0] ), MPI::CHAR, 0 );
00284
00285 #ifdef DEBUG_COMM
00286 StdErr << this->m_RankMPI << "\t" << "Done with gradient\n\n";
00287 #endif
00288
00289
00290
00291 if ( this->m_PartialGradientMode )
00292 {
00293 const Types::Coordinate gthresh = g.MaxNorm() * this->m_PartialGradientThreshold;
00294 for ( size_t param = 0; param < g.Dim; ++param )
00295 {
00296 #ifdef DEBUG_GRADIENT
00297 if ( this->m_RankMPI == 0 )
00298 if ( g[param] == 12345 )
00299 {
00300 std::cerr << param << "\t";
00301 }
00302 #endif
00303 if ( fabs( g[param] ) < gthresh )
00304 {
00305 g[param] = this->m_ParamStepArray[param] = 0.0;
00306 }
00307 }
00308 }
00309
00310 if ( this->m_ForceZeroSum )
00311 {
00312 this->ForceZeroSumGradient( g );
00313 }
00314
00315 #ifdef DEBUG_COMM
00316 MPI::COMM_WORLD.Barrier();
00317 #endif
00318 return baseValue;
00319 }
00320
00321 void
00322 SplineWarpGroupwiseRegistrationRMIFunctional
00323 ::ReorderGradientComponents
00324 ( Types::Coordinate *const dst, const Types::Coordinate* src,
00325 const size_t fromCpIdx, const size_t toCpIdx )
00326 {
00327 const size_t numberOfParameters = this->ParamVectorDim();
00328
00329
00330 size_t currentParameter = 0;
00331 for ( size_t cpIndex = fromCpIdx; cpIndex < toCpIdx; ++cpIndex )
00332 {
00333 const size_t cp = this->m_ControlPointSchedule[cpIndex];
00334
00335 for ( size_t cparam = 3*cp; cparam < numberOfParameters; cparam += this->m_ParametersPerXform )
00336 {
00337 for ( size_t dim = 0; dim < 3; ++dim, ++currentParameter )
00338 {
00339 dst[cparam+dim] = src[currentParameter];
00340 }
00341 }
00342 }
00343 }
00344
00345 CMTK_THREAD_RETURN_TYPE
00346 SplineWarpGroupwiseRegistrationRMIFunctional::EvaluateLocalGradientThreadFunc
00347 ( void* args )
00348 {
00349 EvaluateLocalGradientThreadParameters* threadParameters = static_cast<EvaluateLocalGradientThreadParameters*>( args );
00350
00351 Self* This = threadParameters->thisObject;
00352 const Self* ThisConst = threadParameters->thisObject;
00353 const size_t threadID = threadParameters->ThisThreadIndex;
00354 const size_t threadStorageIndex = threadParameters->m_ThreadStorageIndex;
00355
00356 #ifndef DEBUG_COMM
00357 if ( !(threadID % 100) )
00358 {
00359 StdErr.printf( "%d / %d\r", (int)threadID, (int)ThisConst->m_ControlPointSchedule.size() );
00360 }
00361 #endif
00362
00363 const size_t cpIndex = ThisConst->m_ControlPointSchedule[threadID];
00364 const DataGrid::RegionType& voi = ThisConst->m_VolumeOfInfluenceArray[cpIndex];
00365 const size_t pixelsPerLineVOI = (voi.To()[0]-voi.From()[0]);
00366 std::vector<Vector3D> vectorList( pixelsPerLineVOI );
00367 std::vector<size_t> count( pixelsPerLineVOI );
00368
00369 const size_t numberOfXforms = ThisConst->m_XformVector.size();
00370 std::vector<size_t> totalNumberOfSamples( 6 * numberOfXforms );
00371 std::fill( totalNumberOfSamples.begin(), totalNumberOfSamples.end(), ThisConst->m_TotalNumberOfSamples );
00372
00373 const size_t parametersPerXform = ThisConst->m_ParametersPerXform;
00374 const size_t paramVectorDim = ThisConst->ParamVectorDim();
00375
00376 const byte paddingValue = ThisConst->m_PaddingValue;
00377 const size_t imagesFrom = ThisConst->m_ActiveImagesFrom;
00378 const size_t imagesTo = ThisConst->m_ActiveImagesTo;
00379 const size_t numberOfImages = imagesTo - imagesFrom;
00380
00381 const UniformVolume* templateGrid = ThisConst->m_TemplateGrid;
00382
00383 const size_t threadDataIdx = 6 * threadStorageIndex * numberOfXforms;
00384 for ( size_t image = 0; image < 6 * numberOfXforms; ++image )
00385 {
00386 const SumsAndProductsVectorType& srcSumOfProducts = ThisConst->m_SumOfProductsMatrix;
00387 SumsAndProductsVectorType& dstSumOfProducts = This->m_ThreadSumOfProductsMatrix[threadDataIdx + image];
00388 dstSumOfProducts.resize( srcSumOfProducts.size() );
00389 std::copy( srcSumOfProducts.begin(), srcSumOfProducts.end(), dstSumOfProducts.begin() );
00390
00391 const SumsAndProductsVectorType& srcSumsVector = ThisConst->m_SumsVector;
00392 SumsAndProductsVectorType& dstSumsVector = This->m_ThreadSumsVector[threadDataIdx + image];
00393 dstSumsVector.resize( srcSumsVector.size() );
00394 std::copy( srcSumsVector.begin(), srcSumsVector.end(), dstSumsVector.begin() );
00395 }
00396
00397 for ( int z = voi.From()[2]; z < voi.To()[2]; ++z )
00398 {
00399 for ( int y = voi.From()[1]; y < voi.To()[1]; ++y )
00400 {
00401
00402 const size_t rowofs = templateGrid->GetOffsetFromIndex( voi.From()[0], y, z );
00403
00404 std::fill( count.begin(), count.end(), 0 );
00405 for ( size_t img = 0; img < numberOfXforms; ++img )
00406 {
00407 const byte* dataPtr = ThisConst->m_Data[img]+rowofs;
00408 for ( size_t x = 0; x < pixelsPerLineVOI; ++x )
00409 {
00410 const byte dataThisPixel = dataPtr[x];
00411 if ( dataThisPixel != paddingValue )
00412 {
00413 ++count[x];
00414 }
00415 }
00416 }
00417
00418 size_t cparam = 3 * cpIndex;
00419 size_t currentParameter = 0;
00420 for ( size_t img = 0; img < numberOfXforms; ++img, cparam += parametersPerXform )
00421 {
00422 SplineWarpXform::SmartPtr xform = This->GetXformByIndex( img );
00423 const UniformVolume* target = ThisConst->m_ImageVector[img];
00424 const byte* targetDataPtr = static_cast<const byte*>( target->GetData()->GetDataPtr() );
00425
00426 for ( size_t dim = 0; dim < 3; ++dim )
00427 {
00428 const size_t cdparam = cparam + dim;
00429 const size_t xfparam = 3 * cpIndex + dim;
00430 const Types::Coordinate pStep = ThisConst->m_ParamStepArray[cdparam] * threadParameters->m_Step;
00431
00432 if ( pStep > 0 )
00433 {
00434 const Types::Coordinate v0 = xform->GetParameter( xfparam );
00435 for ( int delta = 0; delta < 2; ++delta, ++currentParameter )
00436 {
00437 SumsAndProductsVectorType& dstSumOfProducts = This->m_ThreadSumOfProductsMatrix[threadDataIdx+currentParameter];
00438 SumsAndProductsVectorType& dstSumsVector = This->m_ThreadSumsVector[threadDataIdx+currentParameter];
00439
00440 Types::Coordinate vTest = v0 + (2*delta-1) * pStep;
00441 xform->SetParameter( xfparam, vTest );
00442 xform->GetTransformedGridRow( pixelsPerLineVOI, &(vectorList[0]), voi.From()[0], y, z );
00443
00444 byte* rowDataPtr = ThisConst->m_Data[img] + rowofs;
00445 for ( size_t x = 0; x < pixelsPerLineVOI; ++x, ++rowDataPtr )
00446 {
00447 const int baselineData = *rowDataPtr;
00448 if ( (count[x] == numberOfImages) || ((count[x] == numberOfImages-1) && (baselineData == paddingValue) ) )
00449 {
00450 byte newData;
00451 if ( !target->ProbeData( newData, targetDataPtr, vectorList[x] ) )
00452 newData = paddingValue;
00453
00454 if ( newData != baselineData )
00455 {
00456 if ( baselineData != paddingValue )
00457 {
00458 dstSumsVector[img] -= baselineData;
00459 size_t midx = 0;
00460 for ( size_t img2 = 0; img2 < numberOfImages; ++img2 )
00461 {
00462 for ( size_t otherImg = 0; otherImg <= img2; ++otherImg, ++midx )
00463 {
00464 if ( img2 == img )
00465 {
00466 const int otherData = ThisConst->m_Data[otherImg][rowofs+x];
00467 dstSumOfProducts[midx] -= baselineData * otherData;
00468 }
00469 else
00470 {
00471 if ( otherImg == img )
00472 {
00473 const int otherData = ThisConst->m_Data[img2][rowofs+x];
00474 dstSumOfProducts[midx] -= baselineData * otherData;
00475 }
00476 }
00477 }
00478 }
00479 }
00480
00481 if ( newData != paddingValue )
00482 {
00483 if ( count[x] == numberOfImages-1 )
00484 {
00485 ++totalNumberOfSamples[currentParameter];
00486 }
00487
00488 dstSumsVector[img] += newData;
00489 size_t midx = 0;
00490 for ( size_t img2 = imagesFrom; img2 < imagesTo; ++img2 )
00491 {
00492 for ( size_t otherImg = imagesFrom; otherImg <= img2; ++otherImg, ++midx )
00493 {
00494 if ( img2 == img )
00495 {
00496 if ( otherImg == img )
00497 {
00498 dstSumOfProducts[midx] += newData * newData;
00499 }
00500 else
00501 {
00502 const int otherData = ThisConst->m_Data[otherImg][rowofs+x];
00503 dstSumOfProducts[midx] += newData * otherData;
00504 }
00505 }
00506 else
00507 {
00508 if ( otherImg == img )
00509 {
00510 const int otherData = ThisConst->m_Data[img2][rowofs+x];
00511 dstSumOfProducts[midx] += newData * otherData;
00512 }
00513 }
00514 }
00515 }
00516 }
00517 else
00518 {
00519 if ( count[x] == numberOfImages )
00520 {
00521 --totalNumberOfSamples[currentParameter];
00522 }
00523 }
00524 }
00525 }
00526 }
00527 }
00528 xform->SetParameter( xfparam, v0 );
00529 }
00530 else
00531 {
00532 currentParameter += 2;
00533 }
00534 }
00535 }
00536 }
00537 }
00538
00539 Matrix2D<Self::ReturnType> covarianceMatrix( numberOfImages, numberOfImages );
00540
00541
00542 size_t img = 0, currentParameter = 0;
00543 const size_t parameterOffset = 3 * numberOfXforms * (threadID - threadParameters->m_FirstIndexToCompute);
00544
00545 const float fBaseValue = threadParameters->m_MetricBaseValue;
00546 for ( size_t cparam = 3*cpIndex; cparam < paramVectorDim; cparam += parametersPerXform )
00547 {
00548 for ( size_t dim = 0; dim < 3; ++dim, ++img, currentParameter += 2 )
00549 {
00550 const Self::ReturnType fMinus = ThisConst->GetMetric( This->m_ThreadSumOfProductsMatrix[threadDataIdx+currentParameter], This->m_ThreadSumsVector[threadDataIdx+currentParameter], totalNumberOfSamples[currentParameter], covarianceMatrix );
00551 const Self::ReturnType fPlus = ThisConst->GetMetric( This->m_ThreadSumOfProductsMatrix[threadDataIdx+currentParameter+1], This->m_ThreadSumsVector[threadDataIdx+currentParameter+1], totalNumberOfSamples[currentParameter], covarianceMatrix );
00552
00553 if ( (fPlus > fBaseValue) || (fMinus > fBaseValue) )
00554 {
00555 threadParameters->m_Gradient[parameterOffset + currentParameter / 2] = fPlus - fMinus;
00556 }
00557 else
00558 {
00559 threadParameters->m_Gradient[parameterOffset + currentParameter / 2] = 0.0;
00560 }
00561 }
00562 }
00563
00564 return CMTK_THREAD_RETURN_VALUE;
00565 }
00566
00567 }