// Kokkos Core Kernels Package — Version of the Day
// Kokkos_HPX.hpp
1/*
2//@HEADER
3// ************************************************************************
4//
5// Kokkos v. 3.0
6// Copyright (2020) National Technology & Engineering
7// Solutions of Sandia, LLC (NTESS).
8//
9// Under the terms of Contract DE-NA0003525 with NTESS,
10// the U.S. Government retains certain rights in this software.
11//
12// Redistribution and use in source and binary forms, with or without
13// modification, are permitted provided that the following conditions are
14// met:
15//
16// 1. Redistributions of source code must retain the above copyright
17// notice, this list of conditions and the following disclaimer.
18//
19// 2. Redistributions in binary form must reproduce the above copyright
20// notice, this list of conditions and the following disclaimer in the
21// documentation and/or other materials provided with the distribution.
22//
23// 3. Neither the name of the Corporation nor the names of the
24// contributors may be used to endorse or promote products derived from
25// this software without specific prior written permission.
26//
27// THIS SOFTWARE IS PROVIDED BY NTESS "AS IS" AND ANY
28// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
30// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NTESS OR THE
31// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
32// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
33// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
34// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
35// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
36// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
37// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38//
39// Questions? Contact Christian R. Trott (crtrott@sandia.gov)
40//
41// ************************************************************************
42//@HEADER
43*/
44
45#ifndef KOKKOS_HPX_HPP
46#define KOKKOS_HPX_HPP
47
48#include <Kokkos_Macros.hpp>
49#if defined(KOKKOS_ENABLE_HPX)
50
51#include <Kokkos_Core_fwd.hpp>
52
53#include <Kokkos_HostSpace.hpp>
54#include <cstddef>
55#include <iosfwd>
56
57#ifdef KOKKOS_ENABLE_HBWSPACE
58#include <Kokkos_HBWSpace.hpp>
59#endif
60
61#include <HPX/Kokkos_HPX_ChunkedRoundRobinExecutor.hpp>
62#include <Kokkos_HostSpace.hpp>
63#include <Kokkos_Layout.hpp>
64#include <Kokkos_MemoryTraits.hpp>
65#include <Kokkos_Parallel.hpp>
66#include <Kokkos_ScratchSpace.hpp>
67#include <Kokkos_TaskScheduler.hpp>
68#include <impl/Kokkos_ConcurrentBitset.hpp>
69#include <impl/Kokkos_FunctorAdapter.hpp>
70#include <impl/Kokkos_FunctorAnalysis.hpp>
71#include <impl/Kokkos_Tools.hpp>
72#include <impl/Kokkos_Tags.hpp>
73#include <impl/Kokkos_TaskQueue.hpp>
74#include <impl/Kokkos_ExecSpaceInitializer.hpp>
75
76#include <KokkosExp_MDRangePolicy.hpp>
77
78#include <hpx/apply.hpp>
79#include <hpx/hpx_start.hpp>
80#include <hpx/include/util.hpp>
81#include <hpx/lcos/local/barrier.hpp>
82#include <hpx/lcos/local/latch.hpp>
83#include <hpx/parallel/algorithms/for_loop.hpp>
84#include <hpx/parallel/algorithms/reduce.hpp>
85#include <hpx/parallel/executors/static_chunk_size.hpp>
86#include <hpx/runtime.hpp>
87#include <hpx/runtime/threads/run_as_hpx_thread.hpp>
88#include <hpx/runtime/threads/threadmanager.hpp>
89#include <hpx/runtime/thread_pool_helpers.hpp>
90
91#include <Kokkos_UniqueToken.hpp>
92
#include <atomic>
#include <cassert>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <type_traits>
#include <vector>
100
101// There are currently two different implementations for the parallel dispatch
102// functions:
103//
// - 0: The HPX way. Unfortunately, this currently comes with unnecessary
//      overheads, so there are also the following alternatives:
106// - 1: The manual way. This way is more verbose and does not take advantage of
107// e.g. parallel::for_loop in HPX but it is significantly faster in many
108// benchmarks.
109// - 2: Like 1, but spawn tasks using for_loop and a custom executor.
110//
111// In the long run 0 should be the preferred implementation, but until HPX is
112// improved 1 will be the default.
113#ifndef KOKKOS_HPX_IMPLEMENTATION
114#define KOKKOS_HPX_IMPLEMENTATION 1
115#endif
116
117#if (KOKKOS_HPX_IMPLEMENTATION < 0) || (KOKKOS_HPX_IMPLEMENTATION > 2)
118#error "You have chosen an invalid value for KOKKOS_HPX_IMPLEMENTATION"
119#endif
120
121// [note 1]
122//
123// When using the asynchronous backend and independent instances, we explicitly
124// reset the shared data at the end of a parallel task (execute_task). We do
125// this to avoid circular references with shared pointers that would otherwise
126// never be released.
127//
128// The HPX instance holds shared data for the instance in a shared_ptr. One of
129// the pieces of shared data is the future that we use to sequence parallel
130// dispatches. When a parallel task is launched, a copy of the closure
131// (ParallelFor, ParallelReduce, etc.) is captured in the task. The closure
132// also holds the policy, the policy holds the HPX instance, the instance holds
133// the shared data (for use of buffers in the parallel task). When attaching a
134// continuation to a future, the continuation is stored in the future (shared
135// state). This means that there is a cycle future -> continuation -> closure
136// -> policy -> HPX -> shared data -> future. We break this by releasing the
137// shared data early, as (the pointer to) the shared data will not be used
138// anymore by the closure at the end of execute_task.
139//
140// We also mark the shared instance data as mutable so that we can reset it
141// from the const execute_task member function.
142
143namespace Kokkos {
144namespace Impl {
// Per-thread scratch storage: one cache-line-aligned segment per thread, so
// that threads writing to their own segments do not false-share cache lines.
class thread_buffer {
  static constexpr std::size_t m_cache_line_size = 64;

  std::size_t m_num_threads;
  std::size_t m_size_per_thread;
  std::size_t m_size_total;
  char *m_data;

  // Round size up to the next multiple of the cache line size.
  void pad_to_cache_line(std::size_t &size) {
    size = ((size + m_cache_line_size - 1) / m_cache_line_size) *
           m_cache_line_size;
  }

 public:
  thread_buffer()
      : m_num_threads(0),
        m_size_per_thread(0),
        m_size_total(0),
        m_data(nullptr) {}
  // Delegate to the default constructor so that m_size_total and m_data are
  // initialized before resize() reads m_size_total and delete[]s m_data.
  // (Previously they were left indeterminate, making resize() UB here.)
  thread_buffer(const std::size_t num_threads,
                const std::size_t size_per_thread)
      : thread_buffer() {
    resize(num_threads, size_per_thread);
  }
  ~thread_buffer() { delete[] m_data; }

  thread_buffer(const thread_buffer &) = delete;
  thread_buffer(thread_buffer &&) = delete;
  thread_buffer &operator=(const thread_buffer &) = delete;
  // Was "operator=(thread_buffer)": a by-value parameter needs the (deleted)
  // copy constructor, so that overload was unusable. Delete move assignment.
  thread_buffer &operator=(thread_buffer &&) = delete;

  // (Re)allocate num_threads segments of at least size_per_thread bytes each
  // (padded to the cache line size). The allocation only ever grows; a
  // smaller request reuses the existing storage. Contents are not preserved.
  void resize(const std::size_t num_threads,
              const std::size_t size_per_thread) {
    m_num_threads = num_threads;
    m_size_per_thread = size_per_thread;

    pad_to_cache_line(m_size_per_thread);

    std::size_t size_total_new = m_num_threads * m_size_per_thread;

    if (m_size_total < size_total_new) {
      delete[] m_data;
      m_data = new char[size_total_new];
      m_size_total = size_total_new;
    }
  }

  // Pointer to the segment owned by thread_num, or nullptr if no storage has
  // been allocated yet. thread_num must be < the requested thread count.
  char *get(std::size_t thread_num) {
    assert(thread_num < m_num_threads);
    if (m_data == nullptr) {
      return nullptr;
    }
    return &m_data[thread_num * m_size_per_thread];
  }

  std::size_t size_per_thread() const noexcept { return m_size_per_thread; }
  std::size_t size_total() const noexcept { return m_size_total; }
};
202} // namespace Impl
203
204namespace Experimental {
205class HPX {
206 private:
207 static bool m_hpx_initialized;
208 static std::atomic<uint32_t> m_next_instance_id;
209 uint32_t m_instance_id = 0;
210
211#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
212 public:
213 enum class instance_mode { global, independent };
214 instance_mode m_mode;
215
216 private:
217 static std::atomic<uint32_t> m_active_parallel_region_count;
218
219 struct instance_data {
220 instance_data() = default;
221 instance_data(hpx::shared_future<void> future) : m_future(future) {}
222 Kokkos::Impl::thread_buffer m_buffer;
223 hpx::shared_future<void> m_future = hpx::make_ready_future<void>();
224 };
225
226 mutable std::shared_ptr<instance_data> m_independent_instance_data;
227 static instance_data m_global_instance_data;
228
229 std::reference_wrapper<Kokkos::Impl::thread_buffer> m_buffer;
230 std::reference_wrapper<hpx::shared_future<void>> m_future;
231#else
232 static Kokkos::Impl::thread_buffer m_global_buffer;
233#endif
234
235 public:
236 using execution_space = HPX;
237 using memory_space = HostSpace;
238 using device_type = Kokkos::Device<execution_space, memory_space>;
239 using array_layout = LayoutRight;
240 using size_type = memory_space::size_type;
241 using scratch_memory_space = ScratchMemorySpace<HPX>;
242
243#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
244 HPX()
245 noexcept
246 : m_instance_id(0),
247 m_mode(instance_mode::global),
248 m_buffer(m_global_instance_data.m_buffer),
249 m_future(m_global_instance_data.m_future) {}
250
251 HPX(instance_mode mode)
252 : m_instance_id(mode == instance_mode::independent ? m_next_instance_id++
253 : 0),
254 m_mode(mode),
255 m_independent_instance_data(mode == instance_mode::independent
256 ? (new instance_data())
257 : nullptr),
258 m_buffer(mode == instance_mode::independent
259 ? m_independent_instance_data->m_buffer
260 : m_global_instance_data.m_buffer),
261 m_future(mode == instance_mode::independent
262 ? m_independent_instance_data->m_future
263 : m_global_instance_data.m_future) {}
264
265 HPX(hpx::shared_future<void> future)
266 : m_instance_id(m_next_instance_id++),
267 m_mode(instance_mode::independent),
268
269 m_independent_instance_data(new instance_data(future)),
270 m_buffer(m_independent_instance_data->m_buffer),
271 m_future(m_independent_instance_data->m_future) {}
272
273 HPX(const HPX &other)
274 : m_instance_id(other.m_instance_id),
275 m_mode(other.m_mode),
276 m_independent_instance_data(other.m_independent_instance_data),
277 m_buffer(other.m_buffer),
278 m_future(other.m_future) {}
279
280 HPX &operator=(const HPX &other) {
281 m_instance_id =
282 other.m_mode == instance_mode::independent ? m_next_instance_id++ : 0;
283 m_mode = other.m_mode;
284 m_independent_instance_data = other.m_independent_instance_data;
285 m_buffer = m_mode == instance_mode::independent
286 ? m_independent_instance_data->m_buffer
287 : m_global_instance_data.m_buffer;
288 m_future = m_mode == instance_mode::independent
289 ? m_independent_instance_data->m_future
290 : m_global_instance_data.m_future;
291 return *this;
292 }
293#else
294 HPX() noexcept {}
295#endif
296
297 static void print_configuration(std::ostream &,
298 const bool /* verbose */ = false) {
299 std::cout << "HPX backend" << std::endl;
300 }
301 uint32_t impl_instance_id() const noexcept { return m_instance_id; }
302
303#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
304 static bool in_parallel(HPX const &instance = HPX()) noexcept {
305 return !instance.impl_get_future().is_ready();
306 }
307#else
308 static bool in_parallel(HPX const & = HPX()) noexcept { return false; }
309#endif
310
311#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
312 static void impl_decrement_active_parallel_region_count() {
313 --m_active_parallel_region_count;
314 }
315
316 static void impl_increment_active_parallel_region_count() {
317 ++m_active_parallel_region_count;
318 }
319
320 void impl_fence_instance() const {
321 if (hpx::threads::get_self_ptr() == nullptr) {
322 hpx::threads::run_as_hpx_thread([this]() { impl_get_future().wait(); });
323 } else {
324 impl_get_future().wait();
325 }
326 }
327
328 void impl_fence_all_instances() const {
329 hpx::util::yield_while(
330 []() { return m_active_parallel_region_count.load() != 0; });
331 }
332#endif
333
334 void fence() const {
335#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
336 if (m_mode == instance_mode::global) {
337 impl_fence_all_instances();
338 } else {
339 impl_fence_instance();
340 }
341#endif
342 }
343
344 static bool is_asynchronous(HPX const & = HPX()) noexcept {
345#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
346 return true;
347#else
348 return false;
349#endif
350 }
351
352 static std::vector<HPX> partition(...) {
353 Kokkos::abort(
354 "Kokkos::Experimental::HPX::partition_master: can't partition an HPX "
355 "instance\n");
356 return std::vector<HPX>();
357 }
358
359 template <typename F>
360 static void partition_master(F const &, int requested_num_partitions = 0,
361 int = 0) {
362 if (requested_num_partitions > 1) {
363 Kokkos::abort(
364 "Kokkos::Experimental::HPX::partition_master: can't partition an "
365 "HPX instance\n");
366 }
367 }
368
369 static int concurrency();
370 static void impl_initialize(int thread_count);
371 static void impl_initialize();
372 static bool impl_is_initialized() noexcept;
373 static void impl_finalize();
374
375 static int impl_thread_pool_size() noexcept {
376 hpx::runtime *rt = hpx::get_runtime_ptr();
377 if (rt == nullptr) {
378 return 0;
379 } else {
380 if (hpx::threads::get_self_ptr() == nullptr) {
381 return hpx::resource::get_thread_pool(0).get_os_thread_count();
382 } else {
383 return hpx::this_thread::get_pool()->get_os_thread_count();
384 }
385 }
386 }
387
388 static int impl_thread_pool_rank() noexcept {
389 hpx::runtime *rt = hpx::get_runtime_ptr();
390 if (rt == nullptr) {
391 return 0;
392 } else {
393 if (hpx::threads::get_self_ptr() == nullptr) {
394 return 0;
395 } else {
396 return hpx::this_thread::get_pool()->get_pool_index();
397 }
398 }
399 }
400
401 static int impl_thread_pool_size(int depth) {
402 if (depth == 0) {
403 return impl_thread_pool_size();
404 } else {
405 return 1;
406 }
407 }
408
409 static int impl_max_hardware_threads() noexcept {
410 return hpx::threads::hardware_concurrency();
411 }
412
413 static int impl_hardware_thread_id() noexcept {
414 return hpx::get_worker_thread_num();
415 }
416
417 Kokkos::Impl::thread_buffer &impl_get_buffer() const noexcept {
418#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
419 return m_buffer.get();
420#else
421 return m_global_buffer;
422#endif
423 }
424
425#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
426 hpx::shared_future<void> &impl_get_future() const noexcept {
427 return m_future;
428 }
429#endif
430
431#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
432 struct KOKKOS_ATTRIBUTE_NODISCARD reset_on_exit_parallel {
433 HPX const &m_space;
434 reset_on_exit_parallel(HPX const &space) : m_space(space) {}
435 ~reset_on_exit_parallel() {
436 // See [note 1] for an explanation. m_independent_instance_data is
437 // marked mutable.
438 m_space.m_independent_instance_data.reset();
439
440 HPX::impl_decrement_active_parallel_region_count();
441 }
442 };
443#endif
444
445 static constexpr const char *name() noexcept { return "HPX"; }
446};
447} // namespace Experimental
448
449namespace Tools {
450namespace Experimental {
// Identifies the HPX backend to the Kokkos Tools (profiling/tuning)
// interface.
template <>
struct DeviceTypeTraits<Kokkos::Experimental::HPX> {
  static constexpr DeviceType id = DeviceType::HPX;
};
455} // namespace Experimental
456} // namespace Tools
457
458namespace Impl {
459
// Hooks the HPX backend into Kokkos' global initialize/finalize machinery.
// The member functions are declared here and defined in the backend's
// translation unit.
class HPXSpaceInitializer : public ExecSpaceInitializerBase {
 public:
  HPXSpaceInitializer() = default;
  ~HPXSpaceInitializer() = default;
  void initialize(const InitArguments &args) final;
  void finalize(const bool) final;
  void fence() final;
  void print_configuration(std::ostream &msg, const bool detail) final;
};
469
470#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
471template <typename Closure>
472inline void dispatch_execute_task(Closure *closure,
473 Kokkos::Experimental::HPX const &instance,
474 bool force_synchronous = false) {
475 Kokkos::Experimental::HPX::impl_increment_active_parallel_region_count();
476
477 if (hpx::threads::get_self_ptr() == nullptr) {
478 hpx::threads::run_as_hpx_thread([closure, &instance]() {
479 hpx::shared_future<void> &fut = instance.impl_get_future();
480 Closure closure_copy = *closure;
481 fut = fut.then([closure_copy](hpx::shared_future<void> &&) {
482 closure_copy.execute_task();
483 });
484 });
485 } else {
486 hpx::shared_future<void> &fut = instance.impl_get_future();
487 Closure closure_copy = *closure;
488 fut = fut.then([closure_copy](hpx::shared_future<void> &&) {
489 closure_copy.execute_task();
490 });
491 }
492
493 if (force_synchronous) {
494 instance.fence();
495 }
496}
497#else
498template <typename Closure>
499inline void dispatch_execute_task(Closure *closure,
500 Kokkos::Experimental::HPX const &,
501 bool = false) {
502#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
503 Kokkos::Experimental::HPX::impl_increment_active_parallel_region_count();
504#endif
505
506 if (hpx::threads::get_self_ptr() == nullptr) {
507 hpx::threads::run_as_hpx_thread([closure]() { closure->execute_task(); });
508 } else {
509 closure->execute_task();
510 }
511}
512#endif
513} // namespace Impl
514} // namespace Kokkos
515
516namespace Kokkos {
517namespace Impl {
// HostSpace memory is accessible through the HPX scratch memory space, but
// views cannot be assigned or deep-copied across the two spaces.
template <>
struct MemorySpaceAccess<Kokkos::Experimental::HPX::memory_space,
                         Kokkos::Experimental::HPX::scratch_memory_space> {
  enum : bool { assignable = false };
  enum : bool { accessible = true };
  enum : bool { deepcopy = false };
};
525
526} // namespace Impl
527} // namespace Kokkos
528
529namespace Kokkos {
530namespace Experimental {
531template <>
532class UniqueToken<HPX, UniqueTokenScope::Instance> {
533 private:
535 int m_count;
536 buffer_type m_buffer_view;
537 uint32_t volatile *m_buffer;
538
539 public:
540 using execution_space = HPX;
541 using size_type = int;
542
546 UniqueToken(execution_space const & = execution_space()) noexcept
547 : m_count(execution_space::impl_max_hardware_threads()),
548 m_buffer_view(buffer_type()),
549 m_buffer(nullptr) {}
550
551 UniqueToken(size_type max_size, execution_space const & = execution_space())
552 : m_count(max_size > execution_space::impl_max_hardware_threads()
553 ? execution_space::impl_max_hardware_threads()
554 : max_size),
555 m_buffer_view(
556 max_size > execution_space::impl_max_hardware_threads()
557 ? buffer_type()
558 : buffer_type("UniqueToken::m_buffer_view",
559 ::Kokkos::Impl::concurrent_bitset::buffer_bound(
560 m_count))),
561 m_buffer(m_buffer_view.data()) {}
562
564 KOKKOS_INLINE_FUNCTION
565 int size() const noexcept { return m_count; }
566
568 KOKKOS_INLINE_FUNCTION
569 int acquire() const noexcept {
570#if defined(KOKKOS_ACTIVE_EXECUTION_MEMORY_SPACE_HOST)
571 if (m_buffer == nullptr) {
572 return execution_space::impl_hardware_thread_id();
573 } else {
574 const ::Kokkos::pair<int, int> result =
575 ::Kokkos::Impl::concurrent_bitset::acquire_bounded(
576 m_buffer, m_count, ::Kokkos::Impl::clock_tic() % m_count);
577
578 if (result.first < 0) {
579 ::Kokkos::abort(
580 "UniqueToken<HPX> failure to acquire tokens, no tokens "
581 "available");
582 }
583 return result.first;
584 }
585#else
586 return 0;
587#endif
588 }
589
591 KOKKOS_INLINE_FUNCTION
592 void release(int i) const noexcept {
593#if defined(KOKKOS_ACTIVE_EXECUTION_MEMORY_SPACE_HOST)
594 if (m_buffer != nullptr) {
595 ::Kokkos::Impl::concurrent_bitset::release(m_buffer, i);
596 }
597#else
598 (void)i;
599#endif
600 }
601};
602
// Globally-scoped unique tokens: tokens are hardware thread ids, so no
// bookkeeping is required and release is a no-op.
template <>
class UniqueToken<HPX, UniqueTokenScope::Global> {
 public:
  using execution_space = HPX;
  using size_type = int;
  UniqueToken(execution_space const & = execution_space()) noexcept {}

  // NOTE: Currently this assumes that there is no oversubscription.
  // hpx::get_num_worker_threads can't be used directly because it may yield
  // its task (problematic if called after hpx::get_worker_thread_num).
  int size() const noexcept { return HPX::impl_max_hardware_threads(); }
  int acquire() const noexcept { return HPX::impl_hardware_thread_id(); }
  void release(int) const noexcept {}
};
617} // namespace Experimental
618} // namespace Kokkos
619
620namespace Kokkos {
621namespace Impl {
622
623struct HPXTeamMember {
624 public:
625 using execution_space = Kokkos::Experimental::HPX;
626 using scratch_memory_space =
628
629 private:
630 scratch_memory_space m_team_shared;
631
632 int m_league_size;
633 int m_league_rank;
634 int m_team_size;
635 int m_team_rank;
636
637 public:
638 KOKKOS_INLINE_FUNCTION
639 const scratch_memory_space &team_shmem() const {
640 return m_team_shared.set_team_thread_mode(0, 1, 0);
641 }
642
643 KOKKOS_INLINE_FUNCTION
644 const execution_space::scratch_memory_space &team_scratch(const int) const {
645 return m_team_shared.set_team_thread_mode(0, 1, 0);
646 }
647
648 KOKKOS_INLINE_FUNCTION
649 const execution_space::scratch_memory_space &thread_scratch(const int) const {
650 return m_team_shared.set_team_thread_mode(0, team_size(), team_rank());
651 }
652
653 KOKKOS_INLINE_FUNCTION int league_rank() const noexcept {
654 return m_league_rank;
655 }
656
657 KOKKOS_INLINE_FUNCTION int league_size() const noexcept {
658 return m_league_size;
659 }
660
661 KOKKOS_INLINE_FUNCTION int team_rank() const noexcept { return m_team_rank; }
662 KOKKOS_INLINE_FUNCTION int team_size() const noexcept { return m_team_size; }
663
664 template <class... Properties>
665 constexpr KOKKOS_INLINE_FUNCTION HPXTeamMember(
666 const TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>
667 &policy,
668 const int team_rank, const int league_rank, void *scratch,
669 int scratch_size) noexcept
670 : m_team_shared(scratch, scratch_size, scratch, scratch_size),
671 m_league_size(policy.league_size()),
672 m_league_rank(league_rank),
673 m_team_size(policy.team_size()),
674 m_team_rank(team_rank) {}
675
676 KOKKOS_INLINE_FUNCTION
677 void team_barrier() const {}
678
679 template <class ValueType>
680 KOKKOS_INLINE_FUNCTION void team_broadcast(ValueType &, const int &) const {
681 static_assert(std::is_trivially_default_constructible<ValueType>(),
682 "Only trivial constructible types can be broadcasted");
683 }
684
685 template <class Closure, class ValueType>
686 KOKKOS_INLINE_FUNCTION void team_broadcast(const Closure &, ValueType &,
687 const int &) const {
688 static_assert(std::is_trivially_default_constructible<ValueType>(),
689 "Only trivial constructible types can be broadcasted");
690 }
691
692 template <class ValueType, class JoinOp>
693 KOKKOS_INLINE_FUNCTION ValueType team_reduce(const ValueType &value,
694 const JoinOp &) const {
695 return value;
696 }
697
698 template <class ReducerType>
699 KOKKOS_INLINE_FUNCTION
700 typename std::enable_if<is_reducer<ReducerType>::value>::type
701 team_reduce(const ReducerType &) const {}
702
703 template <typename Type>
704 KOKKOS_INLINE_FUNCTION Type
705 team_scan(const Type &value, Type *const global_accum = nullptr) const {
706 if (global_accum) {
707 Kokkos::atomic_fetch_add(global_accum, value);
708 }
709
710 return 0;
711 }
712};
713
714template <class... Properties>
715class TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>
716 : public PolicyTraits<Properties...> {
717 using traits = PolicyTraits<Properties...>;
718
719 int m_league_size;
720 int m_team_size;
721 std::size_t m_team_scratch_size[2];
722 std::size_t m_thread_scratch_size[2];
723 int m_chunk_size;
724
725 public:
727 using execution_policy = TeamPolicyInternal;
728
729 using member_type = HPXTeamMember;
730
732 using execution_space = Kokkos::Experimental::HPX;
733
734 // NOTE: Max size is 1 for simplicity. In most cases more than 1 is not
735 // necessary on CPU. Implement later if there is a need.
736 template <class FunctorType>
737 inline static int team_size_max(const FunctorType &) {
738 return 1;
739 }
740
741 template <class FunctorType>
742 inline static int team_size_recommended(const FunctorType &) {
743 return 1;
744 }
745
746 template <class FunctorType>
747 inline static int team_size_recommended(const FunctorType &, const int &) {
748 return 1;
749 }
750
751 template <class FunctorType>
752 int team_size_max(const FunctorType &, const ParallelForTag &) const {
753 return 1;
754 }
755
756 template <class FunctorType>
757 int team_size_max(const FunctorType &, const ParallelReduceTag &) const {
758 return 1;
759 }
760
761 template <class FunctorType, class ReducerType>
762 int team_size_max(const FunctorType &, const ReducerType &,
763 const ParallelReduceTag &) const {
764 return 1;
765 }
766
767 template <class FunctorType>
768 int team_size_recommended(const FunctorType &, const ParallelForTag &) const {
769 return 1;
770 }
771
772 template <class FunctorType>
773 int team_size_recommended(const FunctorType &,
774 const ParallelReduceTag &) const {
775 return 1;
776 }
777
778 template <class FunctorType, class ReducerType>
779 int team_size_recommended(const FunctorType &, const ReducerType &,
780 const ParallelReduceTag &) const {
781 return 1;
782 }
783
784 static int vector_length_max() { return 1; }
785
786 inline int impl_vector_length() noexcept { return 1; }
787 inline bool impl_auto_team_size() noexcept { return false; }
788 inline bool impl_auto_vector_length() noexcept { return false; }
789 inline void impl_set_vector_length(int) noexcept {}
790 inline void impl_set_team_size(int) noexcept {}
791
792 private:
793 inline void init(const int league_size_request, const int team_size_request) {
794 m_league_size = league_size_request;
795 const int max_team_size = 1; // TODO: Can't use team_size_max(...) because
796 // it requires a functor as argument.
797 m_team_size =
798 team_size_request > max_team_size ? max_team_size : team_size_request;
799
800 if (m_chunk_size > 0) {
801 if (!Impl::is_integral_power_of_two(m_chunk_size))
802 Kokkos::abort("TeamPolicy blocking granularity must be power of two");
803 } else {
804 int new_chunk_size = 1;
805 while (new_chunk_size * 4 * Kokkos::Experimental::HPX::concurrency() <
806 m_league_size) {
807 new_chunk_size *= 2;
808 }
809
810 if (new_chunk_size < 128) {
811 new_chunk_size = 1;
812 while ((new_chunk_size * Kokkos::Experimental::HPX::concurrency() <
813 m_league_size) &&
814 (new_chunk_size < 128))
815 new_chunk_size *= 2;
816 }
817
818 m_chunk_size = new_chunk_size;
819 }
820 }
821
822 public:
823 inline int team_size() const { return m_team_size; }
824 inline int league_size() const { return m_league_size; }
825
826 inline size_t scratch_size(const int &level, int team_size_ = -1) const {
827 if (team_size_ < 0) {
828 team_size_ = m_team_size;
829 }
830 return m_team_scratch_size[level] +
831 team_size_ * m_thread_scratch_size[level];
832 }
833
834 inline static int scratch_size_max(int level) {
835 return (level == 0 ? 1024 * 32 : // Roughly L1 size
836 20 * 1024 * 1024); // Limit to keep compatibility with CUDA
837 }
838
839 public:
840 template <class ExecSpace, class... OtherProperties>
841 friend class TeamPolicyInternal;
842
843 const typename traits::execution_space &space() const {
844 static typename traits::execution_space m_space;
845 return m_space;
846 }
847
848 template <class... OtherProperties>
849 TeamPolicyInternal(const TeamPolicyInternal<Kokkos::Experimental::HPX,
850 OtherProperties...> &p) {
851 m_league_size = p.m_league_size;
852 m_team_size = p.m_team_size;
853 m_team_scratch_size[0] = p.m_team_scratch_size[0];
854 m_thread_scratch_size[0] = p.m_thread_scratch_size[0];
855 m_team_scratch_size[1] = p.m_team_scratch_size[1];
856 m_thread_scratch_size[1] = p.m_thread_scratch_size[1];
857 m_chunk_size = p.m_chunk_size;
858 }
859
860 TeamPolicyInternal(const typename traits::execution_space &,
861 int league_size_request, int team_size_request,
862 int /* vector_length_request */ = 1)
863 : m_team_scratch_size{0, 0},
864 m_thread_scratch_size{0, 0},
865 m_chunk_size(0) {
866 init(league_size_request, team_size_request);
867 }
868
869 TeamPolicyInternal(const typename traits::execution_space &,
870 int league_size_request, const Kokkos::AUTO_t &,
871 int /* vector_length_request */ = 1)
872 : m_team_scratch_size{0, 0},
873 m_thread_scratch_size{0, 0},
874 m_chunk_size(0) {
875 init(league_size_request, 1);
876 }
877
878 TeamPolicyInternal(const typename traits::execution_space &space,
879 int league_size_request,
880 const Kokkos::AUTO_t &, /* team_size_request */
881 const Kokkos::AUTO_t & /* vector_length_request */)
882 : m_team_scratch_size{0, 0},
883 m_thread_scratch_size{0, 0},
884 m_chunk_size(0) {
885 init(league_size_request, 1);
886 }
887
888 TeamPolicyInternal(const typename traits::execution_space &space,
889 int league_size_request, int team_size_request,
890 const Kokkos::AUTO_t & /* vector_length_request */
891 )
892 : m_team_scratch_size{0, 0},
893 m_thread_scratch_size{0, 0},
894 m_chunk_size(0) {
895 init(league_size_request, team_size_request);
896 }
897
898 TeamPolicyInternal(int league_size_request,
899 const Kokkos::AUTO_t &, /* team_size_request */
900 const Kokkos::AUTO_t & /* vector_length_request */)
901 : m_team_scratch_size{0, 0},
902 m_thread_scratch_size{0, 0},
903 m_chunk_size(0) {
904 init(league_size_request, 1);
905 }
906
907 TeamPolicyInternal(int league_size_request, int team_size_request,
908 const Kokkos::AUTO_t & /* vector_length_request */
909 )
910 : m_team_scratch_size{0, 0},
911 m_thread_scratch_size{0, 0},
912 m_chunk_size(0) {
913 init(league_size_request, team_size_request);
914 }
915
916 TeamPolicyInternal(int league_size_request, int team_size_request,
917 int /* vector_length_request */ = 1)
918 : m_team_scratch_size{0, 0},
919 m_thread_scratch_size{0, 0},
920 m_chunk_size(0) {
921 init(league_size_request, team_size_request);
922 }
923
924 TeamPolicyInternal(int league_size_request, const Kokkos::AUTO_t &,
925 int /* vector_length_request */ = 1)
926 : m_team_scratch_size{0, 0},
927 m_thread_scratch_size{0, 0},
928 m_chunk_size(0) {
929 init(league_size_request, 1);
930 }
931
932 inline int chunk_size() const { return m_chunk_size; }
933
934 inline TeamPolicyInternal &set_chunk_size(
935 typename traits::index_type chunk_size_) {
936 m_chunk_size = chunk_size_;
937 return *this;
938 }
939
940 inline TeamPolicyInternal &set_scratch_size(const int &level,
941 const PerTeamValue &per_team) {
942 m_team_scratch_size[level] = per_team.value;
943 return *this;
944 }
945
946 inline TeamPolicyInternal &set_scratch_size(
947 const int &level, const PerThreadValue &per_thread) {
948 m_thread_scratch_size[level] = per_thread.value;
949 return *this;
950 }
951
952 inline TeamPolicyInternal &set_scratch_size(
953 const int &level, const PerTeamValue &per_team,
954 const PerThreadValue &per_thread) {
955 m_team_scratch_size[level] = per_team.value;
956 m_thread_scratch_size[level] = per_thread.value;
957 return *this;
958 }
959};
960} // namespace Impl
961} // namespace Kokkos
962
963namespace Kokkos {
964namespace Impl {
965
// Kokkos::parallel_for specialization for a 1D RangePolicy on the HPX
// backend. The KOKKOS_HPX_IMPLEMENTATION macro selects one of three
// dispatch strategies: (0) hpx::parallel::for_loop over the whole range,
// (1) one explicitly spawned HPX task per chunk synchronized with a latch,
// (2) a strided for_loop where each iteration processes one chunk.
template <class FunctorType, class... Traits>
class ParallelFor<FunctorType, Kokkos::RangePolicy<Traits...>,
                  Kokkos::Experimental::HPX> {
 private:
  using Policy    = Kokkos::RangePolicy<Traits...>;
  using WorkTag   = typename Policy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member    = typename Policy::member_type;

  const FunctorType m_functor;
  const Policy m_policy;

  // Invoke the functor for a single index (untagged specialization).
  template <class TagType>
  static typename std::enable_if<std::is_same<TagType, void>::value>::type
  execute_functor(const FunctorType &functor, const Member i) {
    functor(i);
  }

  // Invoke the functor for a single index, passing a default-constructed
  // work tag first (tagged specialization).
  template <class TagType>
  static typename std::enable_if<!std::is_same<TagType, void>::value>::type
  execute_functor(const FunctorType &functor, const Member i) {
    const TagType t{};
    functor(t, i);
  }

  // Invoke the functor for every index in the half-open range
  // [i_begin, i_end) (untagged specialization).
  template <class TagType>
  static typename std::enable_if<std::is_same<TagType, void>::value>::type
  execute_functor_range(const FunctorType &functor, const Member i_begin,
                        const Member i_end) {
    for (Member i = i_begin; i < i_end; ++i) {
      functor(i);
    }
  }

  // Invoke the functor for [i_begin, i_end) with the work tag prepended
  // (tagged specialization).
  template <class TagType>
  static typename std::enable_if<!std::is_same<TagType, void>::value>::type
  execute_functor_range(const FunctorType &functor, const Member i_begin,
                        const Member i_end) {
    const TagType t{};
    for (Member i = i_begin; i < i_end; ++i) {
      functor(t, i);
    }
  }

 public:
  // Entry point called by Kokkos::parallel_for; may dispatch the work
  // asynchronously depending on the execution-space instance.
  void execute() const {
    Kokkos::Impl::dispatch_execute_task(this, m_policy.space());
  }

  // Body of the parallel region, executed inside the HPX runtime.
  void execute_task() const {
    // See [note 1] for an explanation.
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());
#endif

#if KOKKOS_HPX_IMPLEMENTATION == 0
    // Strategy 0: let HPX's for_loop partition the range, using the
    // policy's chunk size as the static chunking hint.
    using hpx::parallel::for_loop;
    using hpx::parallel::execution::par;
    using hpx::parallel::execution::static_chunk_size;

    for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
             m_policy.begin(), m_policy.end(), [this](const Member i) {
               execute_functor<WorkTag>(m_functor, i);
             });

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    // Strategy 1: spawn one task per chunk; each task counts down a latch
    // so completion can be awaited below.
    using hpx::apply;
    using hpx::lcos::local::latch;

    // Ceiling division: number of chunk_size-sized pieces in the range.
    const int num_tasks =
        (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
        m_policy.chunk_size();
    latch num_tasks_remaining(num_tasks);
    ChunkedRoundRobinExecutor exec(num_tasks);

    for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
         i_begin += m_policy.chunk_size()) {
      // i_begin is captured by value: the loop mutates it before the
      // spawned task may run.
      apply(exec, [this, &num_tasks_remaining, i_begin]() {
        const Member i_end =
            (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
        execute_functor_range<WorkTag>(m_functor, i_begin, i_end);

        num_tasks_remaining.count_down(1);
      });
    }

    num_tasks_remaining.wait();

#elif KOKKOS_HPX_IMPLEMENTATION == 2
    // Strategy 2: strided for_loop; each iteration handles the chunk
    // starting at i_begin.
    using hpx::parallel::for_loop_strided;
    using hpx::parallel::execution::par;
    using hpx::parallel::execution::static_chunk_size;

    const int num_tasks =
        (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
        m_policy.chunk_size();
    ChunkedRoundRobinExecutor exec(num_tasks);

    for_loop_strided(
        par.on(exec).with(static_chunk_size(1)), m_policy.begin(),
        m_policy.end(), m_policy.chunk_size(), [this](const Member i_begin) {
          const Member i_end =
              (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
          execute_functor_range<WorkTag>(m_functor, i_begin, i_end);
        });
#endif
  }

  inline ParallelFor(const FunctorType &arg_functor, Policy arg_policy)
      : m_functor(arg_functor), m_policy(arg_policy) {}
};
1078
// Kokkos::parallel_for specialization for MDRangePolicy on the HPX backend.
// The multi-dimensional tile space is flattened to a 1D policy over tile
// indices [0, m_num_tiles); HostIterateTile expands each flat tile index
// back into the tile's multi-dimensional iteration.
template <class FunctorType, class... Traits>
class ParallelFor<FunctorType, Kokkos::MDRangePolicy<Traits...>,
                  Kokkos::Experimental::HPX> {
 private:
  using MDRangePolicy = Kokkos::MDRangePolicy<Traits...>;
  using Policy        = typename MDRangePolicy::impl_range_policy;
  using WorkTag       = typename MDRangePolicy::work_tag;
  using WorkRange     = typename Policy::WorkRange;
  using Member        = typename Policy::member_type;
  using iterate_type =
      typename Kokkos::Impl::HostIterateTile<MDRangePolicy, FunctorType,
                                             WorkTag, void>;

  const FunctorType m_functor;
  const MDRangePolicy m_mdr_policy;
  // 1D range over tile indices, constructed with chunk size 1.
  const Policy m_policy;

 public:
  // Entry point called by Kokkos::parallel_for; may dispatch asynchronously.
  void execute() const { dispatch_execute_task(this, m_mdr_policy.space()); }

  // Body of the parallel region; same three dispatch strategies as the
  // RangePolicy specialization, iterating over flat tile indices.
  inline void execute_task() const {
    // See [note 1] for an explanation.
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_mdr_policy.space());
#endif

#if KOKKOS_HPX_IMPLEMENTATION == 0
    using hpx::parallel::for_loop;
    using hpx::parallel::execution::par;
    using hpx::parallel::execution::static_chunk_size;

    for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
             m_policy.begin(), m_policy.end(), [this](const Member i) {
               iterate_type(m_mdr_policy, m_functor)(i);
             });

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    using hpx::apply;
    using hpx::lcos::local::latch;

    // Ceiling division: number of chunk_size-sized pieces of the tile range.
    const int num_tasks =
        (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
        m_policy.chunk_size();
    latch num_tasks_remaining(num_tasks);
    ChunkedRoundRobinExecutor exec(num_tasks);

    for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
         i_begin += m_policy.chunk_size()) {
      // i_begin captured by value: the loop mutates it before the task runs.
      apply(exec, [this, &num_tasks_remaining, i_begin]() {
        const Member i_end =
            (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
        for (Member i = i_begin; i < i_end; ++i) {
          iterate_type(m_mdr_policy, m_functor)(i);
        }

        num_tasks_remaining.count_down(1);
      });
    }

    num_tasks_remaining.wait();

#elif KOKKOS_HPX_IMPLEMENTATION == 2
    using hpx::parallel::for_loop_strided;
    using hpx::parallel::execution::par;
    using hpx::parallel::execution::static_chunk_size;

    const int num_tasks =
        (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
        m_policy.chunk_size();
    ChunkedRoundRobinExecutor exec(num_tasks);

    for_loop_strided(
        par.on(exec).with(static_chunk_size(1)), m_policy.begin(),
        m_policy.end(), m_policy.chunk_size(), [this](const Member i_begin) {
          const Member i_end =
              (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
          for (Member i = i_begin; i < i_end; ++i) {
            iterate_type(m_mdr_policy, m_functor)(i);
          }
        });
#endif
  }

  inline ParallelFor(const FunctorType &arg_functor, MDRangePolicy arg_policy)
      : m_functor(arg_functor),
        m_mdr_policy(arg_policy),
        m_policy(Policy(0, m_mdr_policy.m_num_tiles).set_chunk_size(1)) {}
  // Hard-coded upper bound on the product of tile extents used by the
  // MDRangePolicy tile-size heuristics.
  template <typename Policy, typename Functor>
  static int max_tile_size_product(const Policy &, const Functor &) {
    return 1024;
  }
};
1177} // namespace Impl
1178} // namespace Kokkos
1179
1180namespace Kokkos {
1181namespace Impl {
// Kokkos::parallel_reduce specialization for a 1D RangePolicy on the HPX
// backend. Strategy 0 uses HPX's reduction() helper with a type-erased
// value wrapper; strategies 1 and 2 keep one reduction value per worker
// thread in the execution space's shared buffer and join them serially at
// the end.
template <class FunctorType, class ReducerType, class... Traits>
class ParallelReduce<FunctorType, Kokkos::RangePolicy<Traits...>, ReducerType,
                     Kokkos::Experimental::HPX> {
 private:
  using Policy    = Kokkos::RangePolicy<Traits...>;
  using WorkTag   = typename Policy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member    = typename Policy::member_type;
  using Analysis =
      FunctorAnalysis<FunctorPatternInterface::REDUCE, Policy, FunctorType>;
  // Reduction semantics come from the reducer if one was given, otherwise
  // from the functor itself.
  using ReducerConditional =
      Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
                         FunctorType, ReducerType>;
  using ReducerTypeFwd = typename ReducerConditional::type;
  // The work tag is only forwarded when reducing through the functor.
  using WorkTagFwd =
      typename Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
                                  WorkTag, void>::type;
  using ValueInit  = Kokkos::Impl::FunctorValueInit<ReducerTypeFwd, WorkTagFwd>;
  using ValueFinal = Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>;
  using ValueJoin  = Kokkos::Impl::FunctorValueJoin<ReducerTypeFwd, WorkTagFwd>;
  using ValueOps   = Kokkos::Impl::FunctorValueOps<ReducerTypeFwd, WorkTagFwd>;
  using value_type     = typename Analysis::value_type;
  using pointer_type   = typename Analysis::pointer_type;
  using reference_type = typename Analysis::reference_type;

  const FunctorType m_functor;
  const Policy m_policy;
  const ReducerType m_reducer;
  const pointer_type m_result_ptr;

  // When true, dispatch blocks until the reduction has completed (set when
  // the result view has no tracking record — see the constructors).
  bool m_force_synchronous;

  // Invoke the functor for a single index, accumulating into update
  // (untagged specialization).
  template <class TagType>
  inline static
      typename std::enable_if<std::is_same<TagType, void>::value>::type
      execute_functor(const FunctorType &functor, const Member i,
                      reference_type update) {
    functor(i, update);
  }

  // Tagged variant of the single-index invocation.
  template <class TagType>
  inline static
      typename std::enable_if<!std::is_same<TagType, void>::value>::type
      execute_functor(const FunctorType &functor, const Member i,
                      reference_type update) {
    const TagType t{};
    functor(t, i, update);
  }

  // Accumulate over [i_begin, i_end) into update (untagged specialization).
  template <class TagType>
  inline typename std::enable_if<std::is_same<TagType, void>::value>::type
  execute_functor_range(reference_type update, const Member i_begin,
                        const Member i_end) const {
    for (Member i = i_begin; i < i_end; ++i) {
      m_functor(i, update);
    }
  }

  // Tagged variant of the range accumulation.
  template <class TagType>
  inline typename std::enable_if<!std::is_same<TagType, void>::value>::type
  execute_functor_range(reference_type update, const Member i_begin,
                        const Member i_end) const {
    const TagType t{};

    for (Member i = i_begin; i < i_end; ++i) {
      m_functor(t, i, update);
    }
  }

  // Type-erased, heap-allocated holder for one reduction value whose size
  // is only known at run time (value_size bytes). Needed because HPX's
  // reduction() helper copies/moves the accumulator by value.
  class value_type_wrapper {
   private:
    std::size_t m_value_size;
    char *m_value_buffer;

   public:
    value_type_wrapper() : m_value_size(0), m_value_buffer(nullptr) {}

    value_type_wrapper(const std::size_t value_size)
        : m_value_size(value_size), m_value_buffer(new char[m_value_size]) {}

    // Deep copy of the other wrapper's buffer.
    value_type_wrapper(const value_type_wrapper &other)
        : m_value_size(0), m_value_buffer(nullptr) {
      if (this != &other) {
        m_value_buffer = new char[other.m_value_size];
        m_value_size   = other.m_value_size;

        std::copy(other.m_value_buffer, other.m_value_buffer + m_value_size,
                  m_value_buffer);
      }
    }

    ~value_type_wrapper() { delete[] m_value_buffer; }

    // Steal the other wrapper's buffer, leaving it empty.
    value_type_wrapper(value_type_wrapper &&other)
        : m_value_size(0), m_value_buffer(nullptr) {
      if (this != &other) {
        m_value_buffer = other.m_value_buffer;
        m_value_size   = other.m_value_size;

        other.m_value_buffer = nullptr;
        other.m_value_size   = 0;
      }
    }

    value_type_wrapper &operator=(const value_type_wrapper &other) {
      if (this != &other) {
        delete[] m_value_buffer;
        m_value_buffer = new char[other.m_value_size];
        m_value_size   = other.m_value_size;

        std::copy(other.m_value_buffer, other.m_value_buffer + m_value_size,
                  m_value_buffer);
      }

      return *this;
    }

    value_type_wrapper &operator=(value_type_wrapper &&other) {
      if (this != &other) {
        delete[] m_value_buffer;
        m_value_buffer = other.m_value_buffer;
        m_value_size   = other.m_value_size;

        other.m_value_buffer = nullptr;
        other.m_value_size   = 0;
      }

      return *this;
    }

    pointer_type pointer() const {
      return reinterpret_cast<pointer_type>(m_value_buffer);
    }

    reference_type reference() const {
      return ValueOps::reference(
          reinterpret_cast<pointer_type>(m_value_buffer));
    }
  };

 public:
  void execute() const {
    // An empty range still has to produce a well-defined result: run init
    // and final on the output and skip dispatch entirely.
    if (m_policy.end() <= m_policy.begin()) {
      if (m_result_ptr) {
        ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
                        m_result_ptr);
        ValueFinal::final(ReducerConditional::select(m_functor, m_reducer),
                          m_result_ptr);
      }
      return;
    }
    dispatch_execute_task(this, m_policy.space(), m_force_synchronous);
  }

  inline void execute_task() const {
    // See [note 1] for an explanation.
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());
#endif

    // Size in bytes of one reduction value (may be a runtime quantity for
    // array reductions).
    const std::size_t value_size =
        Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));

#if KOKKOS_HPX_IMPLEMENTATION == 0
    // NOTE: This version makes the most use of HPX functionality, but
    // requires the struct value_type_wrapper to handle different
    // reference_types. It is also significantly slower than the version
    // below due to not reusing the buffer used by other functions.
    using hpx::parallel::for_loop;
    using hpx::parallel::reduction;
    using hpx::parallel::execution::par;
    using hpx::parallel::execution::static_chunk_size;

    value_type_wrapper final_value(value_size);
    value_type_wrapper identity(value_size);

    ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
                    final_value.pointer());
    ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
                    identity.pointer());

    for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
             m_policy.begin(), m_policy.end(),
             reduction(final_value, identity,
                       [this](value_type_wrapper &a,
                              value_type_wrapper &b) -> value_type_wrapper & {
                         ValueJoin::join(
                             ReducerConditional::select(m_functor, m_reducer),
                             a.pointer(), b.pointer());
                         return a;
                       }),
             [this](Member i, value_type_wrapper &update) {
               execute_functor<WorkTag>(m_functor, i, update.reference());
             });

    pointer_type final_value_ptr = final_value.pointer();

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    // One reduction slot per worker thread in the shared scratch buffer.
    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();

    thread_buffer &buffer = m_policy.space().impl_get_buffer();
    buffer.resize(num_worker_threads, value_size);

    using hpx::apply;
    using hpx::lcos::local::latch;

    // Phase 1: initialize every thread's slot to the reduction identity.
    {
      latch num_tasks_remaining(num_worker_threads);
      ChunkedRoundRobinExecutor exec(num_worker_threads);

      for (int t = 0; t < num_worker_threads; ++t) {
        apply(exec, [this, &num_tasks_remaining, &buffer, t]() {
          ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
                          reinterpret_cast<pointer_type>(buffer.get(t)));

          num_tasks_remaining.count_down(1);
        });
      }

      num_tasks_remaining.wait();
    }

    // Phase 2: one task per chunk, accumulating into the slot of whichever
    // worker thread runs the task.
    const int num_tasks =
        (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
        m_policy.chunk_size();
    latch num_tasks_remaining(num_tasks);
    ChunkedRoundRobinExecutor exec(num_tasks);

    for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
         i_begin += m_policy.chunk_size()) {
      apply(exec, [this, &num_tasks_remaining, &buffer, i_begin]() {
        reference_type update =
            ValueOps::reference(reinterpret_cast<pointer_type>(buffer.get(
                Kokkos::Experimental::HPX::impl_hardware_thread_id())));
        const Member i_end =
            (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
        execute_functor_range<WorkTag>(update, i_begin, i_end);

        num_tasks_remaining.count_down(1);
      });
    }

    num_tasks_remaining.wait();

    // Phase 3: serially join every thread's partial result into slot 0.
    for (int i = 1; i < num_worker_threads; ++i) {
      ValueJoin::join(ReducerConditional::select(m_functor, m_reducer),
                      reinterpret_cast<pointer_type>(buffer.get(0)),
                      reinterpret_cast<pointer_type>(buffer.get(i)));
    }

    pointer_type final_value_ptr =
        reinterpret_cast<pointer_type>(buffer.get(0));

#elif KOKKOS_HPX_IMPLEMENTATION == 2
    // Same three phases as implementation 1, expressed with for_loop /
    // for_loop_strided instead of explicit tasks and latches.
    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();

    thread_buffer &buffer = m_policy.space().impl_get_buffer();
    buffer.resize(num_worker_threads, value_size);

    using hpx::parallel::for_loop;
    using hpx::parallel::for_loop_strided;
    using hpx::parallel::execution::par;
    using hpx::parallel::execution::static_chunk_size;

    {
      ChunkedRoundRobinExecutor exec(num_worker_threads);

      for_loop(par.on(exec).with(static_chunk_size(1)), std::size_t(0),
               num_worker_threads, [this, &buffer](const std::size_t t) {
                 ValueInit::init(
                     ReducerConditional::select(m_functor, m_reducer),
                     reinterpret_cast<pointer_type>(buffer.get(t)));
               });
    }

    const int num_tasks =
        (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
        m_policy.chunk_size();
    ChunkedRoundRobinExecutor exec(num_tasks);

    for_loop_strided(
        par.on(exec).with(static_chunk_size(1)), m_policy.begin(),
        m_policy.end(), m_policy.chunk_size(),
        [this, &buffer](const Member i_begin) {
          reference_type update =
              ValueOps::reference(reinterpret_cast<pointer_type>(buffer.get(
                  Kokkos::Experimental::HPX::impl_hardware_thread_id())));
          const Member i_end =
              (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
          execute_functor_range<WorkTag>(update, i_begin, i_end);
        });

    for (int i = 1; i < num_worker_threads; ++i) {
      ValueJoin::join(ReducerConditional::select(m_functor, m_reducer),
                      reinterpret_cast<pointer_type>(buffer.get(0)),
                      reinterpret_cast<pointer_type>(buffer.get(i)));
    }

    pointer_type final_value_ptr =
        reinterpret_cast<pointer_type>(buffer.get(0));
#endif

    // Apply the functor/reducer's final() and copy the result out.
    Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>::final(
        ReducerConditional::select(m_functor, m_reducer), final_value_ptr);

    if (m_result_ptr != nullptr) {
      const int n = Analysis::value_count(
          ReducerConditional::select(m_functor, m_reducer));

      for (int j = 0; j < n; ++j) {
        m_result_ptr[j] = final_value_ptr[j];
      }
    }
  }

  // Constructor for the "result in a View" form (no explicit reducer).
  // NOTE(review): an untracked result view (no allocation record, e.g. an
  // unmanaged wrapper of a scalar) forces synchronous execution —
  // presumably because its lifetime cannot be tied to the async dispatch;
  // confirm against dispatch_execute_task.
  template <class ViewType>
  inline ParallelReduce(
      const FunctorType &arg_functor, Policy arg_policy,
      const ViewType &arg_view,
      typename std::enable_if<Kokkos::is_view<ViewType>::value &&
                                  !Kokkos::is_reducer_type<ReducerType>::value,
                              void *>::type = nullptr)
      : m_functor(arg_functor),
        m_policy(arg_policy),
        m_reducer(InvalidType()),
        m_result_ptr(arg_view.data()),
        m_force_synchronous(!arg_view.impl_track().has_record()) {}

  // Constructor for the explicit-reducer form.
  inline ParallelReduce(const FunctorType &arg_functor, Policy arg_policy,
                        const ReducerType &reducer)
      : m_functor(arg_functor),
        m_policy(arg_policy),
        m_reducer(reducer),
        m_result_ptr(reducer.view().data()),
        m_force_synchronous(!reducer.view().impl_track().has_record()) {}
};
1519
// Kokkos::parallel_reduce specialization for MDRangePolicy on the HPX
// backend. The tile space is flattened to a 1D policy over tile indices;
// each worker thread accumulates into its own slot of the shared buffer and
// the partial results are joined serially at the end.
template <class FunctorType, class ReducerType, class... Traits>
class ParallelReduce<FunctorType, Kokkos::MDRangePolicy<Traits...>, ReducerType,
                     Kokkos::Experimental::HPX> {
 private:
  using MDRangePolicy = Kokkos::MDRangePolicy<Traits...>;
  using Policy        = typename MDRangePolicy::impl_range_policy;
  using WorkTag       = typename MDRangePolicy::work_tag;
  using WorkRange     = typename Policy::WorkRange;
  using Member        = typename Policy::member_type;
  using Analysis      = FunctorAnalysis<FunctorPatternInterface::REDUCE,
                                   MDRangePolicy, FunctorType>;
  // Reduction semantics come from the reducer if one was given, otherwise
  // from the functor itself.
  using ReducerConditional =
      Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
                         FunctorType, ReducerType>;
  using ReducerTypeFwd = typename ReducerConditional::type;
  // The work tag is only forwarded when reducing through the functor.
  using WorkTagFwd =
      typename Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
                                  WorkTag, void>::type;
  using ValueInit  = Kokkos::Impl::FunctorValueInit<ReducerTypeFwd, WorkTagFwd>;
  using ValueFinal = Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>;
  using ValueJoin  = Kokkos::Impl::FunctorValueJoin<ReducerTypeFwd, WorkTagFwd>;
  using ValueOps   = Kokkos::Impl::FunctorValueOps<ReducerTypeFwd, WorkTagFwd>;
  using pointer_type   = typename Analysis::pointer_type;
  using value_type     = typename Analysis::value_type;
  using reference_type = typename Analysis::reference_type;
  using iterate_type =
      typename Kokkos::Impl::HostIterateTile<MDRangePolicy, FunctorType,
                                             WorkTag, reference_type>;

  const FunctorType m_functor;
  const MDRangePolicy m_mdr_policy;
  // 1D range over tile indices, constructed with chunk size 1.
  const Policy m_policy;
  const ReducerType m_reducer;
  const pointer_type m_result_ptr;

  // When true, dispatch blocks until the reduction has completed (set when
  // the result view has no tracking record — see the constructors).
  bool m_force_synchronous;

 public:
  void execute() const {
    dispatch_execute_task(this, m_mdr_policy.space(), m_force_synchronous);
  }

  inline void execute_task() const {
    // See [note 1] for an explanation.
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_mdr_policy.space());
#endif

    // One reduction slot of value_size bytes per worker thread.
    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
    const std::size_t value_size =
        Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));

    thread_buffer &buffer = m_mdr_policy.space().impl_get_buffer();
    buffer.resize(num_worker_threads, value_size);

