Kokkos Core Kernels Package Version of the Day
Kokkos_WorkGraphPolicy.hpp
1/*
2//@HEADER
3// ************************************************************************
4//
5// Kokkos v. 3.0
6// Copyright (2020) National Technology & Engineering
7// Solutions of Sandia, LLC (NTESS).
8//
9// Under the terms of Contract DE-NA0003525 with NTESS,
10// the U.S. Government retains certain rights in this software.
11//
12// Redistribution and use in source and binary forms, with or without
13// modification, are permitted provided that the following conditions are
14// met:
15//
16// 1. Redistributions of source code must retain the above copyright
17// notice, this list of conditions and the following disclaimer.
18//
19// 2. Redistributions in binary form must reproduce the above copyright
20// notice, this list of conditions and the following disclaimer in the
21// documentation and/or other materials provided with the distribution.
22//
23// 3. Neither the name of the Corporation nor the names of the
24// contributors may be used to endorse or promote products derived from
25// this software without specific prior written permission.
26//
27// THIS SOFTWARE IS PROVIDED BY NTESS "AS IS" AND ANY
28// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
30// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NTESS OR THE
31// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
32// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
33// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
34// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
35// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
36// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
37// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38//
39// Questions? Contact Christian R. Trott (crtrott@sandia.gov)
40//
41// ************************************************************************
42//@HEADER
43*/
44
45#ifndef KOKKOS_WORKGRAPHPOLICY_HPP
46#define KOKKOS_WORKGRAPHPOLICY_HPP
47
48#include <impl/Kokkos_AnalyzePolicy.hpp>
49#include <Kokkos_Crs.hpp>
50
namespace Kokkos {
namespace Impl {

// Forward declaration only: this header names WorkGraphExec but does not
// define it. NOTE(review): presumably each backend header included at the
// bottom of this file supplies the definition for its execution space --
// confirm against those headers.
template <class FunctorType, class ExecutionSpace, class... PolicyArgs>
class WorkGraphExec;

}  // namespace Impl
}  // namespace Kokkos
59
60namespace Kokkos {
61
62template <class... Properties>
63class WorkGraphPolicy : public Kokkos::Impl::PolicyTraits<Properties...> {
64 public:
65 using execution_policy = WorkGraphPolicy<Properties...>;
66 using self_type = WorkGraphPolicy<Properties...>;
67 using traits = Kokkos::Impl::PolicyTraits<Properties...>;
68 using index_type = typename traits::index_type;
69 using member_type = index_type;
70 using execution_space = typename traits::execution_space;
71 using memory_space = typename execution_space::memory_space;
73
74 enum : std::int32_t {
75 END_TOKEN = -1,
76 BEGIN_TOKEN = -2,
77 COMPLETED_TOKEN = -3
78 };
79
80 private:
82
83 // Let N = m_graph.numRows(), the total work
84 // m_queue[ 0 .. N-1] = the ready queue
85 // m_queue[ N .. 2*N-1] = the waiting queue counts
86 // m_queue[2*N .. 2*N+2] = the ready queue hints
87
88 graph_type const m_graph;
89 ints_type m_queue;
90
91 KOKKOS_INLINE_FUNCTION
92 void push_work(const std::int32_t w) const noexcept {
93 const std::int32_t N = m_graph.numRows();
94
95 std::int32_t volatile* const ready_queue = &m_queue[0];
96 std::int32_t volatile* const end_hint = &m_queue[2 * N + 1];
97
98 // Push work to end of queue
99 const std::int32_t j = atomic_fetch_add(end_hint, 1);
100
101 if ((N <= j) || (END_TOKEN != atomic_exchange(ready_queue + j, w))) {
102 // ERROR: past the end of queue or did not replace END_TOKEN
103 Kokkos::abort("WorkGraphPolicy push_work error");
104 }
105
106 memory_fence();
107 }
108
109 public:
124 KOKKOS_INLINE_FUNCTION
125 std::int32_t pop_work() const noexcept {
126 const std::int32_t N = m_graph.numRows();
127
128 std::int32_t volatile* const ready_queue = &m_queue[0];
129 std::int32_t volatile* const begin_hint = &m_queue[2 * N];
130
131 // begin hint is guaranteed to be less than or equal to
132 // actual begin location in the queue.
133
134 for (std::int32_t i = *begin_hint; i < N; ++i) {
135 const std::int32_t w = ready_queue[i];
136
137 if (w == END_TOKEN) {
138 return END_TOKEN;
139 }
140
141 if ((w != BEGIN_TOKEN) &&
142 (w == atomic_compare_exchange(ready_queue + i, w,
143 (std::int32_t)BEGIN_TOKEN))) {
144 // Attempt to claim ready work index succeeded,
145 // update the hint and return work index
146 atomic_increment(begin_hint);
147 return w;
148 }
149 // arrive here when ready_queue[i] == BEGIN_TOKEN
150 }
151
152 return COMPLETED_TOKEN;
153 }
154
155 KOKKOS_INLINE_FUNCTION
156 void completed_work(std::int32_t w) const noexcept {
157 Kokkos::memory_fence();
158
159 // Make sure the completed work function's memory accesses are flushed.
160
161 const std::int32_t N = m_graph.numRows();
162
163 std::int32_t volatile* const count_queue = &m_queue[N];
164
165 const std::int32_t B = m_graph.row_map(w);
166 const std::int32_t E = m_graph.row_map(w + 1);
167
168 for (std::int32_t i = B; i < E; ++i) {
169 const std::int32_t j = m_graph.entries(i);
170 if (1 == atomic_fetch_add(count_queue + j, -1)) {
171 push_work(j);
172 }
173 }
174 }
175
176 struct TagInit {};
177 struct TagCount {};
178 struct TagReady {};
179
186 KOKKOS_INLINE_FUNCTION
187 void operator()(const TagInit, int i) const noexcept {
188 m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0;
189 }
190
191 KOKKOS_INLINE_FUNCTION
192 void operator()(const TagCount, int i) const noexcept {
193 std::int32_t volatile* const count_queue = &m_queue[m_graph.numRows()];
194
195 atomic_increment(count_queue + m_graph.entries[i]);
196 }
197
198 KOKKOS_INLINE_FUNCTION
199 void operator()(const TagReady, int w) const noexcept {
200 std::int32_t const* const count_queue = &m_queue[m_graph.numRows()];
201
202 if (0 == count_queue[w]) push_work(w);
203 }
204
205 execution_space space() const { return execution_space(); }
206
207 WorkGraphPolicy(const graph_type& arg_graph)
208 : m_graph(arg_graph),
209 m_queue(view_alloc("queue", WithoutInitializing),
210 arg_graph.numRows() * 2 + 2) {
211 { // Initialize
212 using policy_type = RangePolicy<std::int32_t, execution_space, TagInit>;
214 const closure_type closure(*this, policy_type(0, m_queue.size()));
215 closure.execute();
216 execution_space().fence();
217 }
218
219 { // execute-after counts
220 using policy_type = RangePolicy<std::int32_t, execution_space, TagCount>;
222 const closure_type closure(*this, policy_type(0, m_graph.entries.size()));
223 closure.execute();
224 execution_space().fence();
225 }
226
227 { // Scheduling ready tasks
228 using policy_type = RangePolicy<std::int32_t, execution_space, TagReady>;
230 const closure_type closure(*this, policy_type(0, m_graph.numRows()));
231 closure.execute();
232 execution_space().fence();
233 }
234 }
235};
236
237} // namespace Kokkos
238
239#ifdef KOKKOS_ENABLE_SERIAL
240#include "impl/Kokkos_Serial_WorkGraphPolicy.hpp"
241#endif
242
243#ifdef KOKKOS_ENABLE_OPENMP
244#include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp"
245#endif
246
247#ifdef KOKKOS_ENABLE_CUDA
248#include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp"
249#endif
250
251#ifdef KOKKOS_ENABLE_HIP
252#include "HIP/Kokkos_HIP_WorkGraphPolicy.hpp"
253#endif
254
255#ifdef KOKKOS_ENABLE_THREADS
256#include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp"
257#endif
258
259#ifdef KOKKOS_ENABLE_HPX
260#include "HPX/Kokkos_HPX_WorkGraphPolicy.hpp"
261#endif
262
263#endif /* #define KOKKOS_WORKGRAPHPOLICY_HPP */
Compressed row storage array.
Definition: Kokkos_Crs.hpp:86
Implementation of the ParallelFor operator that has a partial specialization for the device.