/*
Copyright (c) 2005-2020 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef __TBB_flow_graph_H
#define __TBB_flow_graph_H
#include <atomic>
#include <memory>
#include <type_traits>
#include "detail/_config.h"
#include "detail/_namespace_injection.h"
#include "spin_mutex.h"
#include "null_mutex.h"
#include "spin_rw_mutex.h"
#include "null_rw_mutex.h"
#include "detail/_pipeline_filters.h"
#include "detail/_task.h"
#include "detail/_small_object_pool.h"
#include "cache_aligned_allocator.h"
#include "detail/_exception.h"
#include "detail/_template_helpers.h"
#include "detail/_aggregator.h"
#include "detail/_allocator_traits.h"
#include "profiling.h"
#include "task_arena.h"
// Defines __TBB_NOINLINE_SYM.
// When TBB_USE_PROFILING_TOOLS is set and the target is Linux or Apple, the macro
// expands to __attribute__((noinline)); in every other configuration it expands
// to nothing.
#if TBB_USE_PROFILING_TOOLS && ( __linux__ || __APPLE__ )
#if __INTEL_COMPILER
// Disabled warning "routine is both inline and noinline"
// NOTE(review): the matching "#pragma warning (pop)" is not visible in this part
// of the file - confirm that it follows later.
#pragma warning (push)
#pragma warning( disable: 2196 )
#endif
#define __TBB_NOINLINE_SYM __attribute__((noinline))
#else
#define __TBB_NOINLINE_SYM
#endif
#include <tuple>
#include <list>
#include <queue>
/** @file
\brief The graph related classes and functions
There are some applications that best express dependencies as messages
passed between nodes in a graph. These messages may contain data or
simply act as signals that a predecessor has completed. The graph
class and its associated node classes can be used to express such
applications.
*/
namespace tbb {
namespace detail {
namespace d1 {
//! An enumeration that provides the two most common concurrency levels: unlimited and serial
enum concurrency {
    unlimited = 0,
    serial = 1
};
//! A generic null type
/** An empty struct: it declares no members and therefore carries no data. */
struct null_type {};
//! An empty class used for messages that mean "I'm done"
/** The class declares no members, so a continue_msg carries no payload.
    continue_receiver (below) is a receiver of this message type. */
class continue_msg {};
//! Forward declaration section
// sender<T>, receiver<T> and continue_receiver are defined further down in this file.
template< typename T > class sender;
template< typename T > class receiver;
class continue_receiver;
template< typename T, typename U > class limiter_node; // needed for resetting decrementer
// Cache class templates; each takes two type parameters (T, M).
template<typename T, typename M> class successor_cache;
template<typename T, typename M> class broadcast_cache;
template<typename T, typename M> class round_robin_cache;
template<typename T, typename M> class predecessor_cache;
template<typename T, typename M> class reservable_predecessor_cache;
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// The declarations below exist only when __TBB_PREVIEW_FLOW_GRAPH_NODE_SET is enabled.
namespace order {
struct following;
struct preceding;
}
template<typename Order, typename... Args> struct node_set;
#endif
} // namespace d1
} // namespace detail
} // namespace tbb
//! The graph class
#include "detail/_flow_graph_impl.h"
namespace tbb {
namespace detail {
namespace d1 {
//! Orders two graph tasks by their priority field.
/** Returns a pair whose first element is the task with the greater priority value and
    whose second element is the other task. When the priorities compare equal, the
    incoming (first, second) order is kept. Both pointers are dereferenced, so each
    must point to a real task. */
static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
    const bool second_outranks_first = second->priority > first->priority;
    return second_outranks_first ? std::make_pair(second, first)
                                 : std::make_pair(first, second);
}
//! Merges two task results into the single task pointer the caller should carry on with.
/** Submits a task if necessary and returns the non-enqueued task if there is one:
      - right is null                                 -> left is returned unchanged;
      - left is null or SUCCESSFULLY_ENQUEUED         -> right is returned;
      - right is SUCCESSFULLY_ENQUEUED, left is a task -> left is returned;
      - both are tasks -> they are ordered with order_tasks(), the first of the ordered
        pair is handed to spawn_in_graph_arena() for graph g, and the second is returned. */
static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
    // Nothing on the right-hand side: the left result stands as is.
    if (right == nullptr) return left;

    // From here on right is non-null. If left holds no real task, right wins.
    if (left == nullptr || left == SUCCESSFULLY_ENQUEUED) return right;

    // left is a real task; if right is only the "enqueued" marker, keep left.
    if (right == SUCCESSFULLY_ENQUEUED) return left;