#if KOKKOS_HPX_IMPLEMENTATION == 0
    // Initialize each slot, then accumulate tile-by-tile; each iteration
    // updates the slot of the worker thread it runs on.
    using hpx::parallel::for_loop;
    using hpx::parallel::execution::par;
    using hpx::parallel::execution::static_chunk_size;

    for_loop(par, 0, num_worker_threads, [this, &buffer](std::size_t t) {
      ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
                      reinterpret_cast<pointer_type>(buffer.get(t)));
    });

    for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
             m_policy.begin(), m_policy.end(), [this, &buffer](const Member i) {
               reference_type update = ValueOps::reference(
                   reinterpret_cast<pointer_type>(buffer.get(
                       Kokkos::Experimental::HPX::impl_hardware_thread_id())));
               iterate_type(m_mdr_policy, m_functor, update)(i);
             });

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    using hpx::apply;
    using hpx::lcos::local::latch;

    // Phase 1: initialize every thread's slot to the reduction identity.
    {
      latch num_tasks_remaining(num_worker_threads);
      ChunkedRoundRobinExecutor exec(num_worker_threads);

      for (int t = 0; t < num_worker_threads; ++t) {
        apply(exec, [this, &buffer, &num_tasks_remaining, t]() {
          ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
                          reinterpret_cast<pointer_type>(buffer.get(t)));

          num_tasks_remaining.count_down(1);
        });
      }

      num_tasks_remaining.wait();
    }

    // Phase 2: one task per chunk of tile indices.
    const int num_tasks =
        (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
        m_policy.chunk_size();
    latch num_tasks_remaining(num_tasks);
    ChunkedRoundRobinExecutor exec(num_tasks);

    for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
         i_begin += m_policy.chunk_size()) {
      apply(exec, [this, &num_tasks_remaining, &buffer, i_begin]() {
        reference_type update =
            ValueOps::reference(reinterpret_cast<pointer_type>(buffer.get(
                Kokkos::Experimental::HPX::impl_hardware_thread_id())));
        const Member i_end =
            (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());

        for (Member i = i_begin; i < i_end; ++i) {
          iterate_type(m_mdr_policy, m_functor, update)(i);
        }

        num_tasks_remaining.count_down(1);
      });
    }

    num_tasks_remaining.wait();

