Tpetra_Details_DistributorPlan.cpp
1// ***********************************************************************
2//
3// Tpetra: Templated Linear Algebra Services Package
4// Copyright (2008) Sandia Corporation
5//
6// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7// the U.S. Government retains certain rights in this software.
8//
9// Redistribution and use in source and binary forms, with or without
10// modification, are permitted provided that the following conditions are
11// met:
12//
13// 1. Redistributions of source code must retain the above copyright
14// notice, this list of conditions and the following disclaimer.
15//
16// 2. Redistributions in binary form must reproduce the above copyright
17// notice, this list of conditions and the following disclaimer in the
18// documentation and/or other materials provided with the distribution.
19//
20// 3. Neither the name of the Corporation nor the names of the
21// contributors may be used to endorse or promote products derived from
22// this software without specific prior written permission.
23//
24// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
25// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
28// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35//
36// ************************************************************************
37// @HEADER
38
39#include "Tpetra_Details_DistributorPlan.hpp"
40
41#include "Teuchos_StandardParameterEntryValidators.hpp"
42#include "Tpetra_Util.hpp"
43#include <numeric>
44
45namespace Tpetra {
46namespace Details {
47
48std::string
49DistributorSendTypeEnumToString (EDistributorSendType sendType)
50{
51 if (sendType == DISTRIBUTOR_ISEND) {
52 return "Isend";
53 }
54 else if (sendType == DISTRIBUTOR_RSEND) {
55 return "Rsend";
56 }
57 else if (sendType == DISTRIBUTOR_SEND) {
58 return "Send";
59 }
60 else if (sendType == DISTRIBUTOR_SSEND) {
61 return "Ssend";
62 }
63 else {
64 TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
65 "EDistributorSendType enum value " << sendType << ".");
66 }
67}
68
69std::string
70DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
71{
72 switch (how) {
73 case Details::DISTRIBUTOR_NOT_INITIALIZED:
74 return "Not initialized yet";
75 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
76 return "By createFromSends";
77 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
78 return "By createFromRecvs";
79 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
80 return "By createFromSendsAndRecvs";
81 case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
82 return "By createReverseDistributor";
83 case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
84 return "By copy constructor";
85 default:
86 return "INVALID";
87 }
88}
89
90DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
91 : comm_(comm),
92 howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
93 reversePlan_(Teuchos::null),
94 sendType_(DISTRIBUTOR_SEND),
95 barrierBetweenRecvSend_(barrierBetween_default),
96 useDistinctTags_(useDistinctTags_default),
97 sendMessageToSelf_(false),
98 numSendsToOtherProcs_(0),
99 maxSendLength_(0),
100 numReceives_(0),
101 totalReceiveLength_(0)
102{ }
103
104DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
105 : comm_(otherPlan.comm_),
106 howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
107 reversePlan_(otherPlan.reversePlan_),
108 sendType_(otherPlan.sendType_),
109 barrierBetweenRecvSend_(otherPlan.barrierBetweenRecvSend_),
110 useDistinctTags_(otherPlan.useDistinctTags_),
111 sendMessageToSelf_(otherPlan.sendMessageToSelf_),
112 numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
113 procIdsToSendTo_(otherPlan.procIdsToSendTo_),
114 startsTo_(otherPlan.startsTo_),
115 lengthsTo_(otherPlan.lengthsTo_),
116 maxSendLength_(otherPlan.maxSendLength_),
117 indicesTo_(otherPlan.indicesTo_),
118 numReceives_(otherPlan.numReceives_),
119 totalReceiveLength_(otherPlan.totalReceiveLength_),
120 lengthsFrom_(otherPlan.lengthsFrom_),
121 procsFrom_(otherPlan.procsFrom_),
122 startsFrom_(otherPlan.startsFrom_),
123 indicesFrom_(otherPlan.indicesFrom_)
124{ }
125
126int DistributorPlan::getTag(const int pathTag) const {
127 return useDistinctTags_ ? pathTag : comm_->getTag();
128}
129
130size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs) {
131 using Teuchos::outArg;
132 using Teuchos::REDUCE_MAX;
133 using Teuchos::reduceAll;
134 using std::endl;
135
136 const size_t numExports = exportProcIDs.size();
137 const int myProcID = comm_->getRank();
138 const int numProcs = comm_->getSize();
139
140 // exportProcIDs tells us the communication pattern for this
141 // distributor. It dictates the way that the export data will be
142 // interpreted in doPosts(). We want to perform at most one
143 // send per process in doPosts; this is for two reasons:
144 // * minimize latency / overhead in the comm routines (nice)
145 // * match the number of receives and sends between processes
146 // (necessary)
147 //
148 // Teuchos::Comm requires that the data for a send are contiguous
149 // in a send buffer. Therefore, if the data in the send buffer
150 // for doPosts() are not contiguous, they will need to be copied
151 // into a contiguous buffer. The user has specified this
152 // noncontiguous pattern and we can't do anything about it.
153 // However, if they do not provide an efficient pattern, we will
154 // warn them if one of the following compile-time options has been
155 // set:
156 // * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
157 // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
158 //
159 // If the data are contiguous, then we can post the sends in situ
160 // (i.e., without needing to copy them into a send buffer).
161 //
162 // Determine contiguity. There are a number of ways to do this:
163 // * If the export IDs are sorted, then all exports to a
164 // particular proc must be contiguous. This is what Epetra does.
165 // * If the export ID of the current export already has been
166 // listed, then the previous listing should correspond to the
167 // same export. This tests contiguity, but not sortedness.
168 //
169 // Both of these tests require O(n), where n is the number of
170 // exports. However, the latter will positively identify a greater
171 // portion of contiguous patterns. We use the latter method.
172 //
173 // Check to see if values are grouped by procs without gaps
174 // If so, indices_to -> 0.
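 // A small worked example (illustrative values only):
 //   exportProcIDs = [2, 2, 0, 0, 0, 3] is grouped by process (each
 //   rank's exports are contiguous, even though the ranks are not
 //   sorted), so no send buffer and no indicesTo_ are needed.
 //   exportProcIDs = [2, 0, 2, 0, 0, 3] is not grouped, so the export
 //   data must be gathered into a contiguous send buffer via indicesTo_.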
175
176 // Set up data structures for quick traversal of arrays.
177 // This contains the number of sends for each process ID.
178 //
179 // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
180 // that create an array whose length is the number of processes in the
181 // communicator (plus one). Given how this code uses this array,
182 // it should be straightforward to replace it with a hash table or
183 // some other more space-efficient data structure. In practice,
184 // most of the entries of starts should be zero for a sufficiently
185 // large process count, unless the communication pattern is dense.
186 // Note that it's important to be able to iterate through keys (i
187 // for which starts[i] is nonzero) in increasing order.
188 Teuchos::Array<size_t> starts (numProcs + 1, 0);
189
190 // numActive is the number of exports with a valid (nonnegative) process ID
191 size_t numActive = 0;
192 int needSendBuff = 0; // Boolean
193
194 for (size_t i = 0; i < numExports; ++i) {
195 const int exportID = exportProcIDs[i];
196 if (exportID >= 0) {
197 // exportID is a valid process ID. Increment the number of
198 // messages this process will send to that process.
199 ++starts[exportID];
200
201 // If we're sending more than one message to process exportID,
202 // then it is possible that the data are not contiguous.
203 // Check by seeing if the previous process ID in the list
204 // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
205 // because if starts[exportID] > 1, then i must be > 1 (since
206 // the starts array was filled with zeros initially).
207
208 // null entries break continuity.
209 // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
210 if (needSendBuff == 0 && starts[exportID] > 1 &&
211 exportID != exportProcIDs[i-1]) {
212 needSendBuff = 1;
213 }
214 ++numActive;
215 }
216 }
217
218#if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
219 {
220 int global_needSendBuff;
221 reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
222 outArg (global_needSendBuff));
223 TPETRA_EFFICIENCY_WARNING(
224 global_needSendBuff != 0, std::runtime_error,
225 "::createFromSends: Grouping export IDs together by process rank often "
226 "improves performance.");
227 }
228#endif
229
230 // Determine from the caller's data whether or not the current
231 // process should send (a) message(s) to itself.
232 if (starts[myProcID] != 0) {
233 sendMessageToSelf_ = true;
234 }
235 else {
236 sendMessageToSelf_ = false;
237 }
238
239 if (! needSendBuff) {
240 // grouped by proc, no send buffer or indicesTo_ needed
241 numSendsToOtherProcs_ = 0;
242 // Count total number of sends, i.e., total number of procs to
243 // which we are sending. This includes myself, if applicable.
244 for (int i = 0; i < numProcs; ++i) {
245 if (starts[i]) {
246 ++numSendsToOtherProcs_;
247 }
248 }
249
250 // Not only do we not need these, but we must clear them, as
251 // empty status of indicesTo is a flag used later.
252 indicesTo_.resize(0);
253 // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
254 // includes self sends. Set their values to zeros.
255 procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
256 startsTo_.assign(numSendsToOtherProcs_,0);
257 lengthsTo_.assign(numSendsToOtherProcs_,0);
258
259 // set startsTo to the offset for each send (i.e., each proc ID)
260 // set procsTo to the proc ID for each send
261 // in interpreting this code, remember that we are assuming contiguity
262 // that is why index skips through the ranks
263 {
264 size_t index = 0, procIndex = 0;
265 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
266 while (exportProcIDs[procIndex] < 0) {
267 ++procIndex; // skip all negative proc IDs
268 }
269 startsTo_[i] = procIndex;
270 int procID = exportProcIDs[procIndex];
271 procIdsToSendTo_[i] = procID;
272 index += starts[procID];
273 procIndex += starts[procID];
274 }
275 }
276 // sort the startsTo and proc IDs together, in ascending order, according
277 // to proc IDs
278 if (numSendsToOtherProcs_ > 0) {
279 sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
280 }
281 // compute the maximum send length
282 maxSendLength_ = 0;
283 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
284 int procID = procIdsToSendTo_[i];
285 lengthsTo_[i] = starts[procID];
286 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
287 maxSendLength_ = lengthsTo_[i];
288 }
289 }
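 // Illustrative trace of the grouped case, assuming
 // exportProcIDs = [2, 2, 0, 0, 0, 3] and numProcs = 4: before sorting,
 // startsTo_ = [0, 2, 5] and procIdsToSendTo_ = [2, 0, 3]; after sort2,
 // procIdsToSendTo_ = [0, 2, 3], startsTo_ = [2, 0, 5], and
 // lengthsTo_ = [3, 2, 1]. If this process were rank 0, then
 // maxSendLength_ = 2, since the self send of length 3 is not counted.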
290 }
291 else {
292 // not grouped by proc, need send buffer and indicesTo_
293
294 // starts[i] is the number of sends to proc i
295 // numActive equals number of sends total, \sum_i starts[i]
296
297 // this loop starts at starts[1], so explicitly check starts[0]
298 if (starts[0] == 0 ) {
299 numSendsToOtherProcs_ = 0;
300 }
301 else {
302 numSendsToOtherProcs_ = 1;
303 }
304 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
305 im1=starts.begin();
306 i != starts.end(); ++i)
307 {
308 if (*i != 0) ++numSendsToOtherProcs_;
309 *i += *im1;
310 im1 = i;
311 }
312 // starts[i] now contains the number of exports to procs 0 through i
313
314 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
315 i=starts.rbegin()+1;
316 i != starts.rend(); ++i)
317 {
318 *ip1 = *i;
319 ip1 = i;
320 }
321 starts[0] = 0;
322 // starts[i] now contains the number of exports to procs 0 through
323 // i-1, i.e., all procs before proc i
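 // Illustrative trace, assuming exportProcIDs = [2, 0, 2, 0, 0, 3] and
 // numProcs = 4: after the counting loop, starts = [3, 0, 2, 1, 0];
 // after the scan above, starts = [3, 3, 5, 6, 6]; after the shift,
 // starts = [0, 3, 3, 5, 6], i.e., the offset of each proc's block in
 // the send buffer.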
324
325 indicesTo_.resize(numActive);
326
327 for (size_t i = 0; i < numExports; ++i) {
328 if (exportProcIDs[i] >= 0) {
329 // record the offset to the sendBuffer for this export
330 indicesTo_[starts[exportProcIDs[i]]] = i;
331 // now increment the offset for this proc
332 ++starts[exportProcIDs[i]];
333 }
334 }
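 // Continuing the illustrative trace (exportProcIDs = [2, 0, 2, 0, 0, 3],
 // numProcs = 4): the loop above fills indicesTo_ = [1, 3, 4, 0, 2, 5],
 // so send-buffer slots 0-2 hold exports 1, 3, 4 (destined for proc 0),
 // slots 3-4 hold exports 0, 2 (proc 2), and slot 5 holds export 5 (proc 3).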
335 // our send buffer will contain the export data for each of the procs
336 // we communicate with, in order by proc id
337 // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
338 // indicesTo_ now maps each position in the send buffer to the export
339 // that belongs there; that is, the data for slot k of the send buffer
340 // come from export indicesTo_[k]
341 //
342 // starts[i] once again contains the number of exports to
343 // procs 0 through i
344 for (int proc = numProcs-1; proc != 0; --proc) {
345 starts[proc] = starts[proc-1];
346 }
347 starts.front() = 0;
348 starts[numProcs] = numActive;
349 //
350 // starts[proc] once again contains the number of exports to
351 // procs 0 through proc-1
352 // i.e., the start of my data in the sendBuffer
353
354 // this contains invalid data at procs we don't care about, that is okay
355 procIdsToSendTo_.resize(numSendsToOtherProcs_);
356 startsTo_.resize(numSendsToOtherProcs_);
357 lengthsTo_.resize(numSendsToOtherProcs_);
358
359 // for each group of sends/exports, record the destination proc,
360 // the length, and the offset for this send into the
361 // send buffer (startsTo_)
362 maxSendLength_ = 0;
363 size_t snd = 0;
364 for (int proc = 0; proc < numProcs; ++proc ) {
365 if (starts[proc+1] != starts[proc]) {
366 lengthsTo_[snd] = starts[proc+1] - starts[proc];
367 startsTo_[snd] = starts[proc];
368 // record max length for all off-proc sends
369 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
370 maxSendLength_ = lengthsTo_[snd];
371 }
372 procIdsToSendTo_[snd] = proc;
373 ++snd;
374 }
375 }
376 }
377
378 if (sendMessageToSelf_) {
379 --numSendsToOtherProcs_;
380 }
381
382 // Invert map to see what msgs are received and what length
383 computeReceives();
384
385 // createFromRecvs() calls createFromSends(), but will set
386 // howInitialized_ again after calling createFromSends().
387 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
388
389 return totalReceiveLength_;
390}
391
392void DistributorPlan::createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs)
393{
394 createFromSends(remoteProcIDs);
395
396 *this = *getReversePlan();
397
398 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
399}
400
401void DistributorPlan::createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
402 const Teuchos::ArrayView<const int>& remoteProcIDs)
403{
404 // Note: exportProcIDs and remoteProcIDs _must_ be lists that have an
405 // entry for each GID. If the export/remote proc IDs are taken from the
406 // getProcs{From|To} lists extracted from a previous Distributor, this
407 // will generate a wrong answer, because those lists have a unique entry
408 // for each process ID. A version of this that takes lengthsTo and
409 // lengthsFrom should be written.
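 // Illustrative example: if this process sends two GIDs to proc 3 and
 // one GID to proc 5, then exportProcIDs must be [3, 3, 5] (one entry
 // per GID), not the unique list [3, 5] that getProcsTo() would return.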
410
411 howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
412
413
414 int myProcID = comm_->getRank ();
415 int numProcs = comm_->getSize();
416
417 const size_t numExportIDs = exportProcIDs.size();
418 Teuchos::Array<size_t> starts (numProcs + 1, 0);
419
420 size_t numActive = 0;
421 int needSendBuff = 0; // Boolean
422
423 for(size_t i = 0; i < numExportIDs; i++ )
424 {
425 if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
426 needSendBuff = 1;
427 if( exportProcIDs[i] >= 0 )
428 {
429 ++starts[ exportProcIDs[i] ];
430 ++numActive;
431 }
432 }
433
434 sendMessageToSelf_ = ( starts[myProcID] != 0 ) ? 1 : 0;
435
436 numSendsToOtherProcs_ = 0;
437
438 if( needSendBuff ) // not grouped by proc; need a send buffer and indicesTo_
439 {
440 if (starts[0] == 0 ) {
441 numSendsToOtherProcs_ = 0;
442 }
443 else {
444 numSendsToOtherProcs_ = 1;
445 }
446 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
447 im1=starts.begin();
448 i != starts.end(); ++i)
449 {
450 if (*i != 0) ++numSendsToOtherProcs_;
451 *i += *im1;
452 im1 = i;
453 }
454 // starts[i] now contains the number of exports to procs 0 through i
455
456 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
457 i=starts.rbegin()+1;
458 i != starts.rend(); ++i)
459 {
460 *ip1 = *i;
461 ip1 = i;
462 }
463 starts[0] = 0;
464 // starts[i] now contains the number of exports to procs 0 through
465 // i-1, i.e., all procs before proc i
466
467 indicesTo_.resize(numActive);
468
469 for (size_t i = 0; i < numExportIDs; ++i) {
470 if (exportProcIDs[i] >= 0) {
471 // record the offset to the sendBuffer for this export
472 indicesTo_[starts[exportProcIDs[i]]] = i;
473 // now increment the offset for this proc
474 ++starts[exportProcIDs[i]];
475 }
476 }
477 for (int proc = numProcs-1; proc != 0; --proc) {
478 starts[proc] = starts[proc-1];
479 }
480 starts.front() = 0;
481 starts[numProcs] = numActive;
482 procIdsToSendTo_.resize(numSendsToOtherProcs_);
483 startsTo_.resize(numSendsToOtherProcs_);
484 lengthsTo_.resize(numSendsToOtherProcs_);
485 maxSendLength_ = 0;
486 size_t snd = 0;
487 for (int proc = 0; proc < numProcs; ++proc ) {
488 if (starts[proc+1] != starts[proc]) {
489 lengthsTo_[snd] = starts[proc+1] - starts[proc];
490 startsTo_[snd] = starts[proc];
491 // record max length for all off-proc sends
492 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
493 maxSendLength_ = lengthsTo_[snd];
494 }
495 procIdsToSendTo_[snd] = proc;
496 ++snd;
497 }
498 }
499 }
500 else {
501 // grouped by proc, no send buffer or indicesTo_ needed
502 numSendsToOtherProcs_ = 0;
503 // Count total number of sends, i.e., total number of procs to
504 // which we are sending. This includes myself, if applicable.
505 for (int i = 0; i < numProcs; ++i) {
506 if (starts[i]) {
507 ++numSendsToOtherProcs_;
508 }
509 }
510
511 // Not only do we not need these, but we must clear them, as
512 // empty status of indicesTo is a flag used later.
513 indicesTo_.resize(0);
514 // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
515 // includes self sends. Set their values to zeros.
516 procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
517 startsTo_.assign(numSendsToOtherProcs_,0);
518 lengthsTo_.assign(numSendsToOtherProcs_,0);
519
520 // set startsTo to the offset for each send (i.e., each proc ID)
521 // set procsTo to the proc ID for each send
522 // in interpreting this code, remember that we are assuming contiguity
523 // that is why index skips through the ranks
524 {
525 size_t index = 0, procIndex = 0;
526 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
527 while (exportProcIDs[procIndex] < 0) {
528 ++procIndex; // skip all negative proc IDs
529 }
530 startsTo_[i] = procIndex;
531 int procID = exportProcIDs[procIndex];
532 procIdsToSendTo_[i] = procID;
533 index += starts[procID];
534 procIndex += starts[procID];
535 }
536 }
537 // sort the startsTo and proc IDs together, in ascending order, according
538 // to proc IDs
539 if (numSendsToOtherProcs_ > 0) {
540 sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
541 }
542 // compute the maximum send length
543 maxSendLength_ = 0;
544 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
545 int procID = procIdsToSendTo_[i];
546 lengthsTo_[i] = starts[procID];
547 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
548 maxSendLength_ = lengthsTo_[i];
549 }
550 }
551 }
552
553
554 numSendsToOtherProcs_ -= sendMessageToSelf_;
555 std::vector<int> recv_list;
556 recv_list.reserve(numSendsToOtherProcs_); //reserve an initial guess for size needed
557
558 int last_pid=-2;
559 for(int i=0; i<remoteProcIDs.size(); i++) {
560 if(remoteProcIDs[i]>last_pid) {
561 recv_list.push_back(remoteProcIDs[i]);
562 last_pid = remoteProcIDs[i];
563 }
564 else if (remoteProcIDs[i]<last_pid)
565 throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
566 }
567 numReceives_ = recv_list.size();
568 if(numReceives_) {
569 procsFrom_.assign(numReceives_,0);
570 lengthsFrom_.assign(numReceives_,0);
571 indicesFrom_.assign(numReceives_,0);
572 startsFrom_.assign(numReceives_,0);
573 }
574 for(size_t i=0,j=0; i<numReceives_; ++i) {
575 int jlast=j;
576 procsFrom_[i] = recv_list[i];
577 startsFrom_[i] = j;
578 for( ; j<(size_t)remoteProcIDs.size() &&
579 remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
580 lengthsFrom_[i] = j-jlast;
581 }
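 // Illustrative trace: remoteProcIDs = [0, 0, 2, 2, 2, 5] yields
 // procsFrom_ = [0, 2, 5], startsFrom_ = [0, 2, 5], and
 // lengthsFrom_ = [2, 3, 1].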
582 totalReceiveLength_ = remoteProcIDs.size();
583 indicesFrom_.clear ();
584 numReceives_-=sendMessageToSelf_;
585}
586
587Teuchos::RCP<DistributorPlan> DistributorPlan::getReversePlan() const {
588 if (reversePlan_.is_null()) createReversePlan();
589 return reversePlan_;
590}
591
592void DistributorPlan::createReversePlan() const
593{
594 reversePlan_ = Teuchos::rcp(new DistributorPlan(comm_));
595 reversePlan_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
596 reversePlan_->sendType_ = sendType_;
597 reversePlan_->barrierBetweenRecvSend_ = barrierBetweenRecvSend_;
598
599 // The total length of all the sends of this DistributorPlan. We
600 // calculate it because it's the total length of all the receives
601 // of the reverse DistributorPlan.
602 size_t totalSendLength =
603 std::accumulate(lengthsTo_.begin(), lengthsTo_.end(), 0);
604
605 // The maximum length of any of the receives of this DistributorPlan.
606 // We calculate it because it's the maximum length of any of the
607 // sends of the reverse DistributorPlan.
608 size_t maxReceiveLength = 0;
609 const int myProcID = comm_->getRank();
610 for (size_t i=0; i < numReceives_; ++i) {
611 if (procsFrom_[i] != myProcID) {
612 // Don't count receives for messages sent by myself to myself.
613 if (lengthsFrom_[i] > maxReceiveLength) {
614 maxReceiveLength = lengthsFrom_[i];
615 }
616 }
617 }
618
619 reversePlan_->sendMessageToSelf_ = sendMessageToSelf_;
620 reversePlan_->numSendsToOtherProcs_ = numReceives_;
621 reversePlan_->procIdsToSendTo_ = procsFrom_;
622 reversePlan_->startsTo_ = startsFrom_;
623 reversePlan_->lengthsTo_ = lengthsFrom_;
624 reversePlan_->maxSendLength_ = maxReceiveLength;
625 reversePlan_->indicesTo_ = indicesFrom_;
626 reversePlan_->numReceives_ = numSendsToOtherProcs_;
627 reversePlan_->totalReceiveLength_ = totalSendLength;
628 reversePlan_->lengthsFrom_ = lengthsTo_;
629 reversePlan_->procsFrom_ = procIdsToSendTo_;
630 reversePlan_->startsFrom_ = startsTo_;
631 reversePlan_->indicesFrom_ = indicesTo_;
632 reversePlan_->useDistinctTags_ = useDistinctTags_;
633}
634
635void DistributorPlan::computeReceives()
636{
637 using Teuchos::Array;
638 using Teuchos::ArrayRCP;
639 using Teuchos::as;
640 using Teuchos::CommStatus;
641 using Teuchos::CommRequest;
642 using Teuchos::ireceive;
643 using Teuchos::RCP;
644 using Teuchos::rcp;
645 using Teuchos::REDUCE_SUM;
646 using Teuchos::receive;
647 using Teuchos::reduce;
648 using Teuchos::scatter;
649 using Teuchos::send;
650 using Teuchos::waitAll;
651
652 const int myRank = comm_->getRank();
653 const int numProcs = comm_->getSize();
654
655 // MPI tag for nonblocking receives and blocking sends in this method.
656 const int pathTag = 2;
657 const int tag = getTag(pathTag);
658
659 // toProcsFromMe[i] == the number of messages sent by this process
660 // to process i. The data in numSendsToOtherProcs_, procIdsToSendTo_, and lengthsTo_
661 // concern the contiguous sends. Therefore, each process will be
662 // listed in procIdsToSendTo_ at most once, and so toProcsFromMe[i] will
663 // either be 0 or 1.
664 {
665 Array<int> toProcsFromMe (numProcs, 0);
666#ifdef HAVE_TEUCHOS_DEBUG
667 bool counting_error = false;
668#endif // HAVE_TEUCHOS_DEBUG
669 for (size_t i = 0; i < (numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0)); ++i) {
670#ifdef HAVE_TEUCHOS_DEBUG
671 if (toProcsFromMe[procIdsToSendTo_[i]] != 0) {
672 counting_error = true;
673 }
674#endif // HAVE_TEUCHOS_DEBUG
675 toProcsFromMe[procIdsToSendTo_[i]] = 1;
676 }
677#ifdef HAVE_TEUCHOS_DEBUG
678 SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
679 "Tpetra::Distributor::computeReceives: There was an error on at least "
680 "one process in counting the number of messages send by that process to "
681 "the other processs. Please report this bug to the Tpetra developers.",
682 *comm_);
683#endif // HAVE_TEUCHOS_DEBUG
684
685 // Compute the number of receives that this process needs to
686 // post. The number of receives includes any self sends (i.e.,
687 // messages sent by this process to itself).
688 //
689 // (We will use numReceives_ below to post exactly that
690 // number of receives, with MPI_ANY_SOURCE as the sending rank.
691 // This will tell us from which processes this process expects
692 // to receive, and how many packets of data we expect to receive
693 // from each process.)
694 //
695 // toProcsFromMe[i] is the number of messages sent by this
696 // process to process i. Compute the sum (elementwise) of all
697 // the toProcsFromMe arrays on all processes in the
698 // communicator. If the array x is that sum, then if this
699 // process has rank j, x[j] is the number of messages sent
700 // to process j, that is, the number of receives on process j
701 // (including any messages sent by process j to itself).
702 //
703 // Yes, this requires storing and operating on an array of
704 // length P, where P is the number of processes in the
705 // communicator. Epetra does this too. Avoiding this O(P)
706 // memory bottleneck would require some research.
707 //
708 // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
709 // implement this O(P) memory algorithm.
710 //
711 // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
712 // process (0) from toProcsFromMe, to numRecvsOnEachProc.
713 // Then, scatter the latter, so that each process p gets
714 // numRecvsOnEachProc[p].
715 //
716 // 2. Like #1, but use MPI_Reduce_scatter instead of
717 // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
718 // optimized to reduce the number of messages, but
719 // MPI_Reduce_scatter is more general than we need (it
720 // allows the equivalent of MPI_Scatterv). See Bug 6336.
721 //
722 // 3. Do an all-reduce on toProcsFromMe, and let my process
723 // (with rank myRank) get numReceives_ from
724 // toProcsFromMe[myRank]. The HPCCG miniapp uses the
725 // all-reduce method.
726 //
727 // Approaches 1 and 3 have the same critical path length.
728 // However, #3 moves more data. This is because the final
729 // result is just one integer, but #3 moves a whole array of
730 // results to all the processes. This is why we use Approach 1
731 // here.
732 //
733 // mfh 12 Apr 2013: See discussion in createFromSends() about
734 // how we could use this communication to propagate an error
735 // flag for "free" in a release build.
736
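 // In raw MPI terms, Approach 1 below corresponds roughly to the
 // following sketch (the actual calls go through the Teuchos comm
 // wrappers, which also handle the serial, non-MPI build):
 //
 //   MPI_Reduce (toProcsFromMe, numRecvsOnEachProc, numProcs,
 //               MPI_INT, MPI_SUM, root, comm);
 //   MPI_Scatter (numRecvsOnEachProc, 1, MPI_INT,
 //                &numReceivesAsInt, 1, MPI_INT, root, comm);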
737 const int root = 0; // rank of root process of the reduction
738 Array<int> numRecvsOnEachProc; // temp; only needed on root
739 if (myRank == root) {
740 numRecvsOnEachProc.resize (numProcs);
741 }
742 int numReceivesAsInt = 0; // output
743 reduce<int, int> (toProcsFromMe.getRawPtr (),
744 numRecvsOnEachProc.getRawPtr (),
745 numProcs, REDUCE_SUM, root, *comm_);
746 scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
747 &numReceivesAsInt, 1, root, *comm_);
748 numReceives_ = static_cast<size_t> (numReceivesAsInt);
749 }
750
751 // Now we know numReceives_, which is this process' number of
752 // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
753 // with this number of entries.
754 lengthsFrom_.assign (numReceives_, 0);
755 procsFrom_.assign (numReceives_, 0);
756
757 //
758 // Ask (via nonblocking receive) each process from which we are
759 // receiving how many packets we should expect from it in the
760 // communication pattern.
761 //
762
763 // At this point, numReceives_ includes any self message that
764 // there may be. At the end of this routine, we'll subtract off
765 // the self message (if there is one) from numReceives_. In this
766 // routine, we don't need to receive a message from ourselves in
767 // order to figure out our lengthsFrom_ and source process ID; we
768 // can just ask ourselves directly. Thus, the actual number of
769 // nonblocking receives we post here does not include the self
770 // message.
771 const size_t actualNumReceives = numReceives_ - (sendMessageToSelf_ ? 1 : 0);
772
773 // Teuchos' wrapper for nonblocking receives requires receive
774 // buffers that it knows won't go away. This is why we use RCPs,
775 // one RCP per nonblocking receive request. They get allocated in
776 // the loop below.
777 Array<RCP<CommRequest<int> > > requests (actualNumReceives);
778 Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
779 Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
780
781 // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
782 // (receive data from any process).
783#ifdef HAVE_MPI
784 const int anySourceProc = MPI_ANY_SOURCE;
785#else
786 const int anySourceProc = -1;
787#endif
788
789 // Post the (nonblocking) receives.
790 for (size_t i = 0; i < actualNumReceives; ++i) {
791 // Once the receive completes, we can ask the corresponding
792 // CommStatus object (output by wait()) for the sending process'
793 // ID (which we'll assign to procsFrom_[i] -- don't forget to
794 // do that!).
795 lengthsFromBuffers[i].resize (1);
796 lengthsFromBuffers[i][0] = as<size_t> (0);
797 requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
798 tag, *comm_);
799 }
800
801 // Post the sends: Tell each process to which we are sending how
802 // many packets it should expect from us in the communication
803 // pattern. We could use nonblocking sends here, as long as we do
804 // a waitAll() on all the sends and receives at once.
805 //
806 // We assume that numSendsToOtherProcs_ and sendMessageToSelf_ have already been
807 // set. The value of numSendsToOtherProcs_ (my process' number of sends) does
808 // not include any message that it might send to itself.
809 for (size_t i = 0; i < numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0); ++i) {
810 if (procIdsToSendTo_[i] != myRank) {
811 // Send a message to procIdsToSendTo_[i], telling that process that
812 // this communication pattern will send that process
813 // lengthsTo_[i] blocks of packets.
814 const size_t* const lengthsTo_i = &lengthsTo_[i];
815 send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), tag, *comm_);
816 }
817 else {
818 // We don't need a send in the self-message case. If this
819 // process will send a message to itself in the communication
820 // pattern, then the last element of lengthsFrom_ and
821 // procsFrom_ corresponds to the self-message. Of course
822 // this process knows how long the message is, and the process
823 // ID is its own process ID.
824 lengthsFrom_[numReceives_-1] = lengthsTo_[i];
825 procsFrom_[numReceives_-1] = myRank;
826 }
827 }
828
829 //
830 // Wait on all the receives. When they arrive, check the status
831 // output of wait() for the receiving process ID, unpack the
832 // request buffers into lengthsFrom_, and set procsFrom_ from the
833 // status.
834 //
835 waitAll (*comm_, requests (), statuses ());
836 for (size_t i = 0; i < actualNumReceives; ++i) {
837 lengthsFrom_[i] = *lengthsFromBuffers[i];
838 procsFrom_[i] = statuses[i]->getSourceRank ();
839 }
840
841 // Sort the procsFrom_ array, and apply the same permutation to
842 // lengthsFrom_. This ensures that procsFrom_[i] and
843 // lengthsFrom_[i] refer to the same thing.
844 sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
845
846 // Compute indicesFrom_
847 totalReceiveLength_ =
848 std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
849 indicesFrom_.clear ();
850
851 startsFrom_.clear ();
852 startsFrom_.reserve (numReceives_);
853 for (size_t i = 0, j = 0; i < numReceives_; ++i) {
854 startsFrom_.push_back(j);
855 j += lengthsFrom_[i];
856 }
857
858 if (sendMessageToSelf_) {
859 --numReceives_;
860 }
861}
862
863void DistributorPlan::setParameterList(const Teuchos::RCP<Teuchos::ParameterList>& plist)
864{
865 using Teuchos::FancyOStream;
866 using Teuchos::getIntegralValue;
867 using Teuchos::ParameterList;
868 using Teuchos::parameterList;
869 using Teuchos::RCP;
870 using std::endl;
871
872 if (! plist.is_null()) {
873 RCP<const ParameterList> validParams = getValidParameters ();
874 plist->validateParametersAndSetDefaults (*validParams);
875
876 const bool barrierBetween =
877 plist->get<bool> ("Barrier between receives and sends");
878 const Details::EDistributorSendType sendType =
879 getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
880 const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
881 {
882 // mfh 03 May 2016: We keep this option only for backwards
883 // compatibility, but it must always be true. See discussion of
884 // Github Issue #227.
885 const bool enable_cuda_rdma =
886 plist->get<bool> ("Enable MPI CUDA RDMA support");
887 TEUCHOS_TEST_FOR_EXCEPTION
888 (! enable_cuda_rdma, std::invalid_argument, "Tpetra::Distributor::"
889 "setParameterList: " << "You specified \"Enable MPI CUDA RDMA "
890 "support\" = false. This is no longer valid. You don't need to "
891 "specify this option any more; Tpetra assumes it is always true. "
892 "This is a very light assumption on the MPI implementation, and in "
893 "fact does not actually involve hardware or system RDMA support. "
894 "Tpetra just assumes that the MPI implementation can tell whether a "
895 "pointer points to host memory or CUDA device memory.");
896 }
897
898 // We check this property explicitly, since we haven't yet learned
899 // how to make a validator that can cross-check properties.
900 // Later, turn this into a validator so that it can be embedded in
901 // the valid ParameterList and used in Optika.
902 TEUCHOS_TEST_FOR_EXCEPTION
903 (! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
904 std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
905 << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
906 "between receives and sends." << endl << "This is invalid; you must "
907 "include the barrier if you use ready sends." << endl << "Ready sends "
908 "require that their corresponding receives have already been posted, "
909 "and the only way to guarantee that in general is with a barrier.");
910
911 // Now that we've validated the input list, save the results.
912 sendType_ = sendType;
913 barrierBetweenRecvSend_ = barrierBetween;
914 useDistinctTags_ = useDistinctTags;
915
916 // ParameterListAcceptor semantics require pointer identity of the
917 // sublist passed to setParameterList(), so we save the pointer.
918 this->setMyParamList (plist);
919 }
920}
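// Usage sketch (illustrative only; parameter names are those defined in
// getValidParameters() below):
//
//   Teuchos::RCP<Teuchos::ParameterList> plist =
//     Teuchos::parameterList ("Tpetra::Distributor");
//   plist->set ("Send type", std::string ("Isend"));
//   plist->set ("Use distinct tags", true);
//   plan.setParameterList (plist);  // plan is a DistributorPlan instance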
921
922Teuchos::Array<std::string> distributorSendTypes()
923{
924 Teuchos::Array<std::string> sendTypes;
925 sendTypes.push_back ("Isend");
926 sendTypes.push_back ("Rsend");
927 sendTypes.push_back ("Send");
928 sendTypes.push_back ("Ssend");
929 return sendTypes;
930}
931
932Teuchos::RCP<const Teuchos::ParameterList>
933DistributorPlan::getValidParameters() const
934{
935 using Teuchos::Array;
936 using Teuchos::ParameterList;
937 using Teuchos::parameterList;
938 using Teuchos::RCP;
939 using Teuchos::setStringToIntegralParameter;
940
941 const bool barrierBetween = Details::barrierBetween_default;
942 const bool useDistinctTags = Details::useDistinctTags_default;
943
944 Array<std::string> sendTypes = distributorSendTypes ();
945 const std::string defaultSendType ("Send");
946 Array<Details::EDistributorSendType> sendTypeEnums;
947 sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
948 sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
949 sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
950 sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
951
952 RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
953 plist->set ("Barrier between receives and sends", barrierBetween,
954 "Whether to execute a barrier between receives and sends in do"
955 "[Reverse]Posts(). Required for correctness when \"Send type\""
956 "=\"Rsend\", otherwise correct but not recommended.");
957 setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
958 defaultSendType, "When using MPI, the variant of send to use in "
959 "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
960 plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
961 "MPI message tags for different code paths. Highly recommended"
962 " to avoid message collisions.");
963 plist->set ("Timer Label","","Label for Time Monitor output");
964 plist->set ("Enable MPI CUDA RDMA support", true, "Assume that MPI can "
965 "tell whether a pointer points to host memory or CUDA device "
966 "memory. You don't need to specify this option any more; "
967 "Tpetra assumes it is always true. This is a very light "
968 "assumption on the MPI implementation, and in fact does not "
969 "actually involve hardware or system RDMA support.");
970
971 return Teuchos::rcp_const_cast<const ParameterList> (plist);
972}
973
974}
975}