100.00% Lines (7/7) 100.00% Functions (3/3)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // Copyright (c) 2026 Michael Vandeberg 3   // Copyright (c) 2026 Michael Vandeberg
4   // 4   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 7   //
8   // Official repository: https://github.com/cppalliance/capy 8   // Official repository: https://github.com/cppalliance/capy
9   // 9   //
10   10  
11   #ifndef BOOST_CAPY_EX_THREAD_POOL_HPP 11   #ifndef BOOST_CAPY_EX_THREAD_POOL_HPP
12   #define BOOST_CAPY_EX_THREAD_POOL_HPP 12   #define BOOST_CAPY_EX_THREAD_POOL_HPP
13   13  
14   #include <boost/capy/detail/config.hpp> 14   #include <boost/capy/detail/config.hpp>
15   #include <boost/capy/continuation.hpp> 15   #include <boost/capy/continuation.hpp>
16   #include <coroutine> 16   #include <coroutine>
17   #include <boost/capy/ex/execution_context.hpp> 17   #include <boost/capy/ex/execution_context.hpp>
18   #include <cstddef> 18   #include <cstddef>
19   #include <string_view> 19   #include <string_view>
20   20  
21   namespace boost { 21   namespace boost {
22   namespace capy { 22   namespace capy {
23   23  
24   /** A pool of threads for executing work concurrently. 24   /** A pool of threads for executing work concurrently.
25   25  
26   Use this when you need to run coroutines on multiple threads 26   Use this when you need to run coroutines on multiple threads
27   without the overhead of creating and destroying threads for 27   without the overhead of creating and destroying threads for
28   each task. Work items are distributed across the pool using 28   each task. Work items are distributed across the pool using
29   a shared queue. 29   a shared queue.
30   30  
31   @par Thread Safety 31   @par Thread Safety
32   Distinct objects: Safe. 32   Distinct objects: Safe.
33   Shared objects: Unsafe. 33   Shared objects: Unsafe.
34   34  
35   @par Example 35   @par Example
36   @code 36   @code
37   thread_pool pool(4); // 4 worker threads 37   thread_pool pool(4); // 4 worker threads
38   auto ex = pool.get_executor(); 38   auto ex = pool.get_executor();
39   ex.post(some_coroutine); 39   ex.post(some_coroutine);
40 - // pool destructor waits for all work to complete 40 + pool.join(); // wait for outstanding work to complete
  41 + // pool destructor stops the pool, discarding any pending work
41   @endcode 42   @endcode
42   */ 43   */
43   class BOOST_CAPY_DECL 44   class BOOST_CAPY_DECL
44   thread_pool 45   thread_pool
45   : public execution_context 46   : public execution_context
46   { 47   {
47   class impl; 48   class impl;
48   impl* impl_; 49   impl* impl_;
49   50  
50   public: 51   public:
51   class executor_type; 52   class executor_type;
52   53  
53   /** Destroy the thread pool. 54   /** Destroy the thread pool.
54   55  
55   Signals all worker threads to stop, waits for them to 56   Signals all worker threads to stop, waits for them to
56   finish, and destroys any pending work items. 57   finish, and destroys any pending work items.
57   */ 58   */
58   ~thread_pool(); 59   ~thread_pool();
59   60  
60   /** Construct a thread pool. 61   /** Construct a thread pool.
61   62  
62   Creates a pool with the specified number of worker threads. 63   Creates a pool with the specified number of worker threads.
63   If `num_threads` is zero, the number of threads is set to 64   If `num_threads` is zero, the number of threads is set to
64   the hardware concurrency, or one if that cannot be determined. 65   the hardware concurrency, or one if that cannot be determined.
65   66  
66   @param num_threads The number of worker threads, or zero 67   @param num_threads The number of worker threads, or zero
67   for automatic selection. 68   for automatic selection.
68   69  
69   @param thread_name_prefix The prefix for worker thread names. 70   @param thread_name_prefix The prefix for worker thread names.
70   Thread names appear as "{prefix}0", "{prefix}1", etc. 71   Thread names appear as "{prefix}0", "{prefix}1", etc.
71   The prefix is truncated to 12 characters. Defaults to 72   The prefix is truncated to 12 characters. Defaults to
72   "capy-pool-". 73   "capy-pool-".
73   */ 74   */
74   explicit 75   explicit
75   thread_pool( 76   thread_pool(
76   std::size_t num_threads = 0, 77   std::size_t num_threads = 0,
77   std::string_view thread_name_prefix = "capy-pool-"); 78   std::string_view thread_name_prefix = "capy-pool-");
78   79  
79   thread_pool(thread_pool const&) = delete; 80   thread_pool(thread_pool const&) = delete;
80   thread_pool& operator=(thread_pool const&) = delete; 81   thread_pool& operator=(thread_pool const&) = delete;
81   82  
82   /** Wait for all outstanding work to complete. 83   /** Wait for all outstanding work to complete.
83   84  
84   Releases the internal work guard, then blocks the calling 85   Releases the internal work guard, then blocks the calling
85   thread until all outstanding work tracked by 86   thread until all outstanding work tracked by
86   @ref executor_type::on_work_started and 87   @ref executor_type::on_work_started and
87   @ref executor_type::on_work_finished completes. After all 88   @ref executor_type::on_work_finished completes. After all
88   work finishes, joins the worker threads. 89   work finishes, joins the worker threads.
89   90  
90   If @ref stop is called while `join()` is blocking, the 91   If @ref stop is called while `join()` is blocking, the
91   pool stops without waiting for remaining work to 92   pool stops without waiting for remaining work to
92   complete. Worker threads finish their current item and 93   complete. Worker threads finish their current item and
93   exit; `join()` still waits for all threads to be joined 94   exit; `join()` still waits for all threads to be joined
94   before returning. 95   before returning.
95   96  
96   This function is idempotent. The first call performs the 97   This function is idempotent. The first call performs the
97   join; subsequent calls return immediately. 98   join; subsequent calls return immediately.
98   99  
99   @par Preconditions 100   @par Preconditions
100   Must not be called from a thread in this pool (undefined 101   Must not be called from a thread in this pool (undefined
101   behavior). 102   behavior).
102   103  
103   @par Postconditions 104   @par Postconditions
104   All worker threads have been joined. The pool cannot be 105   All worker threads have been joined. The pool cannot be
105   reused. 106   reused.
106   107  
107   @par Thread Safety 108   @par Thread Safety
108   May be called from any thread not in this pool. 109   May be called from any thread not in this pool.
109   */ 110   */
110   void 111   void
111   join() noexcept; 112   join() noexcept;
112   113  
113   /** Request all worker threads to stop. 114   /** Request all worker threads to stop.
114   115  
115   Signals all threads to exit after finishing their current 116   Signals all threads to exit after finishing their current
116   work item. Queued work that has not started is abandoned. 117   work item. Queued work that has not started is abandoned.
117   Does not wait for threads to exit. 118   Does not wait for threads to exit.
118   119  
119   If @ref join is blocking on another thread, calling 120   If @ref join is blocking on another thread, calling
120   `stop()` causes it to stop waiting for outstanding 121   `stop()` causes it to stop waiting for outstanding
121   work. The `join()` call still waits for worker threads 122   work. The `join()` call still waits for worker threads
122   to finish their current item and exit before returning. 123   to finish their current item and exit before returning.
123   */ 124   */
124   void 125   void
125   stop() noexcept; 126   stop() noexcept;
126   127  
127   /** Return an executor for this thread pool. 128   /** Return an executor for this thread pool.
128   129  
129   @return An executor associated with this thread pool. 130   @return An executor associated with this thread pool.
130   */ 131   */
131   executor_type 132   executor_type
132   get_executor() const noexcept; 133   get_executor() const noexcept;
133   }; 134   };
134   135  
135   /** An executor that submits work to a thread_pool. 136   /** An executor that submits work to a thread_pool.
136   137  
137   Executors are lightweight handles that can be copied and stored. 138   Executors are lightweight handles that can be copied and stored.
138   All copies refer to the same underlying thread pool. 139   All copies refer to the same underlying thread pool.
139   140  
140   @par Thread Safety 141   @par Thread Safety
141   Distinct objects: Safe. 142   Distinct objects: Safe.
142   Shared objects: Safe. 143   Shared objects: Safe.
143   */ 144   */
144   class thread_pool::executor_type 145   class thread_pool::executor_type
145   { 146   {
146   friend class thread_pool; 147   friend class thread_pool;
147   148  
148   thread_pool* pool_ = nullptr; 149   thread_pool* pool_ = nullptr;
149   150  
150   explicit 151   explicit
HITCBC 151   11582 executor_type(thread_pool& pool) noexcept 152   11582 executor_type(thread_pool& pool) noexcept
HITCBC 152   11582 : pool_(&pool) 153   11582 : pool_(&pool)
153   { 154   {
HITCBC 154   11582 } 155   11582 }
155   156  
156   public: 157   public:
157   /** Construct a default null executor. 158   /** Construct a default null executor.
158   159  
159   The resulting executor is not associated with any pool. 160   The resulting executor is not associated with any pool.
160   `context()`, `dispatch()`, and `post()` require the 161   `context()`, `dispatch()`, and `post()` require the
161   executor to be associated with a pool before use. 162   executor to be associated with a pool before use.
162   */ 163   */
163   executor_type() = default; 164   executor_type() = default;
164   165  
165   /// Return the underlying thread pool. 166   /// Return the underlying thread pool.
166   thread_pool& 167   thread_pool&
HITCBC 167   11817 context() const noexcept 168   11817 context() const noexcept
168   { 169   {
HITCBC 169   11817 return *pool_; 170   11817 return *pool_;
170   } 171   }
171   172  
172   /** Notify that work has started. 173   /** Notify that work has started.
173   174  
174   Increments the outstanding work count. Must be paired 175   Increments the outstanding work count. Must be paired
175   with a subsequent call to @ref on_work_finished. 176   with a subsequent call to @ref on_work_finished.
176   177  
177   @see on_work_finished, work_guard 178   @see on_work_finished, work_guard
178   */ 179   */
179   BOOST_CAPY_DECL 180   BOOST_CAPY_DECL
180   void 181   void
181   on_work_started() const noexcept; 182   on_work_started() const noexcept;
182   183  
183   /** Notify that work has finished. 184   /** Notify that work has finished.
184   185  
185   Decrements the outstanding work count. When the count 186   Decrements the outstanding work count. When the count
186   reaches zero after @ref thread_pool::join has been called, 187   reaches zero after @ref thread_pool::join has been called,
187   the pool's worker threads are signaled to stop. 188   the pool's worker threads are signaled to stop.
188   189  
189   @pre A preceding call to @ref on_work_started was made. 190   @pre A preceding call to @ref on_work_started was made.
190   191  
191   @see on_work_started, work_guard 192   @see on_work_started, work_guard
192   */ 193   */
193   BOOST_CAPY_DECL 194   BOOST_CAPY_DECL
194   void 195   void
195   on_work_finished() const noexcept; 196   on_work_finished() const noexcept;
196   197  
197   /** Dispatch a continuation for execution. 198   /** Dispatch a continuation for execution.
198   199  
199   If the calling thread is a worker of this pool, returns 200   If the calling thread is a worker of this pool, returns
200   `c.h` for symmetric transfer so the caller can resume the 201   `c.h` for symmetric transfer so the caller can resume the
201   continuation inline. Otherwise, posts the continuation to 202   continuation inline. Otherwise, posts the continuation to
202   the pool for execution on a worker thread and returns 203   the pool for execution on a worker thread and returns
203   `std::noop_coroutine()`. 204   `std::noop_coroutine()`.
204   205  
205   @param c The continuation to execute. On the post path, 206   @param c The continuation to execute. On the post path,
206   must remain at a stable address until dequeued 207   must remain at a stable address until dequeued
207   and resumed. 208   and resumed.
208   209  
209   @return `c.h` when the calling thread is a pool worker; 210   @return `c.h` when the calling thread is a pool worker;
210   `std::noop_coroutine()` otherwise. 211   `std::noop_coroutine()` otherwise.
211   */ 212   */
212   BOOST_CAPY_DECL 213   BOOST_CAPY_DECL
213   std::coroutine_handle<> 214   std::coroutine_handle<>
214   dispatch(continuation& c) const; 215   dispatch(continuation& c) const;
215   216  
216   /** Post a continuation to the thread pool. 217   /** Post a continuation to the thread pool.
217   218  
218   The continuation will be resumed on one of the pool's 219   The continuation will be resumed on one of the pool's
219   worker threads. The continuation must remain at a stable 220   worker threads. The continuation must remain at a stable
220   address until it is dequeued and resumed. 221   address until it is dequeued and resumed.
221   222  
222   @param c The continuation to execute. 223   @param c The continuation to execute.
223   */ 224   */
224   BOOST_CAPY_DECL 225   BOOST_CAPY_DECL
225   void 226   void
226   post(continuation& c) const; 227   post(continuation& c) const;
227   228  
228   /// Return true if two executors refer to the same thread pool. 229   /// Return true if two executors refer to the same thread pool.
229   bool 230   bool
HITCBC 230   13 operator==(executor_type const& other) const noexcept 231   13 operator==(executor_type const& other) const noexcept
231   { 232   {
HITCBC 232   13 return pool_ == other.pool_; 233   13 return pool_ == other.pool_;
233   } 234   }
234   }; 235   };
235   236  
236   } // capy 237   } // capy
237   } // boost 238   } // boost
238   239  
239   #endif 240   #endif