#elif KOKKOS_HPX_IMPLEMENTATION == 2
    // Same phases as implementation 1 expressed via for_loop /
    // for_loop_strided.
    using hpx::parallel::for_loop;
    using hpx::parallel::for_loop_strided;
    using hpx::parallel::execution::par;
    using hpx::parallel::execution::static_chunk_size;

    {
      ChunkedRoundRobinExecutor exec(num_worker_threads);

      for_loop(par.on(exec).with(static_chunk_size(1)), std::size_t(0),
               num_worker_threads, [this, &buffer](const std::size_t t) {
                 ValueInit::init(
                     ReducerConditional::select(m_functor, m_reducer),
                     reinterpret_cast<pointer_type>(buffer.get(t)));
               });
    }

    const int num_tasks =
        (m_policy.end() - m_policy.begin() + m_policy.chunk_size() - 1) /
        m_policy.chunk_size();
    ChunkedRoundRobinExecutor exec(num_tasks);

    for_loop_strided(
        par.on(exec).with(static_chunk_size(1)), m_policy.begin(),
        m_policy.end(), m_policy.chunk_size(),
        [this, &buffer](const Member i_begin) {
          reference_type update =
              ValueOps::reference(reinterpret_cast<pointer_type>(buffer.get(
                  Kokkos::Experimental::HPX::impl_hardware_thread_id())));
          const Member i_end =
              (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());

          for (Member i = i_begin; i < i_end; ++i) {
            iterate_type(m_mdr_policy, m_functor, update)(i);
          }
        });
