Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
task_stream_extended.h
/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_task_stream_extended_H
#define _TBB_task_stream_extended_H

//! This file is a possible future replacement for the task_stream class implemented in
//! task_stream.h. It refactors the code and extends task_stream capabilities by moving lane
//! management during operations to the caller side. Although the new implementation should not
//! affect performance of the original task stream, it is not recommended to use it without
//! benchmarking, because the implementations differ significantly in some parts.

#if _TBB_task_stream_H
#error Only one of task_stream.h and this file can be included at a time.
#endif

#if !__TBB_CPF_BUILD
#error This code bears a preview status until it proves its usefulness/performance suitability.
#endif

#include "tbb/tbb_stddef.h"
#include <deque>
#include <climits>
#include "tbb/atomic.h" // for __TBB_Atomic*
#include "tbb/spin_mutex.h"
#include "tbb/tbb_allocator.h"
#include "scheduler_common.h"
#include "tbb_misc.h" // for FastRandom

namespace tbb {
namespace internal {

//! Essentially, this is just a pair of a queue and a mutex to protect the queue.
/** The reason std::pair is not used is that the implementation may add some padding. */
template< typename T, typename mutex_t >
struct queue_and_mutex {
    typedef std::deque< T, tbb_allocator<T> > queue_base_t;

    queue_base_t my_queue;
    mutex_t      my_mutex;

    queue_and_mutex () : my_queue(), my_mutex() {}
    ~queue_and_mutex () {}
};

typedef uintptr_t population_t;
const population_t one = 1;

inline void set_one_bit( population_t& dest, int pos ) {
    __TBB_ASSERT( pos>=0, NULL );
    __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
    __TBB_AtomicOR( &dest, one<<pos );
}

inline void clear_one_bit( population_t& dest, int pos ) {
    __TBB_ASSERT( pos>=0, NULL );
    __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
    __TBB_AtomicAND( &dest, ~(one<<pos) );
}

inline bool is_bit_set( population_t val, int pos ) {
    __TBB_ASSERT( pos>=0, NULL );
    __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
    return (val & (one<<pos)) != 0;
}

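// Illustrative sketch (not part of the original header): the helpers above keep a
// per-level bitmask with one bit per lane, so "is there any work at this level?"
// is a single word test. The variable 'p' is hypothetical:
//
//     population_t p = 0;
//     set_one_bit( p, 3 );            // lane 3 starts advertising work
//     __TBB_ASSERT( is_bit_set( p, 3 ), NULL );
//     clear_one_bit( p, 3 );          // lane 3 drained
//     __TBB_ASSERT( p == 0, NULL );   // the whole level reads as empty in one load
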
struct random_lane_selector :
#if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
        no_assign
#else
        no_copy
#endif
{
    random_lane_selector( FastRandom& random ) : my_random( random ) {}
    unsigned operator()( unsigned out_of ) const {
        __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
        return my_random.get() & (out_of-1);
    }
private:
    FastRandom& my_random;
};

struct lane_selector_base :
#if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
        no_assign
#else
        no_copy
#endif
{
    unsigned& my_previous;
    lane_selector_base( unsigned& previous ) : my_previous( previous ) {}
};

struct subsequent_lane_selector : lane_selector_base {
    subsequent_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
    unsigned operator()( unsigned out_of ) const {
        __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
        return (++my_previous &= out_of-1);
    }
};

struct preceding_lane_selector : lane_selector_base {
    preceding_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
    unsigned operator()( unsigned out_of ) const {
        __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
        return (--my_previous &= (out_of-1));
    }
};
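
// Illustrative sketch (not part of the original header): a lane selector is any
// functor mapping a power-of-two lane count to an index. With a hypothetical
// 'hint' variable, subsequent_lane_selector round-robins forward:
//
//     unsigned hint = 0;
//     subsequent_lane_selector next( hint );
//     unsigned a = next( /*out_of=*/4 );  // 1: pre-increment, then mask with 3
//     unsigned b = next( /*out_of=*/4 );  // 2: consecutive calls walk the lanes
//
// preceding_lane_selector walks backward, so producers and consumers can start
// from different ends of the lane array.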

class task_stream_base : no_copy {
protected:
    typedef queue_and_mutex <task*, spin_mutex> lane_t;
};

enum task_stream_accessor_type { front_accessor = 0, back_nonnull_accessor };

//! Specializes from which side of the underlying container elements are retrieved.
/** The method must be called under the corresponding mutex lock. */
template<task_stream_accessor_type accessor>
class task_stream_accessor : task_stream_base {
protected:
    using lane_t = task_stream_base::lane_t;
    task* get_item( lane_t::queue_base_t& queue ) {
        task* result = queue.front();
        queue.pop_front();
        return result;
    }
};

template<>
class task_stream_accessor< back_nonnull_accessor > : task_stream_base {
protected:
    task* get_item( lane_t::queue_base_t& queue ) {
        task* result = NULL;
        do {
            result = queue.back();
            queue.pop_back();
        } while( !result && !queue.empty() );
        return result;
    }
};
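
// Note (added for clarity): the back_nonnull_accessor specialization tolerates NULL
// entries because pop_specific() below may replace a task grabbed from the middle
// of a lane with a NULL placeholder instead of erasing it (see '*curr = 0' in
// look_specific()).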

//! The container for "fairness-oriented" aka "enqueued" tasks.
template<int Levels, task_stream_accessor_type accessor>
class task_stream : public task_stream_accessor< accessor > {
    typedef typename task_stream_accessor<accessor>::lane_t lane_t;
    population_t population[Levels];
    padded<lane_t>* lanes[Levels];
    unsigned N;

public:
    task_stream() : N() {
        for(int level = 0; level < Levels; level++) {
            population[level] = 0;
            lanes[level] = NULL;
        }
    }

    void initialize( unsigned n_lanes ) {
        const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;

        N = n_lanes>=max_lanes ? max_lanes : n_lanes>2 ? 1<<(__TBB_Log2(n_lanes-1)+1) : 2;
        __TBB_ASSERT( N==max_lanes || N>=n_lanes && ((N-1)&N)==0, "number of lanes miscalculated");
        __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, NULL );
        for(int level = 0; level < Levels; level++) {
            lanes[level] = new padded<lane_t>[N];
            __TBB_ASSERT( !population[level], NULL );
        }
    }
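
    // Illustrative values (not part of the original header), assuming a 64-bit
    // population_t, i.e. max_lanes == 64:
    //
    //     initialize( 2 );    // N == 2
    //     initialize( 5 );    // N == 8 == 1 << (__TBB_Log2(4) + 1)
    //     initialize( 100 );  // N == 64, capped at one lane per population_t bit
    //
    // Rounding N up to a power of two lets the selectors mask instead of divide.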

    ~task_stream() {
        for(int level = 0; level < Levels; level++)
            if (lanes[level]) delete[] lanes[level];
    }

    //! Returns true on successful push, otherwise - false.
    bool try_push( task* source, int level, unsigned lane_idx ) {
        __TBB_ASSERT( 0 <= level && level < Levels, "Incorrect lane level specified." );
        spin_mutex::scoped_lock lock;
        if( lock.try_acquire( lanes[level][lane_idx].my_mutex ) ) {
            lanes[level][lane_idx].my_queue.push_back( source );
            set_one_bit( population[level], lane_idx ); // TODO: avoid atomic op if the bit is already set
            return true;
        }
        return false;
    }

    //! Push a task into a lane. Lane selection is performed by the passed functor.
    template<typename lane_selector_t>
    void push( task* source, int level, const lane_selector_t& next_lane ) {
        bool succeed = false;
        unsigned lane = 0;
        do {
            lane = next_lane( /*out_of=*/N );
            __TBB_ASSERT( lane < N, "Incorrect lane index." );
        } while( ! (succeed = try_push( source, level, lane )) );
    }
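
    // Illustrative use (not part of the original header; 'stream', 't', and the
    // seed are hypothetical -- real callers live in the scheduler):
    //
    //     task_stream<1, front_accessor> stream;
    //     stream.initialize( n_threads );
    //     FastRandom rnd( 42 );
    //     stream.push( &t, /*level=*/0, random_lane_selector( rnd ) );
    //
    // push() keeps asking the selector until some lane's mutex is acquired, so it
    // always succeeds eventually.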

    //! Returns pointer to task on successful pop, otherwise - NULL.
    task* try_pop( int level, unsigned lane_idx ) {
        __TBB_ASSERT( 0 <= level && level < Levels, "Incorrect lane level specified." );
        if( !is_bit_set( population[level], lane_idx ) )
            return NULL;
        task* result = NULL;
        lane_t& lane = lanes[level][lane_idx];
        spin_mutex::scoped_lock lock;
        if( lock.try_acquire( lane.my_mutex ) && !lane.my_queue.empty() ) {
            result = this->get_item( lane.my_queue );
            if( lane.my_queue.empty() )
                clear_one_bit( population[level], lane_idx );
        }
        return result;
    }

    //! Try finding and popping a task using the passed functor for lane selection.
    //! The last used lane is updated inside the lane selector.
    template<typename lane_selector_t>
    task* pop( int level, const lane_selector_t& next_lane ) {
        task* popped = NULL;
        unsigned lane = 0;
        do {
            lane = next_lane( /*out_of=*/N );
            __TBB_ASSERT( lane < N, "Incorrect lane index." );
        } while( !empty( level ) && !(popped = try_pop( level, lane )) );
        return popped;
    }
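
    // Illustrative use (not part of the original header; 'hint' is hypothetical and
    // would typically be kept per thread, so successive pops resume where the last
    // one stopped):
    //
    //     unsigned hint = 0;
    //     if( task* t = stream.pop( /*level=*/0, preceding_lane_selector( hint ) ) )
    //         { /* ... execute t ... */ }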

    // TODO: unify '*_specific' logic with 'pop' methods above
    task* look_specific( __TBB_ISOLATION_ARG( typename lane_t::queue_base_t& queue, isolation_tag isolation ) ) {
        __TBB_ASSERT( !queue.empty(), NULL );
        // TODO: add a worst-case performance test and consider an alternative container with better
        // performance for isolation search.
        typename lane_t::queue_base_t::iterator curr = queue.end();
        do {
            // TODO: consider logic from get_task to simplify the code.
            task* result = *--curr;
            if( result __TBB_ISOLATION_EXPR( && result->prefix().isolation == isolation ) ) {
                if( queue.end() - curr == 1 )
                    queue.pop_back(); // a little housekeeping along the way
                else
                    *curr = 0; // grabbing a task with the same isolation
                // TODO: move one of the container's ends instead if the task has been found there
                return result;
            }
        } while( curr != queue.begin() );
        return NULL;
    }

    //! Try finding and popping a related task.
    task* pop_specific( int level, __TBB_ISOLATION_ARG(unsigned& last_used_lane, isolation_tag isolation) ) {
        task* result = NULL;
        // Lane selection is round-robin in the backward direction.
        unsigned idx = last_used_lane & (N-1);
        do {
            if( is_bit_set( population[level], idx ) ) {
                lane_t& lane = lanes[level][idx];
                spin_mutex::scoped_lock lock;
                if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
                    result = look_specific( __TBB_ISOLATION_ARG(lane.my_queue, isolation) );
                    if( lane.my_queue.empty() )
                        clear_one_bit( population[level], idx );
                    if( result )
                        break;
                }
            }
            idx = (idx-1) & (N-1);
        } while( !empty(level) && idx != last_used_lane );
        last_used_lane = idx;
        return result;
    }
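
    // Illustrative use (not part of the original header; 'last' and 'my_tag' are
    // hypothetical). In builds with task isolation compiled in, only tasks whose
    // prefix().isolation equals the given tag are taken; otherwise the extra
    // argument compiles away via __TBB_ISOLATION_ARG:
    //
    //     unsigned last = 0;
    //     task* t = stream.pop_specific( /*level=*/0, __TBB_ISOLATION_ARG( last, my_tag ) );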

    //! Checks existence of a task.
    bool empty(int level) {
        return !population[level];
    }

    //! Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.
    /** Tasks are not executed, because that would potentially create more tasks at a late
        stage. The scheduler is really expected to execute all tasks before task_stream
        destruction. */
    intptr_t drain() {
        intptr_t result = 0;
        for(int level = 0; level < Levels; level++)
            for(unsigned i=0; i<N; ++i) {
                lane_t& lane = lanes[level][i];
                for(typename lane_t::queue_base_t::iterator it=lane.my_queue.begin();
                    it!=lane.my_queue.end(); ++it, ++result)
                {
                    __TBB_ASSERT( is_bit_set( population[level], i ), NULL );
                    task* t = *it;
                    tbb::task::destroy(*t);
                }
                lane.my_queue.clear();
                clear_one_bit( population[level], i );
            }
        return result;
    }
}; // task_stream

} // namespace internal
} // namespace tbb

#endif /* _TBB_task_stream_extended_H */