Tpetra_Details_DistributorActor.hpp
// @HEADER
// ***********************************************************************
//
// Tpetra: Templated Linear Algebra Services Package
// Copyright (2008) Sandia Corporation
//
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the Corporation nor the names of the
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// ************************************************************************
// @HEADER

#ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
#define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP

#include "Tpetra_Details_DistributorPlan.hpp"
#include "Tpetra_Util.hpp"

#include "Teuchos_Array.hpp"
#include "Teuchos_Comm.hpp"
#include "Teuchos_RCP.hpp"
#include "Teuchos_Time.hpp"

namespace Tpetra {
namespace Details {

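// Compile-time check, used in the static_asserts below, that both the
// export and import array types are Kokkos::View specializations.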
template <class View1, class View2>
constexpr bool areKokkosViews =
    Kokkos::Impl::is_view<View1>::value && Kokkos::Impl::is_view<View2>::value;

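// Carries out the communication described by a DistributorPlan: posts
// the nonblocking receives and the sends (doPosts), and completes any
// outstanding requests (doWaits).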
class DistributorActor {

public:
  DistributorActor();
  DistributorActor(const DistributorActor& otherActor);

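  // Execute the plan and block until completion: post receives and
  // sends, then wait.  Every local ID contributes the same fixed
  // number of packets (numPackets).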
  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       size_t numPackets,
                       const ImpView& imports);

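  // As above, but each local ID may contribute a different number of
  // packets, given by numExportPacketsPerLID and numImportPacketsPerLID.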
  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                       const ImpView& imports,
                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

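  // Post the receives and sends described by the plan without waiting
  // for completion; pair with doWaits().  Fixed number of packets per
  // local ID.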
  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               size_t numPackets,
               const ImpView& imports);

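  // As above, but with a variable number of packets per local ID.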
  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
               const ImpView& imports,
               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

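  // Wait on all communication requests posted by doPosts().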
  void doWaits(const DistributorPlan& plan);

private:
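  // Outstanding nonblocking communication requests posted by doPosts().
  // The implementation uses requests_.size() as the number of
  // outstanding messages, so the array is cleared before posting.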
  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requests_;

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_;
  Teuchos::RCP<Teuchos::Time> timer_doWaits_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_recvs_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_recvs_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_barrier_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_barrier_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_slow_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_slow_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_fast_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_fast_;

  void makeTimers();
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
};

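// Minimal usage sketch (hypothetical caller code, not part of this
// header).  A DistributorPlan describes who sends how much to whom;
// a DistributorActor carries that plan out.  The view types, sizes,
// and plan setup below are illustrative assumptions only.
//
//   Teuchos::RCP<const Teuchos::Comm<int>> comm = ...;
//   Tpetra::Details::DistributorPlan plan(comm);
//   plan.createFromSends(exportProcIDs);   // rank each export entry goes to
//
//   Kokkos::View<double*> exports("exports", numExportEntries);
//   Kokkos::View<double*> imports("imports", numImportEntries);
//
//   Tpetra::Details::DistributorActor actor;
//   const size_t numPacketsPerLID = 1;
//   actor.doPostsAndWaits(plan, exports, numPacketsPerLID, imports);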
template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       size_t numPackets,
                                       const ImpView& imports)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numPackets, imports);
  doWaits(plan);
}

template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                                       const ImpView& imports,
                                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
  doWaits(plan);
}

template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               size_t numPackets,
                               const ImpView& imports)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::FancyOStream;
  using Teuchos::includesVerbLevel;
  using Teuchos::ireceive;
  using Teuchos::isend;
  using Teuchos::readySend;
  using Teuchos::send;
  using Teuchos::ssend;
  using Teuchos::TypeNameTraits;
  using Teuchos::typeName;
  using std::endl;
  using Kokkos::Compat::create_const_view;
  using Kokkos::Compat::create_view;
  using Kokkos::Compat::subview_offset;
  using Kokkos::Compat::deep_copy_offset;
  typedef Array<size_t>::size_type size_type;
  typedef ExpView exports_view_type;
  typedef ImpView imports_view_type;

#ifdef KOKKOS_ENABLE_CUDA
  static_assert
    (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
     ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
     "Please do not use Tpetra::Distributor with UVM allocations. "
     "See Trilinos GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert
    (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
     ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
     "Please do not use Tpetra::Distributor with SharedUSM allocations. "
     "See Trilinos GitHub issue #1088 (the corresponding CUDA issue).");
#endif // KOKKOS_ENABLE_SYCL

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMon (*timer_doPosts3KV_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  const int myRank = plan.getComm()->getRank ();
  // Run-time configurable parameters that come from the input
  // ParameterList set by setParameterList().
  const Details::EDistributorSendType sendType = plan.getSendType();
  const bool doBarrier = plan.barrierBetweenRecvSend();

  TEUCHOS_TEST_FOR_EXCEPTION(
    sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier,
    std::logic_error,
    "Tpetra::Distributor::doPosts(3 args, Kokkos): Ready-send version "
    "requires a barrier between posting receives and posting ready sends. "
    "This should have been checked before. "
    "Please report this bug to the Tpetra developers.");

  size_t selfReceiveOffset = 0;

  // MPI tag for nonblocking receives and blocking sends in this
  // method.  Some processes might take the "fast" path
  // (getIndicesTo().is_null()) and others might take the "slow" path
  // for the same doPosts() call, so the path tag must be the same for
  // both.
  const int pathTag = 0;
  const int tag = plan.getTag(pathTag);

#ifdef HAVE_TPETRA_DEBUG
  TEUCHOS_TEST_FOR_EXCEPTION
    (requests_.size () != 0,
     std::logic_error,
     "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
     << myRank << ": requests_.size() = " << requests_.size () << " != 0.");
#endif // HAVE_TPETRA_DEBUG

  // Distributor uses requests_.size() as the number of outstanding
  // nonblocking message requests, so we resize to zero to maintain
  // this invariant.
  //
  // getNumReceives() does _not_ include the self message, if there is
  // one.  Here, we do actually send a message to ourselves, so we
  // include any self message in the "actual" number of receives to
  // post.
  //
  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
  // doesn't (re)allocate its array of requests.  That happens in
  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
  // demand), or Resize_().
  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
    as<size_type> (plan.hasSelfMessage() ? 1 : 0);
  requests_.resize (0);

  // Post the nonblocking receives.  It's common MPI wisdom to post
  // receives before sends.  In MPI terms, this means favoring
  // adding to the "posted queue" (of receive requests) over adding
  // to the "unexpected queue" (of arrived messages not yet matched
  // with a receive).
  {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3KV_recvs_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    size_t curBufferOffset = 0;
    for (size_type i = 0; i < actualNumReceives; ++i) {
      const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
      if (plan.getProcsFrom()[i] != myRank) {
        // If my process is receiving these packet(s) from another
        // process (not a self-receive):
        //
        // 1. Set up the persisting view (recvBuf) of the imports
        //    array, given the offset and size (total number of
        //    packets from process getProcsFrom()[i]).
        // 2. Start the Irecv and save the resulting request.
        TEUCHOS_TEST_FOR_EXCEPTION(
          curBufferOffset + curBufLen > static_cast<size_t> (imports.size ()),
          std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
          "Exceeded size of 'imports' array in packing loop on Process " <<
          myRank << ". imports.size() = " << imports.size () << " < "
          "curBufferOffset(" << curBufferOffset << ") + curBufLen(" <<
          curBufLen << ").");
        imports_view_type recvBuf =
          subview_offset (imports, curBufferOffset, curBufLen);
        requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
                                            tag, *plan.getComm()));
      }
      else { // Receiving from myself
        selfReceiveOffset = curBufferOffset; // Remember the self-recv offset
      }
      curBufferOffset += curBufLen;
    }
  }

  if (doBarrier) {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts3KV_barrier_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // If we are using ready sends (MPI_Rsend) below, we need to do
    // a barrier before we post the ready sends.  This is because a
    // ready send requires that its matching receive has already
    // been posted before the send has been posted.  The only way to
    // guarantee that in this case is to use a barrier.
    plan.getComm()->barrier ();
  }

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMonSends (*timer_doPosts3KV_sends_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  // Set up the scan through the getProcsTo() list, starting with the
  // processes numbered higher than my rank (this should help balance
  // message traffic).
  //
  // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
  // It doesn't depend on the input at all.
  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  size_t procIndex = 0;
  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
    ++procIndex;
  }
  if (procIndex == numBlocks) {
    procIndex = 0;
  }

  size_t selfNum = 0;
  size_t selfIndex = 0;

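  // "Fast" path: getIndicesTo() is null, so the exports are already
  // laid out contiguously by destination process and can be sent in
  // place.  Otherwise take the "slow" path below, which first gathers
  // each outgoing message into a separate send buffer.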
  if (plan.getIndicesTo().is_null()) {

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_fast_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // Data are already blocked (laid out) by process, so we don't
    // need a separate send buffer (besides the exports array).
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        exports_view_type tmpSend = subview_offset(
          exports, plan.getStartsTo()[p]*numPackets, plan.getLengthsTo()[p]*numPackets);

        if (sendType == Details::DISTRIBUTOR_SEND) {
          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_ISEND) {
          exports_view_type tmpSendBuf =
            subview_offset (exports, plan.getStartsTo()[p] * numPackets,
                            plan.getLengthsTo()[p] * numPackets);
          requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                           tag, *plan.getComm()));
        }
        else if (sendType == Details::DISTRIBUTOR_RSEND) {
          readySend<int> (tmpSend,
                          as<int> (tmpSend.size ()),
                          plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_SSEND) {
          ssend<int> (tmpSend,
                      as<int> (tmpSend.size ()),
                      plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else {
          TEUCHOS_TEST_FOR_EXCEPTION(
            true,
            std::logic_error,
            "Tpetra::Distributor::doPosts(3 args, Kokkos): "
            "Invalid send type.  We should never get here.  "
            "Please report this bug to the Tpetra developers.");
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
      }
    }

    if (plan.hasSelfMessage()) {
      // This is how we "send a message to ourself": we copy from
      // the export buffer to the import buffer.  That saves
      // Teuchos::Comm implementations other than MpiComm (in
      // particular, SerialComm) the trouble of implementing self
      // messages correctly.  (To do this right, SerialComm would
      // need internal buffer space for messages, keyed on the
      // message's tag.)
      deep_copy_offset(imports, exports, selfReceiveOffset,
                       plan.getStartsTo()[selfNum]*numPackets,
                       plan.getLengthsTo()[selfNum]*numPackets);
    }
  }
  else { // data are not blocked by proc, use send buffer

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_slow_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    typedef typename ExpView::non_const_value_type Packet;
    typedef typename ExpView::array_layout Layout;
    typedef typename ExpView::device_type Device;
    typedef typename ExpView::memory_traits Mem;
    Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
                                                       plan.getMaxSendLength() * numPackets);

    // FIXME (mfh 05 Mar 2013) This is broken for Isend (nonblocking
    // sends), because the buffer is only long enough for one send.
    TEUCHOS_TEST_FOR_EXCEPTION(
      sendType == Details::DISTRIBUTOR_ISEND,
      std::logic_error,
      "Tpetra::Distributor::doPosts(3 args, Kokkos): The \"send buffer\" code path "
      "doesn't currently work with nonblocking sends.");

    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        size_t sendArrayOffset = 0;
        size_t j = plan.getStartsTo()[p];
        for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
          deep_copy_offset(sendArray, exports, sendArrayOffset,
                           plan.getIndicesTo()[j]*numPackets, numPackets);
          sendArrayOffset += numPackets;
        }
        ImpView tmpSend =
          subview_offset(sendArray, size_t(0), plan.getLengthsTo()[p]*numPackets);

        if (sendType == Details::DISTRIBUTOR_SEND) {
          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_ISEND) {
          exports_view_type tmpSendBuf =
            subview_offset (sendArray, size_t(0), plan.getLengthsTo()[p] * numPackets);
          requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                           tag, *plan.getComm()));
        }
        else if (sendType == Details::DISTRIBUTOR_RSEND) {
          readySend<int> (tmpSend,
                          as<int> (tmpSend.size ()),
                          plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_SSEND) {
          ssend<int> (tmpSend,
                      as<int> (tmpSend.size ()),
                      plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else {
          TEUCHOS_TEST_FOR_EXCEPTION(
            true,
            std::logic_error,
            "Tpetra::Distributor::doPosts(3 args, Kokkos): "
            "Invalid send type.  We should never get here.  "
            "Please report this bug to the Tpetra developers.");
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
        selfIndex = plan.getStartsTo()[p];
      }
    }

    if (plan.hasSelfMessage()) {
      for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
        deep_copy_offset(imports, exports, selfReceiveOffset,
                         plan.getIndicesTo()[selfIndex]*numPackets, numPackets);
        ++selfIndex;
        selfReceiveOffset += numPackets;
      }
    }
  }
}

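// Variant of doPosts for a variable number of packets per local ID:
// message sizes are computed from numExportPacketsPerLID and
// numImportPacketsPerLID instead of a single numPackets value.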
template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                               const ImpView& imports,
                               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::ireceive;
  using Teuchos::isend;
  using Teuchos::readySend;
  using Teuchos::send;
  using Teuchos::ssend;
  using Teuchos::TypeNameTraits;
  using std::endl;
  using Kokkos::Compat::create_const_view;
  using Kokkos::Compat::create_view;
  using Kokkos::Compat::subview_offset;
  using Kokkos::Compat::deep_copy_offset;
  typedef Array<size_t>::size_type size_type;
  typedef ExpView exports_view_type;
  typedef ImpView imports_view_type;

#ifdef KOKKOS_ENABLE_CUDA
  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
                 ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
                 "Please do not use Tpetra::Distributor with UVM "
                 "allocations.  See GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
                 ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
                 "Please do not use Tpetra::Distributor with SharedUSM "
                 "allocations.  See GitHub issue #1088 (the corresponding CUDA issue).");
#endif // KOKKOS_ENABLE_SYCL

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  // Run-time configurable parameters that come from the input
  // ParameterList set by setParameterList().
  const Details::EDistributorSendType sendType = plan.getSendType();
  const bool doBarrier = plan.barrierBetweenRecvSend();

  TEUCHOS_TEST_FOR_EXCEPTION(
    sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier,
    std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): Ready-send "
    "version requires a barrier between posting receives and posting ready "
    "sends.  This should have been checked before.  "
    "Please report this bug to the Tpetra developers.");

  const int myProcID = plan.getComm()->getRank ();
  size_t selfReceiveOffset = 0;

#ifdef HAVE_TEUCHOS_DEBUG
  // Different messages may have different numbers of packets.
  size_t totalNumImportPackets = 0;
  for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
    totalNumImportPackets += numImportPacketsPerLID[ii];
  }
  TEUCHOS_TEST_FOR_EXCEPTION(
    imports.extent (0) < totalNumImportPackets, std::runtime_error,
    "Tpetra::Distributor::doPosts(4 args, Kokkos): The 'imports' array must have "
    "enough entries to hold the expected number of import packets.  "
    "imports.extent(0) = " << imports.extent (0) << " < "
    "totalNumImportPackets = " << totalNumImportPackets << ".");
#endif // HAVE_TEUCHOS_DEBUG

  // MPI tag for nonblocking receives and blocking sends in this
  // method.  Some processes might take the "fast" path
  // (plan.getIndicesTo().is_null()) and others might take the "slow"
  // path for the same doPosts() call, so the path tag must be the same
  // for both.
  const int pathTag = 1;
  const int tag = plan.getTag(pathTag);

#ifdef HAVE_TEUCHOS_DEBUG
  TEUCHOS_TEST_FOR_EXCEPTION
    (requests_.size () != 0, std::logic_error, "Tpetra::Distributor::"
     "doPosts(4 args, Kokkos): Process " << myProcID << ": requests_.size () = "
     << requests_.size () << " != 0.");
#endif // HAVE_TEUCHOS_DEBUG
  // Distributor uses requests_.size() as the number of outstanding
  // nonblocking message requests, so we resize to zero to maintain
  // this invariant.
  //
  // getNumReceives() does _not_ include the self message, if there is
  // one.  Here, we do actually send a message to ourselves, so we
  // include any self message in the "actual" number of receives to
  // post.
  //
  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
  // doesn't (re)allocate its array of requests.  That happens in
  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
  // demand), or Resize_().
  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
    as<size_type> (plan.hasSelfMessage() ? 1 : 0);
  requests_.resize (0);

  // Post the nonblocking receives.  It's common MPI wisdom to post
  // receives before sends.  In MPI terms, this means favoring
  // adding to the "posted queue" (of receive requests) over adding
  // to the "unexpected queue" (of arrived messages not yet matched
  // with a receive).
  {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    size_t curBufferOffset = 0;
    size_t curLIDoffset = 0;
    for (size_type i = 0; i < actualNumReceives; ++i) {
      size_t totalPacketsFrom_i = 0;
      for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
        totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
      }
      curLIDoffset += plan.getLengthsFrom()[i];
      if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
        // If my process is receiving these packet(s) from another
        // process (not a self-receive), and if there is at least
        // one packet to receive:
        //
        // 1. Set up the persisting view (recvBuf) into the imports
        //    array, given the offset and size (total number of
        //    packets from process getProcsFrom()[i]).
        // 2. Start the Irecv and save the resulting request.
        imports_view_type recvBuf =
          subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
        requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
                                            tag, *plan.getComm()));
      }
      else { // Receiving these packet(s) from myself
        selfReceiveOffset = curBufferOffset; // Remember the offset
      }
      curBufferOffset += totalPacketsFrom_i;
    }
  }

  if (doBarrier) {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts4KV_barrier_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    // If we are using ready sends (MPI_Rsend) below, we need to do
    // a barrier before we post the ready sends.  This is because a
    // ready send requires that its matching receive has already
    // been posted before the send has been posted.  The only way to
    // guarantee that in this case is to use a barrier.
    plan.getComm()->barrier ();
  }

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  // Set up arrays containing the starting offset into exports for each
  // send, and the number of packets for each send.
  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
  size_t maxNumPackets = 0;
  size_t curPKToffset = 0;
  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
    sendPacketOffsets[pp] = curPKToffset;
    size_t numPackets = 0;
    for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
      numPackets += numExportPacketsPerLID[j];
    }
    if (numPackets > maxNumPackets) maxNumPackets = numPackets;
    packetsPerSend[pp] = numPackets;
    curPKToffset += numPackets;
  }

  // Set up the scan through the getProcsTo() list, starting with the
  // processes numbered higher than my rank (this should help balance
  // message traffic).
  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  size_t procIndex = 0;
  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) {
    ++procIndex;
  }
  if (procIndex == numBlocks) {
    procIndex = 0;
  }

  size_t selfNum = 0;
  size_t selfIndex = 0;
  if (plan.getIndicesTo().is_null()) {

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // Data are already blocked (laid out) by process, so we don't
    // need a separate send buffer (besides the exports array).
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) {
        exports_view_type tmpSend =
          subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);

        if (sendType == Details::DISTRIBUTOR_SEND) { // the default, so put it first
          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_RSEND) {
          readySend<int> (tmpSend,
                          as<int> (tmpSend.size ()),
                          plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_ISEND) {
          exports_view_type tmpSendBuf =
            subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
          requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                           tag, *plan.getComm()));
        }
        else if (sendType == Details::DISTRIBUTOR_SSEND) {
          ssend<int> (tmpSend,
                      as<int> (tmpSend.size ()),
                      plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else {
          TEUCHOS_TEST_FOR_EXCEPTION(
            true, std::logic_error,
            "Tpetra::Distributor::doPosts(4 args, Kokkos): "
            "Invalid send type.  We should never get here.  "
            "Please report this bug to the Tpetra developers.");
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
      }
    }

    if (plan.hasSelfMessage()) {
      deep_copy_offset(imports, exports, selfReceiveOffset,
                       sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
    }
  }
  else { // data are not blocked by proc, use send buffer

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // FIXME (mfh 05 Mar 2013) This may be broken for Isend.
    typedef typename ExpView::non_const_value_type Packet;
    typedef typename ExpView::array_layout Layout;
    typedef typename ExpView::device_type Device;
    typedef typename ExpView::memory_traits Mem;
    Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray", maxNumPackets); // send buffer

    TEUCHOS_TEST_FOR_EXCEPTION(
      sendType == Details::DISTRIBUTOR_ISEND,
      std::logic_error,
      "Tpetra::Distributor::doPosts(4 args, Kokkos): "
      "The \"send buffer\" code path may not necessarily work with nonblocking sends.");

    Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
    size_t ioffset = 0;
    for (int j=0; j<numExportPacketsPerLID.size(); ++j) {
      indicesOffsets[j] = ioffset;
      ioffset += numExportPacketsPerLID[j];
    }

    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myProcID) {
        size_t sendArrayOffset = 0;
        size_t j = plan.getStartsTo()[p];
        size_t numPacketsTo_p = 0;
        for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
          numPacketsTo_p += numExportPacketsPerLID[j];
          deep_copy_offset(sendArray, exports, sendArrayOffset,
                           indicesOffsets[j], numExportPacketsPerLID[j]);
          sendArrayOffset += numExportPacketsPerLID[j];
        }
        if (numPacketsTo_p > 0) {
          ImpView tmpSend =
            subview_offset(sendArray, size_t(0), numPacketsTo_p);

          if (sendType == Details::DISTRIBUTOR_RSEND) {
            readySend<int> (tmpSend,
                            as<int> (tmpSend.size ()),
                            plan.getProcsTo()[p], tag, *plan.getComm());
          }
          else if (sendType == Details::DISTRIBUTOR_ISEND) {
            exports_view_type tmpSendBuf =
              subview_offset (sendArray, size_t(0), numPacketsTo_p);
            requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                             tag, *plan.getComm()));
          }
          else if (sendType == Details::DISTRIBUTOR_SSEND) {
            ssend<int> (tmpSend,
                        as<int> (tmpSend.size ()),
                        plan.getProcsTo()[p], tag, *plan.getComm());
          }
          else { // standard send (DISTRIBUTOR_SEND), the default
            send<int> (tmpSend,
                       as<int> (tmpSend.size ()),
                       plan.getProcsTo()[p], tag, *plan.getComm());
          }
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
        selfIndex = plan.getStartsTo()[p];
      }
    }

    if (plan.hasSelfMessage()) {
      for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
        deep_copy_offset(imports, exports, selfReceiveOffset,
                         indicesOffsets[selfIndex],
                         numExportPacketsPerLID[selfIndex]);
        selfReceiveOffset += numExportPacketsPerLID[selfIndex];
        ++selfIndex;
      }
    }
  }
}

}
}

#endif