#endif

    // Join all per-thread partial results into slot 0, finalize, and copy
    // the result out.
    for (int i = 1; i < num_worker_threads; ++i) {
      ValueJoin::join(ReducerConditional::select(m_functor, m_reducer),
                      reinterpret_cast<pointer_type>(buffer.get(0)),
                      reinterpret_cast<pointer_type>(buffer.get(i)));
    }

    Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>::final(
        ReducerConditional::select(m_functor, m_reducer),
        reinterpret_cast<pointer_type>(buffer.get(0)));

    if (m_result_ptr != nullptr) {
      const int n = Analysis::value_count(
          ReducerConditional::select(m_functor, m_reducer));

      for (int j = 0; j < n; ++j) {
        m_result_ptr[j] = reinterpret_cast<pointer_type>(buffer.get(0))[j];
      }
    }
  }

  // Constructor for the "result in a View" form (no explicit reducer).
  // NOTE(review): an untracked result view forces synchronous execution —
  // presumably because its lifetime cannot be tied to the async dispatch;
  // confirm against dispatch_execute_task.
  template <class ViewType>
  inline ParallelReduce(
      const FunctorType &arg_functor, MDRangePolicy arg_policy,
      const ViewType &arg_view,
      typename std::enable_if<Kokkos::is_view<ViewType>::value &&
                                  !Kokkos::is_reducer_type<ReducerType>::value,
                              void *>::type = nullptr)
      : m_functor(arg_functor),
        m_mdr_policy(arg_policy),
        m_policy(Policy(0, m_mdr_policy.m_num_tiles).set_chunk_size(1)),
        m_reducer(InvalidType()),
        m_result_ptr(arg_view.data()),
        m_force_synchronous(!arg_view.impl_track().has_record()) {}

  // Constructor for the explicit-reducer form.
  inline ParallelReduce(const FunctorType &arg_functor,
                        MDRangePolicy arg_policy, const ReducerType &reducer)
      : m_functor(arg_functor),
        m_mdr_policy(arg_policy),
        m_policy(Policy(0, m_mdr_policy.m_num_tiles).set_chunk_size(1)),
        m_reducer(reducer),
        m_result_ptr(reducer.view().data()),
        m_force_synchronous(!reducer.view().impl_track().has_record()) {}
  // Hard-coded upper bound on the product of tile extents used by the
  // MDRangePolicy tile-size heuristics.
  template <typename Policy, typename Functor>
  static int max_tile_size_product(const Policy &, const Functor &) {
    return 1024;
  }
};
1729} // namespace Impl
1730} // namespace Kokkos
1731
1732namespace Kokkos {
1733namespace Impl {
1734
// Kokkos::parallel_scan specialization for a 1D RangePolicy on the HPX
// backend. Classic two-pass parallel scan: each worker thread first
// computes the partial sum of its contiguous sub-range (pass 1), thread 0
// then converts the partial sums into exclusive base offsets, and every
// thread re-runs its sub-range seeded with its base offset (pass 2,
// final == true). Two barriers separate the phases.
template <class FunctorType, class... Traits>
class ParallelScan<FunctorType, Kokkos::RangePolicy<Traits...>,
                   Kokkos::Experimental::HPX> {
 private:
  using Policy    = Kokkos::RangePolicy<Traits...>;
  using WorkTag   = typename Policy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member    = typename Policy::member_type;
  using Analysis =
      FunctorAnalysis<FunctorPatternInterface::SCAN, Policy, FunctorType>;
  using ValueInit = Kokkos::Impl::FunctorValueInit<FunctorType, WorkTag>;
  using ValueJoin = Kokkos::Impl::FunctorValueJoin<FunctorType, WorkTag>;
  using ValueOps  = Kokkos::Impl::FunctorValueOps<FunctorType, WorkTag>;
  using pointer_type   = typename Analysis::pointer_type;
  using reference_type = typename Analysis::reference_type;
  using value_type     = typename Analysis::value_type;

  const FunctorType m_functor;
  const Policy m_policy;

  // Invoke the scan functor over [i_begin, i_end); `final` distinguishes
  // the accumulation pass from the result-writing pass (untagged
  // specialization).
  template <class TagType>
  inline static
      typename std::enable_if<std::is_same<TagType, void>::value>::type
      execute_functor_range(const FunctorType &functor, const Member i_begin,
                            const Member i_end, reference_type update,
                            const bool final) {
    for (Member i = i_begin; i < i_end; ++i) {
      functor(i, update, final);
    }
  }

  // Tagged variant of the range invocation.
  template <class TagType>
  inline static
      typename std::enable_if<!std::is_same<TagType, void>::value>::type
      execute_functor_range(const FunctorType &functor, const Member i_begin,
                            const Member i_end, reference_type update,
                            const bool final) {
    const TagType t{};
    for (Member i = i_begin; i < i_end; ++i) {
      functor(t, i, update, final);
    }
  }

 public:
  // Entry point called by Kokkos::parallel_scan; may dispatch
  // asynchronously.
  void execute() const { dispatch_execute_task(this, m_policy.space()); }

  inline void execute_task() const {
    // See [note 1] for an explanation.
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());
#endif

    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
    const int value_count        = Analysis::value_count(m_functor);
    const std::size_t value_size = Analysis::value_size(m_functor);

    // Each thread gets two value slots: [0, value_size) holds its pass-1
    // partial sum, [value_size, 2*value_size) its exclusive base offset.
    thread_buffer &buffer = m_policy.space().impl_get_buffer();
    buffer.resize(num_worker_threads, 2 * value_size);

    using hpx::apply;
    using hpx::lcos::local::barrier;
    using hpx::lcos::local::latch;

    barrier bar(num_worker_threads);
    latch num_tasks_remaining(num_worker_threads);
    ChunkedRoundRobinExecutor exec(num_worker_threads);

    for (int t = 0; t < num_worker_threads; ++t) {
      apply(exec, [this, &bar, &buffer, &num_tasks_remaining,
                   num_worker_threads, value_count, value_size, t]() {
        // Pass 1: partial sum of this thread's sub-range (final == false).
        reference_type update_sum = ValueInit::init(
            m_functor, reinterpret_cast<pointer_type>(buffer.get(t)));

        const WorkRange range(m_policy, t, num_worker_threads);
        execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
                                       update_sum, false);

        // Wait until every thread's partial sum is available.
        bar.wait();

        if (t == 0) {
          // Serial prefix over the partial sums: thread i's base offset is
          // the join of all partial sums of threads < i (exclusive scan).
          ValueInit::init(m_functor, reinterpret_cast<pointer_type>(
                                         buffer.get(0) + value_size));

          for (int i = 1; i < num_worker_threads; ++i) {
            pointer_type ptr_1_prev =
                reinterpret_cast<pointer_type>(buffer.get(i - 1));
            pointer_type ptr_2_prev =
                reinterpret_cast<pointer_type>(buffer.get(i - 1) + value_size);
            pointer_type ptr_2 =
                reinterpret_cast<pointer_type>(buffer.get(i) + value_size);

            for (int j = 0; j < value_count; ++j) {
              ptr_2[j] = ptr_2_prev[j];
            }

            ValueJoin::join(m_functor, ptr_2, ptr_1_prev);
          }
        }

        // Wait until thread 0 has published all base offsets.
        bar.wait();

        // Pass 2: re-run the sub-range seeded with this thread's base
        // offset, now letting the functor write results (final == true).
        reference_type update_base = ValueOps::reference(
            reinterpret_cast<pointer_type>(buffer.get(t) + value_size));

        execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
                                       update_base, true);

        num_tasks_remaining.count_down(1);
      });
    }

    num_tasks_remaining.wait();
  }

  inline ParallelScan(const FunctorType &arg_functor, const Policy &arg_policy)
      : m_functor(arg_functor), m_policy(arg_policy) {}
};
1853
1854template <class FunctorType, class ReturnType, class... Traits>
1855class ParallelScanWithTotal<FunctorType, Kokkos::RangePolicy<Traits...>,
1856 ReturnType, Kokkos::Experimental::HPX> {
1857 private:
1858 using Policy = Kokkos::RangePolicy<Traits...>;
1859 using WorkTag = typename Policy::work_tag;
1860 using WorkRange = typename Policy::WorkRange;
1861 using Member = typename Policy::member_type;
1862 using Analysis =
1863 FunctorAnalysis<FunctorPatternInterface::SCAN, Policy, FunctorType>;
1864 using ValueInit = Kokkos::Impl::FunctorValueInit<FunctorType, WorkTag>;
1865 using ValueJoin = Kokkos::Impl::FunctorValueJoin<FunctorType, WorkTag>;
1866 using ValueOps = Kokkos::Impl::FunctorValueOps<FunctorType, WorkTag>;
1867 using pointer_type = typename Analysis::pointer_type;
1868 using reference_type = typename Analysis::reference_type;
1869 using value_type = typename Analysis::value_type;
1870
1871 const FunctorType m_functor;
1872 const Policy m_policy;
1873 ReturnType &m_returnvalue;
1874
1875 template <class TagType>
1876 inline static
1877 typename std::enable_if<std::is_same<TagType, void>::value>::type
1878 execute_functor_range(const FunctorType &functor, const Member i_begin,
1879 const Member i_end, reference_type update,
1880 const bool final) {
1881 for (Member i = i_begin; i < i_end; ++i) {
1882 functor(i, update, final);
1883 }
1884 }
1885
1886 template <class TagType>
1887 inline static
1888 typename std::enable_if<!std::is_same<TagType, void>::value>::type
1889 execute_functor_range(const FunctorType &functor, const Member i_begin,
1890 const Member i_end, reference_type update,
1891 const bool final) {
1892 const TagType t{};
1893 for (Member i = i_begin; i < i_end; ++i) {
1894 functor(t, i, update, final);
1895 }
1896 }
1897
1898 public:
1899 void execute() const { dispatch_execute_task(this, m_policy.space()); }
1900
  // Two-pass barrier-synchronized scan (same scheme as ParallelScan):
  // pass 1 computes per-thread partial sums, thread 0 turns them into
  // exclusive base offsets, pass 2 re-runs each sub-range seeded with its
  // base offset and writes results. Additionally, the last thread stores
  // the grand total into m_returnvalue.
  inline void execute_task() const {
    // See [note 1] for an explanation.
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());
#endif

    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
    const int value_count        = Analysis::value_count(m_functor);
    const std::size_t value_size = Analysis::value_size(m_functor);

    // Two slots per thread: partial sum at offset 0, base offset at
    // offset value_size.
    thread_buffer &buffer = m_policy.space().impl_get_buffer();
    buffer.resize(num_worker_threads, 2 * value_size);

    using hpx::apply;
    using hpx::lcos::local::barrier;
    using hpx::lcos::local::latch;

    barrier bar(num_worker_threads);
    latch num_tasks_remaining(num_worker_threads);
    ChunkedRoundRobinExecutor exec(num_worker_threads);

    for (int t = 0; t < num_worker_threads; ++t) {
      apply(exec, [this, &bar, &buffer, &num_tasks_remaining,
                   num_worker_threads, value_count, value_size, t]() {
        // Pass 1: partial sum of this thread's sub-range (final == false).
        reference_type update_sum = ValueInit::init(
            m_functor, reinterpret_cast<pointer_type>(buffer.get(t)));

        const WorkRange range(m_policy, t, num_worker_threads);
        execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
                                       update_sum, false);

        // Wait until every thread's partial sum is available.
        bar.wait();

        if (t == 0) {
          // Serial exclusive scan over the partial sums to produce each
          // thread's base offset.
          ValueInit::init(m_functor, reinterpret_cast<pointer_type>(
                                         buffer.get(0) + value_size));

          for (int i = 1; i < num_worker_threads; ++i) {
            pointer_type ptr_1_prev =
                reinterpret_cast<pointer_type>(buffer.get(i - 1));
            pointer_type ptr_2_prev =
                reinterpret_cast<pointer_type>(buffer.get(i - 1) + value_size);
            pointer_type ptr_2 =
                reinterpret_cast<pointer_type>(buffer.get(i) + value_size);

            for (int j = 0; j < value_count; ++j) {
              ptr_2[j] = ptr_2_prev[j];
            }

            ValueJoin::join(m_functor, ptr_2, ptr_1_prev);
          }
        }

        // Wait until thread 0 has published all base offsets.
        bar.wait();

        // Pass 2: seeded re-run with final == true (results are written).
        reference_type update_base = ValueOps::reference(
            reinterpret_cast<pointer_type>(buffer.get(t) + value_size));

        execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
                                       update_base, true);

        // The last thread's seeded accumulator ends as the total of the
        // whole range; report it to the caller.
        if (t == num_worker_threads - 1) {
          m_returnvalue = update_base;
        }

        num_tasks_remaining.count_down(1);
      });
    }

    num_tasks_remaining.wait();
  }