    // Both are real tasks: spawn the one ordered first, hand back the other.
    auto ordered = order_tasks(left, right);
    spawn_in_graph_arena(g, *ordered.first);
    return ordered.second;
}
//! Pure virtual template class that defines a sender of messages of type T
/** The item-access hooks (try_get, try_reserve, try_release, try_consume) come with
    default implementations that simply return false. Successor management is pure
    virtual and protected; the free functions register_successor() and
    remove_successor() are declared friends so that they can invoke it. */
template< typename T >
class sender {
public:
    virtual ~sender() {}

    //! Request an item from the sender (base implementation: returns false)
    virtual bool try_get( T & ) { return false; }

    //! Reserves an item in the sender (base implementation: returns false)
    virtual bool try_reserve( T & ) { return false; }

    //! Releases the reserved item (base implementation: returns false)
    virtual bool try_release( ) { return false; }

    //! Consumes the reserved item (base implementation: returns false)
    virtual bool try_consume( ) { return false; }

protected:
    //! The output type of this sender
    using output_type = T;

    //! The successor type for this node
    using successor_type = receiver<T>;

    //! Add a new successor to this node
    virtual bool register_successor( successor_type &r ) = 0;

    //! Removes a successor from this node
    virtual bool remove_successor( successor_type &r ) = 0;

    // Free-function entry points that are allowed to call the protected members above.
    template<typename C>
    friend bool register_successor(sender<C>& s, receiver<C>& r);

    template<typename C>
    friend bool remove_successor (sender<C>& s, receiver<C>& r);
}; // class sender<T>
//! Adds r as a successor of s.
/** Forwards to the protected sender<C>::register_successor(), which this function may
    call because sender declares it a friend. The sender's result is returned as is. */
template<typename C>
bool register_successor(sender<C>& s, receiver<C>& r) {
    const bool registered = s.register_successor(r);
    return registered;
}
//! Removes r from the successors of s.
/** Forwards to the protected sender<C>::remove_successor(), which this function may
    call because sender declares it a friend. The sender's result is returned as is. */
template<typename C>
bool remove_successor(sender<C>& s, receiver<C>& r) {
    const bool removed = s.remove_successor(r);
    return removed;
}
//! Pure virtual template class that defines a receiver of messages of type T
template< typename T >
class receiver {
public:
//! Destructor
virtual ~receiver() {}
//! Put an item to the receiver
bool try_put( const T& t ) {
graph_task *res = try_put_task(t);
if (!res) return false;
if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
return true;
}
//! put item to successor; return task to run the successor if possible.
protected:
//! The input type of this receiver
typedef T input_type;
//! The predecessor type for this node
typedef sender<T> predecessor_type;
template< typename R, typename B > friend class run_and_put_task;
template< typename X, typename Y > friend class broadcast_cache;
template< typename X, typename Y > friend class round_robin_cache;
virtual graph_task *try_put_task(const T& t) = 0;
virtual graph& graph_reference() const = 0;
template<typename TT, typename M> friend class successor_cache;
virtual bool is_continue_receiver() { return false; }
// TODO revamp: reconsider the inheritance and move node priority out of receiver
virtual node_priority_t priority() const { return no_priority; }
//! Add a predecessor to the node
virtual bool register_predecessor( predecessor_type & ) { return false; }
//! Remove a predecessor from the node
virtual bool remove_predecessor( predecessor_type & ) { return false; }
template <typename C>
friend bool register_predecessor(receiver<C>& r, sender<C>& s);
template <typename C>
friend bool remove_predecessor (receiver<C>& r, sender<C>& s);
}; // class receiver<T>
//! Adds s as a predecessor of r.
/** Forwards to the protected receiver<C>::register_predecessor(), which this function
    may call because receiver declares it a friend. The receiver's result is returned
    as is. */
template <typename C>
bool register_predecessor(receiver<C>& r, sender<C>& s) {
    const bool registered = r.register_predecessor(s);
    return registered;
}
//! Removes s from the predecessors of r.
/** Forwards to the protected receiver<C>::remove_predecessor(), which this function
    may call because receiver declares it a friend. The receiver's result is returned
    as is. */
template <typename C>
bool remove_predecessor(receiver<C>& r, sender<C>& s) {
    const bool removed = r.remove_predecessor(s);
    return removed;
}
//! Base class for receivers of completion messages
/** These receivers automatically reset, but cannot be explicitly waited on */
class continue_receiver : public receiver< continue_msg > {
protected:
//! Constructor
explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
my_current_count = 0;
my_priority = a_priority;
}
//! Copy constructor
continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
my_current_count = 0;
my_priority = src.my_