#pragma once

#include "observer.hpp"
#include "taskflow.hpp"

/**
@file executor.hpp
@brief executor include file
*/

namespace tf {

// ----------------------------------------------------------------------------
// Executor Definition
// ----------------------------------------------------------------------------

/**
@class Executor

@brief execution interface for running a taskflow graph

An executor object manages a set of worker threads to run taskflow(s)
using an efficient work-stealing scheduling algorithm.
*/
class Executor {

  friend class FlowBuilder;
  friend class Subflow;
  friend class cudaFlow;

  struct PerThread {
    Worker* worker;
    PerThread() : worker {nullptr} { }
  };

  public:

    /**
    @brief constructs the executor with N worker threads
    */
    explicit Executor(size_t N = std::thread::hardware_concurrency());

    /**
    @brief destructs the executor
    */
    ~Executor();

    /**
    @brief runs the taskflow once

    @param taskflow a tf::Taskflow object

    @return a std::future to access the execution status of the taskflow
    */
    std::future<void> run(Taskflow& taskflow);

    /**
    @brief runs the taskflow once and invokes a callback upon completion

    @param taskflow a tf::Taskflow object
    @param callable a callable object to be invoked after this run

    @return a std::future to access the execution status of the taskflow
    */
    template <typename C>
    std::future<void> run(Taskflow& taskflow, C&& callable);

    /**
    @brief runs the taskflow for N times

    @param taskflow a tf::Taskflow object
    @param N number of runs

    @return a std::future to access the execution status of the taskflow
    */
    std::future<void> run_n(Taskflow& taskflow, size_t N);

    /**
    @brief runs the taskflow for N times and then invokes a callback

    @param taskflow a tf::Taskflow
    @param N number of runs
    @param callable a callable object to be invoked after this run

    @return a std::future to access the execution status of the taskflow
    */
    template <typename C>
    std::future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);

    /**
    @brief runs the taskflow multiple times until the predicate becomes true

    @param taskflow a tf::Taskflow
    @param pred a boolean predicate to return true for stop

    @return a std::future to access the execution status of the taskflow
    */
    template <typename P>
    std::future<void> run_until(Taskflow& taskflow, P&& pred);

    /**
    @brief runs the taskflow multiple times until the predicate becomes true
           and then invokes the callback

    @param taskflow a tf::Taskflow
    @param pred a boolean predicate to return true for stop
    @param callable a callable object to be invoked after this run

    @return a std::future to access the execution status of the taskflow
    */
    template <typename P, typename C>
    std::future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);

    /**
    @brief wait for all pending graphs to complete
    */
    void wait_for_all();
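    // ------------------------------------------------------------------------
    // Example (sketch): submitting and synchronizing runs with the interface
    // declared above. Illustrative user code only, not part of the executor;
    // the tasks "A" and "B" are made up.
    //
    //   tf::Taskflow taskflow;
    //   tf::Executor executor(4);
    //
    //   auto [A, B] = taskflow.emplace(
    //     [] () { std::printf("A\n"); },
    //     [] () { std::printf("B\n"); }
    //   );
    //   A.precede(B);                                  // run A before B
    //
    //   std::future<void> fu = executor.run(taskflow); // one run
    //   fu.get();                                      // wait for that run
    //
    //   executor.run_n(taskflow, 4);                   // four more runs
    //   executor.run_until(taskflow, [i=0] () mutable {
    //     return ++i > 2;                              // stop when true
    //   });
    //
    //   executor.wait_for_all();                       // block until all done
    // ------------------------------------------------------------------------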
    /**
    @brief queries the number of worker threads (can be zero)
    */
    size_t num_workers() const;

    /**
    @brief queries the number of running topologies at the time of this call

    When a taskflow is submitted to an executor, a topology is created to store
    runtime metadata of the running taskflow.
    */
    size_t num_topologies() const;

    /**
    @brief queries the id of the caller thread in this executor

    Each worker has a unique id in the range 0 to N-1 within its executor.
    If the caller thread does not belong to the executor, -1 is returned.
    */
    int this_worker_id() const;

    /**
    @brief runs a given function asynchronously

    @tparam F callable type
    @tparam ArgsT parameter types

    @param f callable object to call
    @param args parameters to pass to the callable

    @return a std::future that will eventually hold the result of the function call

    This method is thread-safe. Multiple threads can launch asynchronous tasks
    at the same time.
    */
    template <typename F, typename... ArgsT>
    auto async(F&& f, ArgsT&&... args);

    /**
    @brief similar to tf::Executor::async but does not return a future object
    */
    template <typename F, typename... ArgsT>
    void silent_async(F&& f, ArgsT&&... args);

    /**
    @brief constructs an observer to inspect the activities of worker threads

    Each executor manages a list of observers in shared ownership with callers.

    @tparam Observer observer type derived from tf::ObserverInterface
    @tparam ArgsT argument parameter pack

    @param args arguments to forward to the constructor of the observer

    @return a shared pointer to the created observer
    */
    template <typename Observer, typename... ArgsT>
    std::shared_ptr<Observer> make_observer(ArgsT&&... args);

    /**
    @brief removes the associated observer
    */
    template <typename Observer>
    void remove_observer(std::shared_ptr<Observer> observer);

    /**
    @brief queries the number of observers
    */
    size_t num_observers() const;

  private:

    inline static thread_local PerThread _per_thread;

    const size_t _VICTIM_BEG;
    const size_t _VICTIM_END;
    const size_t _MAX_STEALS;
    const size_t _MAX_YIELDS;

    std::condition_variable _topology_cv;
    std::mutex _topology_mutex;
    std::mutex _wsq_mutex;

    size_t _num_topologies {0};

    std::vector<Worker> _workers;
    std::vector<std::thread> _threads;

    Notifier _notifier;

    TaskQueue<Node*> _wsq;

    std::atomic<size_t> _num_actives {0};
    std::atomic<size_t> _num_thieves {0};
    std::atomic<bool> _done {0};

    std::unordered_set<std::shared_ptr<ObserverInterface>> _observers;

    TFProfObserver* _tfprof;

    bool _wait_for_task(Worker&, Node*&);

    void _instantiate_tfprof();
    void _flush_tfprof();
    void _observer_prologue(Worker&, Node*);
    void _observer_epilogue(Worker&, Node*);
    void _spawn(size_t);
    void _worker_loop(Worker&);
    void _exploit_task(Worker&, Node*&);
    void _explore_task(Worker&, Node*&);
    void _schedule(Node*);
    void _schedule(const std::vector<Node*>&);
    void _invoke(Worker&, Node*);
    void _invoke_static_task(Worker&, Node*);
    void _invoke_dynamic_task(Worker&, Node*);
    void _invoke_dynamic_task_internal(Worker&, Node*, Graph&, bool);
    void _invoke_dynamic_task_external(Node*, Graph&, bool);
    void _invoke_condition_task(Worker&, Node*, int&);
    void _invoke_module_task(Worker&, Node*);
    void _invoke_async_task(Worker&, Node*);
    void _set_up_topology(Topology*);
    void _tear_down_topology(Topology*);
    void _increment_topology();
    void _decrement_topology();
    void _decrement_topology_and_notify();
    void _invoke_cudaflow_task(Worker&, Node*);

    template <typename C,
      std::enable_if_t<std::is_invocable_r_v<void, C, cudaFlow&>, void>* = nullptr
    >
    void _invoke_cudaflow_task_entry(C&&, Node*);

    template <typename C,
      std::enable_if_t<std::is_invocable_r_v<void, C, cudaFlowCapturer&>, void>* = nullptr
    >
    void _invoke_cudaflow_task_entry(C&&, Node*);

    //template <typename P>
    //void _invoke_cudaflow_task_internal(cudaFlow&, P&&, bool);

    //template <typename P>
    //void _invoke_cudaflow_task_external(cudaFlow&, P&&, bool);
};

// Constructor
inline Executor::Executor(size_t N) :
  _VICTIM_BEG {0},
  _VICTIM_END {N - 1},
  _MAX_STEALS {(N + 1) << 1},
  _MAX_YIELDS {100},
  _workers    {N},
  _notifier   {N} {

  if(N == 0) {
    TF_THROW("no cpu workers to execute taskflows");
  }

  _spawn(N);

  // instantiate the default observer if requested
  _instantiate_tfprof();
}

// Destructor
inline Executor::~Executor() {

  // wait for all topologies to complete
  wait_for_all();

  // shut down the scheduler
  _done = true;

  _notifier.notify(true);

  for(auto& t : _threads){
    t.join();
  }

  // flush the default observer
  _flush_tfprof();
}
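// ----------------------------------------------------------------------------
// Example (sketch): asynchronous tasks outside of a taskflow graph.
// Illustrative usage of Executor::async and Executor::silent_async declared
// above; the lambdas and values are made up.
//
//   tf::Executor executor;
//
//   std::future<int> fu = executor.async([] (int x) { return x + 1; }, 41);
//   assert(fu.get() == 42);              // async returns a std::future
//
//   executor.silent_async([] () {        // silent_async returns nothing;
//     std::printf("fire and forget\n");  // synchronize through wait_for_all
//   });                                  // (or the executor destructor)
//
//   executor.wait_for_all();
// ----------------------------------------------------------------------------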
// Procedure: _instantiate_tfprof
inline void Executor::_instantiate_tfprof() {
  // TF_OBSERVER_TYPE
  _tfprof = get_env("TF_ENABLE_PROFILER").empty() ?
    nullptr : make_observer<TFProfObserver>().get();
}

// Procedure: _flush_tfprof
inline void Executor::_flush_tfprof() {
  if(_tfprof) {
    std::ostringstream fpath;
    fpath << get_env("TF_ENABLE_PROFILER") << _tfprof->_UID << ".tfp";
    std::ofstream ofs(fpath.str());
    _tfprof->dump(ofs);
  }
}

// Function: num_workers
inline size_t Executor::num_workers() const {
  return _workers.size();
}

// Function: num_topologies
inline size_t Executor::num_topologies() const {
  return _num_topologies;
}

// Function: async
template <typename F, typename... ArgsT>
auto Executor::async(F&& f, ArgsT&&... args) {

  _increment_topology();

  using R = typename function_traits<F>::return_type;

  std::promise<R> p;

  auto fu = p.get_future();

  auto node = node_pool.animate(
    std::in_place_type_t<Node::AsyncTask>{},
    [p=make_moc(std::move(p)), f=std::forward<F>(f), args...] () mutable {
      if constexpr(std::is_same_v<R, void>) {
        f(args...);
        p.object.set_value();
      }
      else {
        p.object.set_value(f(args...));
      }
    }
  );

  _schedule(node);

  return fu;
}

// Function: silent_async
template <typename F, typename... ArgsT>
void Executor::silent_async(F&& f, ArgsT&&... args) {

  _increment_topology();

  Node* node = node_pool.animate(
    std::in_place_type_t<Node::AsyncTask>{},
    [f=std::forward<F>(f), args...] () mutable {
      f(args...);
    }
  );

  _schedule(node);
}

// Function: this_worker_id
inline int Executor::this_worker_id() const {
  auto worker = _per_thread.worker;
  return worker ? static_cast<int>(worker->id) : -1;
}

// Procedure: _spawn
inline void Executor::_spawn(size_t N) {

  for(size_t id=0; id<N; ++id) {

    _workers[id].id = id;
    _workers[id].vtm = id;
    _workers[id].executor = this;
    _workers[id].waiter = &_notifier._waiters[id];

    _threads.emplace_back([this] (Worker& w) -> void {

      _per_thread.worker = &w;

      Node* t = nullptr;

      // must use 1 as condition instead of !done
      while(1) {

        // execute the tasks.
        _exploit_task(w, t);

        // wait for tasks
        if(_wait_for_task(w, t) == false) {
          break;
        }
      }

    }, std::ref(_workers[id]));
  }
}

// Function: _explore_task
inline void Executor::_explore_task(Worker& w, Node*& t) {

  //assert(_workers[w].wsq.empty());
  assert(!t);

  size_t num_steals = 0;
  size_t num_yields = 0;

  std::uniform_int_distribution<size_t> rdvtm(_VICTIM_BEG, _VICTIM_END);

  //while(!_done) {
  //
  //  size_t vtm = rdvtm(w.rdgen);
  //
  //  t = (vtm == w.id) ? _wsq[d].steal() : _workers[vtm].wsq[d].steal();
  //
  //  if(t) {
  //    break;
  //  }
  //
  //  if(num_steal++ > _MAX_STEALS) {
  //    std::this_thread::yield();
  //    if(num_yields++ > _MAX_YIELDS) {
  //      break;
  //    }
  //  }
  //}

  do {
    t = (w.id == w.vtm) ? _wsq.steal() : _workers[w.vtm].wsq.steal();

    if(t) {
      break;
    }

    if(num_steals++ > _MAX_STEALS) {
      std::this_thread::yield();
      if(num_yields++ > _MAX_YIELDS) {
        break;
      }
    }

    w.vtm = rdvtm(w.rdgen);
  } while(!_done);
}

// Procedure: _exploit_task
inline void Executor::_exploit_task(Worker& w, Node*& t) {

  if(t) {

    if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) {
      _notifier.notify(false);
    }

    while(t) {
      _invoke(w, t);
      t = w.wsq.pop();
    }

    --_num_actives;
  }
}
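// ----------------------------------------------------------------------------
// Note (sketch): the per-worker scheduling loop built from _spawn,
// _exploit_task, _explore_task, and _wait_for_task roughly follows this
// simplified model (names below are illustrative, not the real members):
//
//   void worker_loop(Worker& w) {
//     Node* task = nullptr;
//     while(true) {
//       // exploit: drain the local queue as long as tasks keep coming
//       while(task) {
//         invoke(task);
//         task = w.wsq.pop();
//       }
//       // explore: steal from random victims, yield after too many failed
//       // steals, and eventually block on the notifier; returns false only
//       // when the executor shuts down
//       if(!wait_for_task(w, task)) {
//         break;
//       }
//     }
//   }
// ----------------------------------------------------------------------------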
// Function: _wait_for_task
inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {

  wait_for_task:

  assert(!t);

  ++_num_thieves;

  explore_task:

  _explore_task(worker, t);

  if(t) {
    if(_num_thieves.fetch_sub(1) == 1) {
      _notifier.notify(false);
    }
    return true;
  }

  _notifier.prepare_wait(worker.waiter);

  //if(auto vtm = _find_vtm(me); vtm != _workers.size()) {
  if(!_wsq.empty()) {

    _notifier.cancel_wait(worker.waiter);
    //t = (vtm == me) ? _wsq.steal() : _workers[vtm].wsq.steal();

    t = _wsq.steal();  // must steal here
    if(t) {
      if(_num_thieves.fetch_sub(1) == 1) {
        _notifier.notify(false);
      }
      return true;
    }
    else {
      worker.vtm = worker.id;
      goto explore_task;
    }
  }

  if(_done) {
    _notifier.cancel_wait(worker.waiter);
    _notifier.notify(true);
    --_num_thieves;
    return false;
  }

  if(_num_thieves.fetch_sub(1) == 1) {
    if(_num_actives) {
      _notifier.cancel_wait(worker.waiter);
      goto wait_for_task;
    }
    // check all queues again
    for(auto& w : _workers) {
      if(!w.wsq.empty()) {
        worker.vtm = w.id;
        _notifier.cancel_wait(worker.waiter);
        goto wait_for_task;
      }
    }
  }

  // Now I really need to relinquish myself to others
  _notifier.commit_wait(worker.waiter);

  return true;
}

// Function: make_observer
template <typename Observer, typename... ArgsT>
std::shared_ptr<Observer> Executor::make_observer(ArgsT&&... args) {

  static_assert(
    std::is_base_of_v<ObserverInterface, Observer>,
    "Observer must be derived from ObserverInterface"
  );

  // use a local variable to mimic the constructor
  auto ptr = std::make_shared<Observer>(std::forward<ArgsT>(args)...);

  ptr->set_up(_workers.size());

  _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));

  return ptr;
}

// Procedure: remove_observer
template <typename Observer>
void Executor::remove_observer(std::shared_ptr<Observer> ptr) {

  static_assert(
    std::is_base_of_v<ObserverInterface, Observer>,
    "Observer must be derived from ObserverInterface"
  );

  _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));
}

// Function: num_observers
inline size_t Executor::num_observers() const {
  return _observers.size();
}

// Procedure: _schedule
// The main procedure to schedule a given task node.
// Each task node has two types of tasks - regular and subflow.
inline void Executor::_schedule(Node* node) {

  //assert(_workers.size() != 0);

  // caller is a worker to this pool
  auto worker = _per_thread.worker;

  if(worker != nullptr && worker->executor == this) {
    worker->wsq.push(node);
    return;
  }

  // other threads
  {
    std::lock_guard<std::mutex> lock(_wsq_mutex);
    _wsq.push(node);
  }

  _notifier.notify(false);
}

// Procedure: _schedule
// The main procedure to schedule a set of task nodes.
// Each task node has two types of tasks - regular and subflow.
inline void Executor::_schedule(const std::vector<Node*>& nodes) {

  //assert(_workers.size() != 0);

  // We need to catch the node count to avoid accessing the nodes
  // vector while the parent topology is removed!
  const auto num_nodes = nodes.size();

  if(num_nodes == 0) {
    return;
  }

  // worker thread
  auto worker = _per_thread.worker;

  if(worker != nullptr && worker->executor == this) {
    for(size_t i=0; i<num_nodes; ++i) {
      worker->wsq.push(nodes[i]);
    }
    return;
  }

  // other threads
  {
    std::lock_guard<std::mutex> lock(_wsq_mutex);
    for(size_t k=0; k<num_nodes; ++k) {
      _wsq.push(nodes[k]);
    }
  }

  // wake up one worker per scheduled node
  for(size_t k=0; k<num_nodes; ++k) {
    _notifier.notify(false);
  }
}
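// ----------------------------------------------------------------------------
// Example (sketch): attaching an observer to trace worker activity, using
// make_observer/remove_observer defined above. MyObserver is a hypothetical
// type; the virtual member signatures follow the calls made in
// _observer_prologue/_observer_epilogue below.
//
//   struct MyObserver : public tf::ObserverInterface {
//     void set_up(size_t num_workers) override {}
//     void on_entry(tf::WorkerView wv, tf::TaskView tv) override {}
//     void on_exit(tf::WorkerView wv, tf::TaskView tv) override {}
//   };
//
//   tf::Executor executor;
//   auto obs = executor.make_observer<MyObserver>();
//   // ... run taskflows; on_entry/on_exit fire around every invoked task ...
//   executor.remove_observer(std::move(obs));
// ----------------------------------------------------------------------------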
// Procedure: _invoke
inline void Executor::_invoke(Worker& worker, Node* node) {

  // if acquiring semaphore(s) exists, acquire them first
  if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
    std::vector<Node*> nodes;
    if(!node->_acquire_all(nodes)) {
      _schedule(nodes);
      return;
    }
  }

  //assert(_workers.size() != 0);

  // Here we need to fetch the num_successors first to avoid the invalid memory
  // access caused by topology clear.
  const auto num_successors = node->num_successors();

  // task type
  auto type = node->_handle.index();

  // condition task
  int cond = -1;

  // switch is faster than nested if-else due to jump table
  switch(type) {

    // static task
    case Node::STATIC_TASK:{
      _invoke_static_task(worker, node);
    }
    break;

    // dynamic task
    case Node::DYNAMIC_TASK: {
      _invoke_dynamic_task(worker, node);
    }
    break;

    // condition task
    case Node::CONDITION_TASK: {
      _invoke_condition_task(worker, node, cond);
    }
    break;
    //}  // no need to add a break here due to the immediate return

    // module task
    case Node::MODULE_TASK: {
      _invoke_module_task(worker, node);
    }
    break;

    // async task
    case Node::ASYNC_TASK: {
      _invoke_async_task(worker, node);
      if(node->_parent) {
        node->_parent->_join_counter.fetch_sub(1);
      }
      else {
        _decrement_topology_and_notify();
      }
      // recycle the node
      node_pool.recycle(node);
      return;
    }
    break;

    // cudaflow task
    case Node::CUDAFLOW_TASK: {
      _invoke_cudaflow_task(worker, node);
    }
    break;

    // monostate
    default:
    break;
  }

  // if releasing semaphores exist, release them
  if(node->_semaphores && !node->_semaphores->to_release.empty()) {
    _schedule(node->_release_all());
  }

  // We MUST recover the dependency since the graph may have cycles.
  // This must be done before scheduling the successors, otherwise it might
  // cause a race condition on the _dependents.
  if(node->_has_state(Node::BRANCHED)) {
    node->_join_counter = node->num_strong_dependents();
  }
  else {
    node->_join_counter = node->num_dependents();
  }

  // acquire the parent flow counter
  auto& c = (node->_parent) ? node->_parent->_join_counter :
                              node->_topology->_join_counter;

  // At this point, the node storage might be destructed (to be verified)
  // case 1: non-condition task
  if(type != Node::CONDITION_TASK) {
    for(size_t i=0; i<num_successors; ++i) {
      if(--(node->_successors[i]->_join_counter) == 0) {
        c.fetch_add(1);
        _schedule(node->_successors[i]);
      }
    }
  }
  // case 2: condition task
  else {
    if(cond >= 0 && static_cast<size_t>(cond) < num_successors) {
      auto s = node->_successors[cond];
      s->_join_counter.store(0);  // seems redundant but just for invariant
      c.fetch_add(1);
      _schedule(s);
    }
  }

  // tear down topology if the node is the last one
  if(node->_parent == nullptr) {
    if(node->_topology->_join_counter.fetch_sub(1) == 1) {
      _tear_down_topology(node->_topology);
    }
  }
  else {  // joined subflow
    node->_parent->_join_counter.fetch_sub(1);
  }
}

// Procedure: _observer_prologue
inline void Executor::_observer_prologue(Worker& worker, Node* node) {
  for(auto& observer : _observers) {
    observer->on_entry(WorkerView(worker), TaskView(*node));
  }
}

// Procedure: _observer_epilogue
inline void Executor::_observer_epilogue(Worker& worker, Node* node) {
  for(auto& observer : _observers) {
    observer->on_exit(WorkerView(worker), TaskView(*node));
  }
}

// Procedure: _invoke_static_task
inline void Executor::_invoke_static_task(Worker& worker, Node* node) {
  _observer_prologue(worker, node);
  std::get<Node::StaticTask>(node->_handle).work();
  _observer_epilogue(worker, node);
}

// Procedure: _invoke_dynamic_task
inline void Executor::_invoke_dynamic_task(Worker& w, Node* node) {

  _observer_prologue(w, node);

  auto& handle = std::get<Node::DynamicTask>(node->_handle);

  handle.subgraph.clear();

  Subflow sf(*this, node, handle.subgraph);

  handle.work(sf);

  if(sf._joinable) {
    _invoke_dynamic_task_internal(w, node, handle.subgraph, false);
  }

  _observer_epilogue(w, node);
}
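// ----------------------------------------------------------------------------
// Example (sketch): dynamic tasking handled by _invoke_dynamic_task above.
// Illustrative user code; the subtasks are made up.
//
//   tf::Taskflow taskflow;
//
//   taskflow.emplace([] (tf::Subflow& sf) {
//     tf::Task s1 = sf.emplace([] () { std::printf("subtask 1\n"); });
//     tf::Task s2 = sf.emplace([] () { std::printf("subtask 2\n"); });
//     s1.precede(s2);
//     sf.join();       // or sf.detach(); see Subflow::join/detach below
//   });
//
//   tf::Executor executor;
//   executor.run(taskflow).wait();
// ----------------------------------------------------------------------------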
// Procedure: _invoke_dynamic_task_external
inline void Executor::_invoke_dynamic_task_external(Node* p, Graph& g, bool detach) {

  auto worker = _per_thread.worker;

  assert(worker && worker->executor == this);

  _invoke_dynamic_task_internal(*worker, p, g, detach);
}

// Procedure: _invoke_dynamic_task_internal
inline void Executor::_invoke_dynamic_task_internal(
  Worker& w, Node* p, Graph& g, bool detach
) {

  // graph is empty and has no async tasks
  if(g.empty() && p->_join_counter == 0) {
    return;
  }

  std::vector<Node*> src;

  for(auto n : g._nodes) {

    n->_topology = p->_topology;
    n->_set_up_join_counter();

    if(detach) {
      n->_parent = nullptr;
      n->_set_state(Node::DETACHED);
    }
    else {
      n->_parent = p;
    }

    if(n->num_dependents() == 0) {
      src.push_back(n);
    }
  }

  // detach here
  if(detach) {

    {
      std::lock_guard<std::mutex> lock(p->_topology->_taskflow._mtx);
      p->_topology->_taskflow._graph.merge(std::move(g));
    }

    p->_topology->_join_counter.fetch_add(src.size());
    _schedule(src);
  }
  // join here
  else {

    p->_join_counter.fetch_add(src.size());
    _schedule(src);

    Node* t = nullptr;

    std::uniform_int_distribution<size_t> rdvtm(_VICTIM_BEG, _VICTIM_END);

    while(p->_join_counter != 0) {

      t = w.wsq.pop();

      exploit:

      if(t) {
        _invoke(w, t);
      }
      else {

        explore:

        t = (w.id == w.vtm) ? _wsq.steal() : _workers[w.vtm].wsq.steal();

        if(t) {
          goto exploit;
        }
        else if(p->_join_counter != 0) {
          std::this_thread::yield();
          w.vtm = rdvtm(w.rdgen);
          goto explore;
        }
        else {
          break;
        }
      }
    }
  }
}

// Procedure: _invoke_condition_task
inline void Executor::_invoke_condition_task(
  Worker& worker, Node* node, int& cond
) {
  _observer_prologue(worker, node);
  cond = std::get<Node::ConditionTask>(node->_handle).work();
  _observer_epilogue(worker, node);
}

// Procedure: _invoke_cudaflow_task
inline void Executor::_invoke_cudaflow_task(Worker& worker, Node* node) {
  _observer_prologue(worker, node);
  std::get<Node::cudaFlowTask>(node->_handle).work(*this, node);
  _observer_epilogue(worker, node);
}

//// Procedure: _invoke_cudaflow_task_internal
//template <typename P>
//void Executor::_invoke_cudaflow_task_internal(
//  cudaFlow& cf, P&& predicate, bool join
//) {
//
//  if(cf.empty()) {
//    return;
//  }
//
//  // transforms cudaFlow to a native cudaGraph under the specified device
//  // and launches the graph through a given or an internal device stream
//  if(cf._executable == nullptr) {
//    cf._create_executable();
//    //cuda_dump_graph(std::cout, cf._graph._native_handle);
//  }
//
//  cudaScopedPerThreadStream s;
//
//  while(!predicate()) {
//    TF_CHECK_CUDA(
//      cudaGraphLaunch(cf._executable, s), "failed to execute cudaFlow"
//    );
//    TF_CHECK_CUDA(
//      cudaStreamSynchronize(s), "failed to synchronize cudaFlow execution"
//    );
//  }
//
//  if(join) {
//    //cuda_dump_graph(std::cout, cf._graph._native_handle);
//    cf._destroy_executable();
//  }
//}

//// Procedure: _invoke_cudaflow_task_external
//template <typename P>
//void Executor::_invoke_cudaflow_task_external(
//  cudaFlow& cf, P&& predicate, bool join
//) {
//
//  auto w = _per_thread().worker;
//
//  assert(w && w->executor == this);
//
//  _invoke_cudaflow_task_internal(*w, cf, std::forward<P>(predicate), join);
//}

// Procedure: _invoke_module_task
inline void Executor::_invoke_module_task(Worker& w, Node* node) {
  _observer_prologue(w, node);
  auto module = std::get<Node::ModuleTask>(node->_handle).module;
  _invoke_dynamic_task_internal(w, node, module->_graph, false);
  _observer_epilogue(w, node);
}

// Procedure: _invoke_async_task
inline void Executor::_invoke_async_task(Worker& w, Node* node) {
  _observer_prologue(w, node);
  std::get<Node::AsyncTask>(node->_handle).work();
  _observer_epilogue(w, node);
}

// Function: run
inline std::future<void> Executor::run(Taskflow& f) {
  return run_n(f, 1, [](){});
}

// Function: run
template <typename C>
std::future<void> Executor::run(Taskflow& f, C&& c) {
  return run_n(f, 1, std::forward<C>(c));
}

// Function: run_n
inline std::future<void> Executor::run_n(Taskflow& f, size_t repeat) {
  return run_n(f, repeat, [](){});
}

// Function: run_n
template <typename C>
std::future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
  return run_until(
    f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
  );
}
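// ----------------------------------------------------------------------------
// Example (sketch): composition, which _invoke_module_task above executes as a
// module task. Illustrative user code; the two taskflows are made up and
// composed_of is the Taskflow composition interface.
//
//   tf::Taskflow core, outer;
//
//   core.emplace([] () { std::printf("core work\n"); });
//
//   tf::Task init = outer.emplace([] () { std::printf("init\n"); });
//   tf::Task mod  = outer.composed_of(core);   // becomes a MODULE_TASK node
//   init.precede(mod);
//
//   tf::Executor executor;
//   executor.run(outer).wait();
// ----------------------------------------------------------------------------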

// Function: run_until
template <typename P>
std::future<void> Executor::run_until(Taskflow& f, P&& pred) {
  return run_until(f, std::forward<P>(pred), [](){});
}

// Function: _set_up_topology
inline void Executor::_set_up_topology(Topology* tpg) {

  tpg->_sources.clear();
  tpg->_taskflow._graph.clear_detached();

  // scan each node in the graph and build up the links
  for(auto node : tpg->_taskflow._graph._nodes) {

    node->_topology = tpg;
    node->_clear_state();

    if(node->num_dependents() == 0) {
      tpg->_sources.push_back(node);
    }

    node->_set_up_join_counter();
  }

  tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);
}

// Function: _tear_down_topology
inline void Executor::_tear_down_topology(Topology* tpg) {

  auto &f = tpg->_taskflow;

  //assert(&tpg == &(f._topologies.front()));

  // case 1: we still need to run the topology again
  if(!tpg->_pred()) {
    //tpg->_recover_num_sinks();
    assert(tpg->_join_counter == 0);
    tpg->_join_counter = tpg->_sources.size();
    _schedule(tpg->_sources);
  }
  // case 2: the final run of this topology
  else {

    if(tpg->_call != nullptr) {
      tpg->_call();
    }

    f._mtx.lock();

    // If there is another run (interleave between lock)
    if(f._topologies.size() > 1) {

      assert(tpg->_join_counter == 0);

      // Set the promise
      tpg->_promise.set_value();
      f._topologies.pop_front();
      f._mtx.unlock();

      // decrement the topology but since this is not the last we don't notify
      _decrement_topology();

      tpg = &(f._topologies.front());

      _set_up_topology(tpg);
      _schedule(tpg->_sources);

      //f._topologies.front()._bind(f._graph);
      //*tpg = &(f._topologies.front());
      //assert(f._topologies.front()._join_counter == 0);
      //f._topologies.front()._join_counter = f._topologies.front()._sources.size();
      //_schedule(f._topologies.front()._sources);
    }
    else {
      assert(f._topologies.size() == 1);

      // Need to back up the promise first here because the taskflow might be
      // destroyed before this function leaves.
      auto p {std::move(tpg->_promise)};

      // Back up the lambda capture in case it holds the topology pointer,
      // to avoid it being released by pop_front ahead of _mtx.unlock and
      // _promise.set_value. It is released safely when leaving this scope.
      auto bc {std::move(tpg->_call)};

      f._topologies.pop_front();

      f._mtx.unlock();

      // We set the promise at the very end in case the taskflow is destroyed
      // as soon as the future becomes ready.
      p.set_value();

      _decrement_topology_and_notify();
    }
  }
}
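// ----------------------------------------------------------------------------
// Example (sketch): conditional tasking, which produces the cyclic graphs that
// _invoke and _tear_down_topology above must handle. Illustrative user code;
// the loop bound is arbitrary.
//
//   tf::Taskflow taskflow;
//   int i = 0;
//
//   tf::Task init = taskflow.emplace([] () { std::printf("init\n"); });
//   tf::Task body = taskflow.emplace([&] () { ++i; });
//   tf::Task cond = taskflow.emplace([&] () {
//     return i < 3 ? 0 : 1;      // returning an index selects the successor
//   });
//   tf::Task done = taskflow.emplace([] () { std::printf("done\n"); });
//
//   init.precede(body);
//   body.precede(cond);
//   cond.precede(body, done);    // successor 0 loops back, successor 1 exits
//
//   tf::Executor executor;
//   executor.run(taskflow).wait();
// ----------------------------------------------------------------------------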

// Function: run_until
template <typename P, typename C>
std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {

  _increment_topology();

  // Special case of predicate
  if(f.empty() || pred()) {
    std::promise<void> promise;
    promise.set_value();
    _decrement_topology_and_notify();
    return promise.get_future();
  }

  // Multi-threaded execution.
  bool run_now {false};
  Topology* tpg;
  std::future<void> future;

  {
    std::lock_guard<std::mutex> lock(f._mtx);

    // create a topology for this run
    //tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
    f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c));
    tpg = &(f._topologies.back());
    future = tpg->_promise.get_future();

    if(f._topologies.size() == 1) {
      run_now = true;
      //tpg->_bind(f._graph);
      //_schedule(tpg->_sources);
    }
  }

  // Notice that calling schedule here may cause the topology to be removed
  // sooner, before this function leaves.
  if(run_now) {
    _set_up_topology(tpg);
    _schedule(tpg->_sources);
  }

  return future;
}

// Procedure: _increment_topology
inline void Executor::_increment_topology() {
  std::lock_guard<std::mutex> lock(_topology_mutex);
  ++_num_topologies;
}

// Procedure: _decrement_topology_and_notify
inline void Executor::_decrement_topology_and_notify() {
  std::lock_guard<std::mutex> lock(_topology_mutex);
  if(--_num_topologies == 0) {
    _topology_cv.notify_all();
  }
}

// Procedure: _decrement_topology
inline void Executor::_decrement_topology() {
  std::lock_guard<std::mutex> lock(_topology_mutex);
  --_num_topologies;
}

// Procedure: wait_for_all
inline void Executor::wait_for_all() {
  std::unique_lock<std::mutex> lock(_topology_mutex);
  _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
}

// ############################################################################
// Forward Declaration: Subflow
// ############################################################################

inline void Subflow::join() {

  if(!_joinable) {
    TF_THROW("subflow not joinable");
  }

  _executor._invoke_dynamic_task_external(_parent, _graph, false);
  _joinable = false;
}

inline void Subflow::detach() {

  if(!_joinable) {
    TF_THROW("subflow already joined or detached");
  }

  _executor._invoke_dynamic_task_external(_parent, _graph, true);
  _joinable = false;
}

// Function: async
template <typename F, typename... ArgsT>
auto Subflow::async(F&& f, ArgsT&&... args) {

  _parent->_join_counter.fetch_add(1);

  using R = typename function_traits<F>::return_type;

  std::promise<R> p;

  auto fu = p.get_future();

  auto node = node_pool.animate(
    std::in_place_type_t<Node::AsyncTask>{},
    [p=make_moc(std::move(p)), f=std::forward<F>(f), args...] () mutable {
      if constexpr(std::is_same_v<R, void>) {
        f(args...);
        p.object.set_value();
      }
      else {
        p.object.set_value(f(args...));
      }
    }
  );

  node->_topology = _parent->_topology;
  node->_parent = _parent;

  _executor._schedule(node);

  return fu;
}

// Function: silent_async
template <typename F, typename... ArgsT>
void Subflow::silent_async(F&& f, ArgsT&&... args) {

  _parent->_join_counter.fetch_add(1);

  auto node = node_pool.animate(
    std::in_place_type_t<Node::AsyncTask>{},
    [f=std::forward<F>(f), args...] () mutable {
      f(args...);
    }
  );

  node->_topology = _parent->_topology;
  node->_parent = _parent;

  _executor._schedule(node);
}

}  // end of namespace tf -----------------------------------------------------