1973
  // Capture the functor, the policy and the caller's total-result slot;
  // no work happens until execute() is called.
  inline ParallelScanWithTotal(const FunctorType &arg_functor,
                               const Policy &arg_policy,
                               ReturnType &arg_returnvalue)
      : m_functor(arg_functor),
        m_policy(arg_policy),
        m_returnvalue(arg_returnvalue) {}
1980};
1981} // namespace Impl
1982} // namespace Kokkos
1983
1984namespace Kokkos {
1985namespace Impl {
// ParallelFor specialization for Kokkos::TeamPolicy on the HPX backend.
// Each league rank is executed through a team member constructed with team
// rank 0; per-concurrent-team scratch (m_shared bytes) comes from the
// execution space's thread_buffer, indexed by the hardware thread id.
// The dispatch strategy is compile-time selected via KOKKOS_HPX_IMPLEMENTATION.
template <class FunctorType, class... Properties>
class ParallelFor<FunctorType, Kokkos::TeamPolicy<Properties...>,
                  Kokkos::Experimental::HPX> {
 private:
  using Policy = TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>;
  using WorkTag = typename Policy::work_tag;
  using Member = typename Policy::member_type;
  using memory_space = Kokkos::HostSpace;

  const FunctorType m_functor;
  const Policy m_policy;
  // NOTE(review): league size captured at construction but not read in the
  // code visible here — confirm whether it can be removed.
  const int m_league;
  // Total team-scratch bytes required per concurrent team (levels 0 + 1
  // plus the functor's declared shmem size).
  const std::size_t m_shared;

  // Run the functor for one league rank (untagged variant).
  template <class TagType>
  inline static
      typename std::enable_if<std::is_same<TagType, void>::value>::type
      execute_functor(const FunctorType &functor, const Policy &policy,
                      const int league_rank, char *local_buffer,
                      const std::size_t local_buffer_size) {
    functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size));
  }

  // Tagged variant: a default-constructed work tag is passed first.
  template <class TagType>
  inline static
      typename std::enable_if<!std::is_same<TagType, void>::value>::type
      execute_functor(const FunctorType &functor, const Policy &policy,
                      const int league_rank, char *local_buffer,
                      const std::size_t local_buffer_size) {
    const TagType t{};
    functor(t, Member(policy, 0, league_rank, local_buffer, local_buffer_size));
  }

  // Run the functor for league ranks [league_rank_begin, league_rank_end)
  // (untagged variant); used by the chunked dispatch strategies.
  template <class TagType>
  inline static
      typename std::enable_if<std::is_same<TagType, void>::value>::type
      execute_functor_range(const FunctorType &functor, const Policy &policy,
                            const int league_rank_begin,
                            const int league_rank_end, char *local_buffer,
                            const std::size_t local_buffer_size) {
    for (int league_rank = league_rank_begin; league_rank < league_rank_end;
         ++league_rank) {
      functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size));
    }
  }

  // Tagged range variant.
  template <class TagType>
  inline static
      typename std::enable_if<!std::is_same<TagType, void>::value>::type
      execute_functor_range(const FunctorType &functor, const Policy &policy,
                            const int league_rank_begin,
                            const int league_rank_end, char *local_buffer,
                            const std::size_t local_buffer_size) {
    const TagType t{};
    for (int league_rank = league_rank_begin; league_rank < league_rank_end;
         ++league_rank) {
      functor(t,
              Member(policy, 0, league_rank, local_buffer, local_buffer_size));
    }
  }

 public:
  // Entry point: hands execute_task() to the dispatch layer.
  void execute() const { dispatch_execute_task(this, m_policy.space()); }

  inline void execute_task() const {
    // See [note 1] for an explanation.
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());
#endif

    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();

    // One m_shared-byte scratch slot per worker thread.
    thread_buffer &buffer = m_policy.space().impl_get_buffer();
    buffer.resize(num_worker_threads, m_shared);

