TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_EX_THREAD_POOL_HPP
12 : #define BOOST_CAPY_EX_THREAD_POOL_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/continuation.hpp>
16 : #include <coroutine>
17 : #include <boost/capy/ex/execution_context.hpp>
18 : #include <cstddef>
19 : #include <string_view>
20 :
21 : namespace boost {
22 : namespace capy {
23 :
24 : /** A pool of threads for executing work concurrently.
25 :
26 : Use this when you need to run coroutines on multiple threads
27 : without the overhead of creating and destroying threads for
28 : each task. Work items are distributed across the pool using
29 : a shared queue.
30 :
31 : @par Thread Safety
32 : Distinct objects: Safe.
33 : Shared objects: Unsafe.
34 :
35 : @par Example
36 : @code
37 : thread_pool pool(4); // 4 worker threads
38 : auto ex = pool.get_executor();
39 : ex.post(some_coroutine);
40 : pool.join(); // wait for outstanding work to complete
41 : // pool destructor stops the pool, discarding any pending work
42 : @endcode
43 : */
44 : class BOOST_CAPY_DECL
45 : thread_pool
46 : : public execution_context
47 : {
48 : class impl;
49 : impl* impl_;
50 :
51 : public:
52 : class executor_type;
53 :
54 : /** Destroy the thread pool.
55 :
56 : Signals all worker threads to stop, waits for them to
57 : finish, and destroys any pending work items.
58 : */
59 : ~thread_pool();
60 :
61 : /** Construct a thread pool.
62 :
63 : Creates a pool with the specified number of worker threads.
64 : If `num_threads` is zero, the number of threads is set to
65 : the hardware concurrency, or one if that cannot be determined.
66 :
67 : @param num_threads The number of worker threads, or zero
68 : for automatic selection.
69 :
70 : @param thread_name_prefix The prefix for worker thread names.
71 : Thread names appear as "{prefix}0", "{prefix}1", etc.
72 : The prefix is truncated to 12 characters. Defaults to
73 : "capy-pool-".
74 : */
75 : explicit
76 : thread_pool(
77 : std::size_t num_threads = 0,
78 : std::string_view thread_name_prefix = "capy-pool-");
79 :
80 : thread_pool(thread_pool const&) = delete;
81 : thread_pool& operator=(thread_pool const&) = delete;
82 :
83 : /** Wait for all outstanding work to complete.
84 :
85 : Releases the internal work guard, then blocks the calling
86 : thread until all outstanding work tracked by
87 : @ref executor_type::on_work_started and
88 : @ref executor_type::on_work_finished completes. After all
89 : work finishes, joins the worker threads.
90 :
91 : If @ref stop is called while `join()` is blocking, the
92 : pool stops without waiting for remaining work to
93 : complete. Worker threads finish their current item and
94 : exit; `join()` still waits for all threads to be joined
95 : before returning.
96 :
97 : This function is idempotent. The first call performs the
98 : join; subsequent calls return immediately.
99 :
100 : @par Preconditions
101 : Must not be called from a thread in this pool (undefined
102 : behavior).
103 :
104 : @par Postconditions
105 : All worker threads have been joined. The pool cannot be
106 : reused.
107 :
108 : @par Thread Safety
109 : May be called from any thread not in this pool.
110 : */
111 : void
112 : join() noexcept;
113 :
114 : /** Request all worker threads to stop.
115 :
116 : Signals all threads to exit after finishing their current
117 : work item. Queued work that has not started is abandoned.
118 : Does not wait for threads to exit.
119 :
120 : If @ref join is blocking on another thread, calling
121 : `stop()` causes it to stop waiting for outstanding
122 : work. The `join()` call still waits for worker threads
123 : to finish their current item and exit before returning.
124 : */
125 : void
126 : stop() noexcept;
127 :
128 : /** Return an executor for this thread pool.
129 :
130 : @return An executor associated with this thread pool.
131 : */
132 : executor_type
133 : get_executor() const noexcept;
134 : };
135 :
136 : /** An executor that submits work to a thread_pool.
137 :
138 : Executors are lightweight handles that can be copied and stored.
139 : All copies refer to the same underlying thread pool.
140 :
141 : @par Thread Safety
142 : Distinct objects: Safe.
143 : Shared objects: Safe.
144 : */
145 : class thread_pool::executor_type
146 : {
147 : friend class thread_pool;
148 :
149 : thread_pool* pool_ = nullptr;
150 :
151 : explicit
152 HIT 11582 : executor_type(thread_pool& pool) noexcept
153 11582 : : pool_(&pool)
154 : {
155 11582 : }
156 :
157 : public:
158 : /** Construct a default null executor.
159 :
160 : The resulting executor is not associated with any pool.
161 : `context()`, `dispatch()`, and `post()` require the
162 : executor to be associated with a pool before use.
163 : */
164 : executor_type() = default;
165 :
166 : /// Return the underlying thread pool.
167 : thread_pool&
168 11817 : context() const noexcept
169 : {
170 11817 : return *pool_;
171 : }
172 :
173 : /** Notify that work has started.
174 :
175 : Increments the outstanding work count. Must be paired
176 : with a subsequent call to @ref on_work_finished.
177 :
178 : @see on_work_finished, work_guard
179 : */
180 : BOOST_CAPY_DECL
181 : void
182 : on_work_started() const noexcept;
183 :
184 : /** Notify that work has finished.
185 :
186 : Decrements the outstanding work count. When the count
187 : reaches zero after @ref thread_pool::join has been called,
188 : the pool's worker threads are signaled to stop.
189 :
190 : @pre A preceding call to @ref on_work_started was made.
191 :
192 : @see on_work_started, work_guard
193 : */
194 : BOOST_CAPY_DECL
195 : void
196 : on_work_finished() const noexcept;
197 :
198 : /** Dispatch a continuation for execution.
199 :
200 : If the calling thread is a worker of this pool, returns
201 : `c.h` for symmetric transfer so the caller can resume the
202 : continuation inline. Otherwise, posts the continuation to
203 : the pool for execution on a worker thread and returns
204 : `std::noop_coroutine()`.
205 :
206 : @param c The continuation to execute. On the post path,
207 : must remain at a stable address until dequeued
208 : and resumed.
209 :
210 : @return `c.h` when the calling thread is a pool worker;
211 : `std::noop_coroutine()` otherwise.
212 : */
213 : BOOST_CAPY_DECL
214 : std::coroutine_handle<>
215 : dispatch(continuation& c) const;
216 :
217 : /** Post a continuation to the thread pool.
218 :
219 : The continuation will be resumed on one of the pool's
220 : worker threads. The continuation must remain at a stable
221 : address until it is dequeued and resumed.
222 :
223 : @param c The continuation to execute.
224 : */
225 : BOOST_CAPY_DECL
226 : void
227 : post(continuation& c) const;
228 :
229 : /// Return true if two executors refer to the same thread pool.
230 : bool
231 13 : operator==(executor_type const& other) const noexcept
232 : {
233 13 : return pool_ == other.pool_;
234 : }
235 : };
236 :
237 : } // capy
238 : } // boost
239 :
240 : #endif
|