#if KOKKOS_HPX_IMPLEMENTATION == 0
    // Strategy 0: one parallel for_loop over league ranks, chunked by the
    // policy's chunk size.
    using hpx::parallel::for_loop;
    using hpx::parallel::execution::par;
    using hpx::parallel::execution::static_chunk_size;

    for_loop(
        par.with(static_chunk_size(m_policy.chunk_size())), 0,
        m_policy.league_size(), [this, &buffer](const int league_rank) {
          execute_functor<WorkTag>(
              m_functor, m_policy, league_rank,
              buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()),
              m_shared);
        });

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    // Strategy 1: spawn one task per chunk of league ranks; a latch counts
    // the tasks down to provide completion.
    using hpx::apply;
    using hpx::lcos::local::latch;

    const int num_tasks = (m_policy.league_size() + m_policy.chunk_size() - 1) /
                          m_policy.chunk_size();
    latch num_tasks_remaining(num_tasks);
    ChunkedRoundRobinExecutor exec(num_tasks);

    for (int league_rank_begin = 0; league_rank_begin < m_policy.league_size();
         league_rank_begin += m_policy.chunk_size()) {
      apply(exec, [this, &buffer, &num_tasks_remaining, league_rank_begin]() {
        const int league_rank_end = (std::min)(
            league_rank_begin + m_policy.chunk_size(), m_policy.league_size());
        execute_functor_range<WorkTag>(
            m_functor, m_policy, league_rank_begin, league_rank_end,
            buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()),
            m_shared);

        num_tasks_remaining.count_down(1);
      });
    }

    num_tasks_remaining.wait();

#elif KOKKOS_HPX_IMPLEMENTATION == 2
    // Strategy 2: strided for_loop, one stride (= chunk) per iteration on a
    // round-robin executor.
    using hpx::parallel::for_loop_strided;
    using hpx::parallel::execution::par;
    using hpx::parallel::execution::static_chunk_size;

    const int num_tasks = (m_policy.league_size() + m_policy.chunk_size() - 1) /
                          m_policy.chunk_size();
    ChunkedRoundRobinExecutor exec(num_tasks);

    for_loop_strided(
        par.on(exec).with(static_chunk_size(1)), 0, m_policy.league_size(),
        m_policy.chunk_size(), [this, &buffer](const int league_rank_begin) {
          const int league_rank_end =
              (std::min)(league_rank_begin + m_policy.chunk_size(),
                         m_policy.league_size());
          execute_functor_range<WorkTag>(
              m_functor, m_policy, league_rank_begin, league_rank_end,
              buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()),
              m_shared);
        });
#endif
  }

  // Capture functor/policy and precompute the per-team scratch requirement.
  ParallelFor(const FunctorType &arg_functor, const Policy &arg_policy)
      : m_functor(arg_functor),
        m_policy(arg_policy),
        m_league(arg_policy.league_size()),
        m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
                 FunctorTeamShmemSize<FunctorType>::value(
                     arg_functor, arg_policy.team_size())) {}
};
2132
// ParallelReduce specialization for Kokkos::TeamPolicy on the HPX backend.
// Each worker thread owns one thread_buffer slot laid out as
// [reduction value (value_size bytes) | team scratch (m_shared bytes)];
// per-thread partials are joined and finalized serially after all tasks
// complete, then copied into the caller's result storage.
template <class FunctorType, class ReducerType, class... Properties>
class ParallelReduce<FunctorType, Kokkos::TeamPolicy<Properties...>,
                     ReducerType, Kokkos::Experimental::HPX> {
 private:
  using Policy = TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>;
  using Analysis =
      FunctorAnalysis<FunctorPatternInterface::REDUCE, Policy, FunctorType>;
  using Member = typename Policy::member_type;
  using WorkTag = typename Policy::work_tag;
  // When no explicit reducer is given (ReducerType == InvalidType) the
  // functor itself supplies init/join/final; the work tag is forwarded only
  // in that case.
  using ReducerConditional =
      Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
                         FunctorType, ReducerType>;
  using ReducerTypeFwd = typename ReducerConditional::type;
  using WorkTagFwd =
      typename Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
                                  WorkTag, void>::type;
  using ValueInit = Kokkos::Impl::FunctorValueInit<ReducerTypeFwd, WorkTagFwd>;
  using ValueFinal = Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>;
  using ValueJoin = Kokkos::Impl::FunctorValueJoin<ReducerTypeFwd, WorkTagFwd>;
  using ValueOps = Kokkos::Impl::FunctorValueOps<ReducerTypeFwd, WorkTagFwd>;
  using pointer_type = typename Analysis::pointer_type;
  using reference_type = typename Analysis::reference_type;
  using value_type = typename Analysis::value_type;

  const FunctorType m_functor;
  const int m_league;
  const Policy m_policy;
  const ReducerType m_reducer;
  // Destination for the final reduced value (view data or reducer view).
  pointer_type m_result_ptr;
  // Team-scratch bytes per concurrent team.
  const std::size_t m_shared;

  // NOTE(review): derived from the result's tracking state but not read in
  // the code visible here; presumably consulted by the dispatch layer.
  bool m_force_synchronous;

  // Run the functor for one league rank, accumulating into `update`
  // (untagged variant; team rank is always 0).
  template <class TagType>
  inline static
      typename std::enable_if<std::is_same<TagType, void>::value>::type
      execute_functor(const FunctorType &functor, const Policy &policy,
                      const int league_rank, char *local_buffer,
                      const std::size_t local_buffer_size,
                      reference_type update) {
    functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size),
            update);
  }

  // Tagged variant.
  template <class TagType>
  inline static
      typename std::enable_if<!std::is_same<TagType, void>::value>::type
      execute_functor(const FunctorType &functor, const Policy &policy,
                      const int league_rank, char *local_buffer,
                      const std::size_t local_buffer_size,
                      reference_type update) {
    const TagType t{};
    functor(t, Member(policy, 0, league_rank, local_buffer, local_buffer_size),
            update);
  }

  // Range variant over [league_rank_begin, league_rank_end) (untagged).
  template <class TagType>
  inline static
      typename std::enable_if<std::is_same<TagType, void>::value>::type
      execute_functor_range(const FunctorType &functor, const Policy &policy,
                            const int league_rank_begin,
                            const int league_rank_end, char *local_buffer,
                            const std::size_t local_buffer_size,
                            reference_type update) {
    for (int league_rank = league_rank_begin; league_rank < league_rank_end;
         ++league_rank) {
      functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size),
              update);
    }
  }

  // Tagged range variant.
  template <class TagType>
  inline static
      typename std::enable_if<!std::is_same<TagType, void>::value>::type
      execute_functor_range(const FunctorType &functor, const Policy &policy,
                            const int league_rank_begin,
                            const int league_rank_end, char *local_buffer,
                            const std::size_t local_buffer_size,
                            reference_type update) {
    const TagType t{};
    for (int league_rank = league_rank_begin; league_rank < league_rank_end;
         ++league_rank) {
      functor(t,
              Member(policy, 0, league_rank, local_buffer, local_buffer_size),
              update);
    }
  }

 public:
  // For an empty policy only init/finalize the result; otherwise dispatch
  // execute_task() on the policy's execution space instance.
  void execute() const {
    if (m_policy.league_size() * m_policy.team_size() == 0) {
      if (m_result_ptr) {
        ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
                        m_result_ptr);
        ValueFinal::final(ReducerConditional::select(m_functor, m_reducer),
                          m_result_ptr);
      }
      return;
    }
    dispatch_execute_task(this, m_policy.space());
  }

  inline void execute_task() const {
    // See [note 1] for an explanation.
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());
#endif

    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
    const std::size_t value_size =
        Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));

    // Per-thread slot: reduction value followed by team scratch.
    thread_buffer &buffer = m_policy.space().impl_get_buffer();
    buffer.resize(num_worker_threads, value_size + m_shared);

#if KOKKOS_HPX_IMPLEMENTATION == 0
    // Strategy 0: initialize every thread's partial, then a chunked
    // for_loop over league ranks accumulating into the executing thread's
    // slot.
    using hpx::parallel::for_loop;
    using hpx::parallel::execution::par;

    for_loop(par, 0, num_worker_threads, [this, &buffer](const std::size_t t) {
      ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
                      reinterpret_cast<pointer_type>(buffer.get(t)));
    });

    using hpx::parallel::execution::static_chunk_size;

    hpx::parallel::for_loop(
        par.with(static_chunk_size(m_policy.chunk_size())), 0,
        m_policy.league_size(),
        [this, &buffer, value_size](const int league_rank) {
          std::size_t t = Kokkos::Experimental::HPX::impl_hardware_thread_id();
          reference_type update = ValueOps::reference(
              reinterpret_cast<pointer_type>(buffer.get(t)));

          execute_functor<WorkTag>(m_functor, m_policy, league_rank,
                                   buffer.get(t) + value_size, m_shared,
                                   update);
        });

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    // Strategy 1: latch-counted tasks — first a task per thread to
    // initialize partials, then a task per chunk of league ranks.
    using hpx::apply;
    using hpx::lcos::local::latch;

    {
      latch num_tasks_remaining(num_worker_threads);
      ChunkedRoundRobinExecutor exec(num_worker_threads);

      for (int t = 0; t < num_worker_threads; ++t) {
        apply(exec, [this, &buffer, &num_tasks_remaining, t]() {
          ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
                          reinterpret_cast<pointer_type>(buffer.get(t)));

          num_tasks_remaining.count_down(1);
        });
      }

      num_tasks_remaining.wait();
    }

    const int num_tasks = (m_policy.league_size() + m_policy.chunk_size() - 1) /
                          m_policy.chunk_size();
    latch num_tasks_remaining(num_tasks);
    ChunkedRoundRobinExecutor exec(num_tasks);

    for (int league_rank_begin = 0; league_rank_begin < m_policy.league_size();
         league_rank_begin += m_policy.chunk_size()) {
      apply(exec, [this, &buffer, &num_tasks_remaining, league_rank_begin,
                   value_size]() {
        std::size_t t = Kokkos::Experimental::HPX::impl_hardware_thread_id();
        reference_type update =
            ValueOps::reference(reinterpret_cast<pointer_type>(buffer.get(t)));
        const int league_rank_end = (std::min)(
            league_rank_begin + m_policy.chunk_size(), m_policy.league_size());
        execute_functor_range<WorkTag>(
            m_functor, m_policy, league_rank_begin, league_rank_end,
            buffer.get(t) + value_size, m_shared, update);

        num_tasks_remaining.count_down(1);
      });
    }

    num_tasks_remaining.wait();

#elif KOKKOS_HPX_IMPLEMENTATION == 2
    // Strategy 2: for_loop to initialize partials, then a strided for_loop
    // over chunks of league ranks.
    using hpx::parallel::for_loop;
    using hpx::parallel::for_loop_strided;
    using hpx::parallel::execution::par;
    using hpx::parallel::execution::static_chunk_size;

    {
      ChunkedRoundRobinExecutor exec(num_worker_threads);

      for_loop(par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
               [this, &buffer](std::size_t const t) {
                 ValueInit::init(
                     ReducerConditional::select(m_functor, m_reducer),
                     reinterpret_cast<pointer_type>(buffer.get(t)));
               });
    }

    const int num_tasks = (m_policy.league_size() + m_policy.chunk_size() - 1) /
                          m_policy.chunk_size();
    ChunkedRoundRobinExecutor exec(num_tasks);

    for_loop_strided(
        par.on(exec).with(static_chunk_size(1)), 0, m_policy.league_size(),
        m_policy.chunk_size(),
        [this, &buffer, value_size](int const league_rank_begin) {
          std::size_t t = Kokkos::Experimental::HPX::impl_hardware_thread_id();
          reference_type update = ValueOps::reference(
              reinterpret_cast<pointer_type>(buffer.get(t)));
          const int league_rank_end =
              (std::min)(league_rank_begin + m_policy.chunk_size(),
                         m_policy.league_size());
          execute_functor_range<WorkTag>(
              m_functor, m_policy, league_rank_begin, league_rank_end,
              buffer.get(t) + value_size, m_shared, update);
        });
#endif

    // Serial tree-less join: fold every thread's partial into slot 0.
    const pointer_type ptr = reinterpret_cast<pointer_type>(buffer.get(0));
    for (int t = 1; t < num_worker_threads; ++t) {
      ValueJoin::join(ReducerConditional::select(m_functor, m_reducer), ptr,
                      reinterpret_cast<pointer_type>(buffer.get(t)));
    }

    // Apply the functor/reducer's final() to the joined value.
    Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>::final(
        ReducerConditional::select(m_functor, m_reducer), ptr);

    // Copy the (possibly multi-element) result out to the caller.
    if (m_result_ptr) {
      const int n = Analysis::value_count(
          ReducerConditional::select(m_functor, m_reducer));

      for (int j = 0; j < n; ++j) {
        m_result_ptr[j] = ptr[j];
      }
    }
  }

  // Constructor for the view-result form (no explicit reducer).
  template <class ViewType>
  ParallelReduce(
      const FunctorType &arg_functor, const Policy &arg_policy,
      const ViewType &arg_result,
      typename std::enable_if<Kokkos::is_view<ViewType>::value &&
                                  !Kokkos::is_reducer_type<ReducerType>::value,
                              void *>::type = nullptr)
      : m_functor(arg_functor),
        m_league(arg_policy.league_size()),
        m_policy(arg_policy),
        m_reducer(InvalidType()),
        m_result_ptr(arg_result.data()),
        m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
                 FunctorTeamShmemSize<FunctorType>::value(
                     m_functor, arg_policy.team_size())),
        m_force_synchronous(!arg_result.impl_track().has_record()) {}

  // Constructor for the explicit-reducer form; the result lands in the
  // reducer's view.
  inline ParallelReduce(const FunctorType &arg_functor, Policy arg_policy,
                        const ReducerType &reducer)
      : m_functor(arg_functor),
        m_league(arg_policy.league_size()),
        m_policy(arg_policy),
        m_reducer(reducer),
        m_result_ptr(reducer.view().data()),
        m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
                 FunctorTeamShmemSize<FunctorType>::value(
                     arg_functor, arg_policy.team_size())),
        m_force_synchronous(!reducer.view().impl_track().has_record()) {}
};
2402} // namespace Impl
2403} // namespace Kokkos
2404
2405namespace Kokkos {
2406
2407template <typename iType>
2408KOKKOS_INLINE_FUNCTION
2409 Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2410 TeamThreadRange(const Impl::HPXTeamMember &thread, const iType &count) {
2411 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2412 thread, count);
2413}
2414
2415template <typename iType1, typename iType2>
2416KOKKOS_INLINE_FUNCTION Impl::TeamThreadRangeBoundariesStruct<
2417 typename std::common_type<iType1, iType2>::type, Impl::HPXTeamMember>
2418TeamThreadRange(const Impl::HPXTeamMember &thread, const iType1 &i_begin,
2419 const iType2 &i_end) {
2420 using iType = typename std::common_type<iType1, iType2>::type;
2421 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2422 thread, iType(i_begin), iType(i_end));
2423}
2424
2425template <typename iType>
2426KOKKOS_INLINE_FUNCTION
2427 Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2428 TeamVectorRange(const Impl::HPXTeamMember &thread, const iType &count) {
2429 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2430 thread, count);
2431}
2432
2433template <typename iType1, typename iType2>
2434KOKKOS_INLINE_FUNCTION Impl::TeamThreadRangeBoundariesStruct<
2435 typename std::common_type<iType1, iType2>::type, Impl::HPXTeamMember>
2436TeamVectorRange(const Impl::HPXTeamMember &thread, const iType1 &i_begin,
2437 const iType2 &i_end) {
2438 using iType = typename std::common_type<iType1, iType2>::type;
2439 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2440 thread, iType(i_begin), iType(i_end));
2441}
2442
2443template <typename iType>
2444KOKKOS_INLINE_FUNCTION
2445 Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2446 ThreadVectorRange(const Impl::HPXTeamMember &thread, const iType &count) {
2447 return Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2448 thread, count);
2449}
2450
2451template <typename iType1, typename iType2>
2452KOKKOS_INLINE_FUNCTION Impl::ThreadVectorRangeBoundariesStruct<
2453 typename std::common_type<iType1, iType2>::type, Impl::HPXTeamMember>
2454ThreadVectorRange(const Impl::HPXTeamMember &thread, const iType1 &i_begin,
2455 const iType2 &i_end) {
2456 using iType = typename std::common_type<iType1, iType2>::type;
2457 return Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2458 thread, iType(i_begin), iType(i_end));
2459}
2460
2461KOKKOS_INLINE_FUNCTION
2462Impl::ThreadSingleStruct<Impl::HPXTeamMember> PerTeam(
2463 const Impl::HPXTeamMember &thread) {
2464 return Impl::ThreadSingleStruct<Impl::HPXTeamMember>(thread);
2465}
2466
2467KOKKOS_INLINE_FUNCTION
2468Impl::VectorSingleStruct<Impl::HPXTeamMember> PerThread(
2469 const Impl::HPXTeamMember &thread) {
2470 return Impl::VectorSingleStruct<Impl::HPXTeamMember>(thread);
2471}
2472
2478template <typename iType, class Lambda>
2479KOKKOS_INLINE_FUNCTION void parallel_for(
2480 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2481 &loop_boundaries,
2482 const Lambda &lambda) {
2483 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2484 i += loop_boundaries.increment)
2485 lambda(i);
2486}
2487
2494template <typename iType, class Lambda, typename ValueType>
2495KOKKOS_INLINE_FUNCTION void parallel_reduce(
2496 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2497 &loop_boundaries,
2498 const Lambda &lambda, ValueType &result) {
2499 result = ValueType();
2500 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2501 i += loop_boundaries.increment) {
2502 lambda(i, result);
2503 }
2504}
2505
2511template <typename iType, class Lambda>
2512KOKKOS_INLINE_FUNCTION void parallel_for(
2513 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2514 &loop_boundaries,
2515 const Lambda &lambda) {
2516#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2517#pragma ivdep
2518#endif
2519 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2520 i += loop_boundaries.increment) {
2521 lambda(i);
2522 }
2523}
2524
2531template <typename iType, class Lambda, typename ValueType>
2532KOKKOS_INLINE_FUNCTION void parallel_reduce(
2533 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2534 &loop_boundaries,
2535 const Lambda &lambda, ValueType &result) {
2536 result = ValueType();
2537#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2538#pragma ivdep
2539#endif
2540 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2541 i += loop_boundaries.increment) {
2542 lambda(i, result);
2543 }
2544}
2545
2546template <typename iType, class Lambda, typename ReducerType>
2547KOKKOS_INLINE_FUNCTION void parallel_reduce(
2548 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2549 &loop_boundaries,
2550 const Lambda &lambda, const ReducerType &reducer) {
2551 reducer.init(reducer.reference());
2552 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2553 i += loop_boundaries.increment) {
2554 lambda(i, reducer.reference());
2555 }
2556}
2557
2558template <typename iType, class Lambda, typename ReducerType>
2559KOKKOS_INLINE_FUNCTION void parallel_reduce(
2560 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2561 &loop_boundaries,
2562 const Lambda &lambda, const ReducerType &reducer) {
2563 reducer.init(reducer.reference());
2564#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2565#pragma ivdep
2566#endif
2567 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2568 i += loop_boundaries.increment) {
2569 lambda(i, reducer.reference());
2570 }
2571}
2572
// Inclusive scan over a TeamThreadRange.  Pass 1 accumulates this member's
// local (non-final) contribution; team_scan converts it into an exclusive
// prefix across the team; pass 2 replays the lambda with final == true
// starting from that prefix.
template <typename iType, class FunctorType>
KOKKOS_INLINE_FUNCTION void parallel_scan(
    Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember> const
        &loop_boundaries,
    const FunctorType &lambda) {
  // Deduce the value type from the functor's scan interface.
  using value_type = typename Kokkos::Impl::FunctorAnalysis<
      Kokkos::Impl::FunctorPatternInterface::SCAN, void,
      FunctorType>::value_type;

  value_type scan_val = value_type();

  // Intra-member scan
  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
       i += loop_boundaries.increment) {
    lambda(i, scan_val, false);
  }

  // 'scan_val' output is the exclusive prefix sum
  scan_val = loop_boundaries.thread.team_scan(scan_val);

  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
       i += loop_boundaries.increment) {
    lambda(i, scan_val, true);
  }
}
2598
2610template <typename iType, class FunctorType>
2611KOKKOS_INLINE_FUNCTION void parallel_scan(
2612 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2613 &loop_boundaries,
2614 const FunctorType &lambda) {
2615 using ValueTraits = Kokkos::Impl::FunctorValueTraits<FunctorType, void>;
2616 using value_type = typename ValueTraits::value_type;
2617
2618 value_type scan_val = value_type();
2619
2620#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2621#pragma ivdep
2622#endif
2623 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2624 i += loop_boundaries.increment) {
2625 lambda(i, scan_val, true);
2626 }
2627}
2628
// Scan over a ThreadVectorRange where an explicit reducer supplies the
// identity (reducer.init) and the value type.
template <typename iType, class FunctorType, typename ReducerType>
KOKKOS_INLINE_FUNCTION
    typename std::enable_if<Kokkos::is_reducer<ReducerType>::value>::type
    parallel_scan(const Impl::ThreadVectorRangeBoundariesStruct<
                      iType, Impl::HPXTeamMember> &loop_boundaries,
                  const FunctorType &lambda, const ReducerType &reducer) {
  typename ReducerType::value_type scan_val;
  reducer.init(scan_val);

  // NOTE(review): the final scan value is never written back through
  // reducer.reference() — confirm against other backends whether callers
  // expect the reducer to carry the scan total out of this function.
#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
#pragma ivdep
#endif
  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
       i += loop_boundaries.increment) {
    lambda(i, scan_val, true);
  }
}
2649
// Vector-level single: on this backend the lambda is simply executed
// unconditionally by the calling thread.
template <class FunctorType>
KOKKOS_INLINE_FUNCTION void single(
    const Impl::VectorSingleStruct<Impl::HPXTeamMember> &,
    const FunctorType &lambda) {
  lambda();
}
2656
// Team-level single: on this backend the lambda is simply executed
// unconditionally by the calling thread.
template <class FunctorType>
KOKKOS_INLINE_FUNCTION void single(
    const Impl::ThreadSingleStruct<Impl::HPXTeamMember> &,
    const FunctorType &lambda) {
  lambda();
}
2663
// Vector-level single with a broadcast value: the lambda runs once and may
// write its result into `val`.
template <class FunctorType, class ValueType>
KOKKOS_INLINE_FUNCTION void single(
    const Impl::VectorSingleStruct<Impl::HPXTeamMember> &,
    const FunctorType &lambda, ValueType &val) {
  lambda(val);
}
2670
// Team-level single with a broadcast value: the lambda runs once and may
// write its result into `val`.
template <class FunctorType, class ValueType>
KOKKOS_INLINE_FUNCTION void single(
    const Impl::ThreadSingleStruct<Impl::HPXTeamMember> &,
    const FunctorType &lambda, ValueType &val) {
  lambda(val);
}
2677
2678} // namespace Kokkos
2679
2680#include <HPX/Kokkos_HPX_Task.hpp>
2681
2682#endif /* #if defined( KOKKOS_ENABLE_HPX ) */
2683#endif /* #ifndef KOKKOS_HPX_HPP */
// NOTE: The lines below are Doxygen tooltip text that leaked into this file
// during extraction; they are not part of the header and are preserved here
// as comments only.
// Declaration of various MemoryLayout options.
// Declaration of parallel operators.
// void parallel_for(const ExecPolicy &policy, const FunctorType &functor, const std::string &str="", typename std::enable_if< Kokkos::Impl::is_execution_policy< ExecPolicy >::value >::type *=nullptr)
// Execute functor in parallel according to the execution policy.
// KOKKOS_INLINE_FUNCTION size_type acquire() const
// acquire value such that 0 <= value < size()
// KOKKOS_INLINE_FUNCTION size_type size() const
// upper bound for acquired values, i.e. 0 <= value < size()
// UniqueToken(execution_space const &=execution_space())
// create object size for concurrency on the given instance
// KOKKOS_INLINE_FUNCTION void release(size_type) const
// release a value acquired by generate
// Memory management for host memory.
// Execution policy for work over a range of an integral type.
// Scratch memory space associated with an execution space.
// View to an array of data.
// ReturnType