diff --git a/include/xo/reactor/AbstractSource.hpp b/include/xo/reactor/AbstractSource.hpp index ed31d04f..4d995ab2 100644 --- a/include/xo/reactor/AbstractSource.hpp +++ b/include/xo/reactor/AbstractSource.hpp @@ -38,10 +38,16 @@ namespace xo { virtual TypeDescr source_ev_type() const = 0; /* if true: event objects (see .source_ev_type()) - * maybe overwritten between callbacks. + * may be overwritten between callbacks. * A sink that wants to capture events * (e.g. EventStore<>) will need to deep-copy them * if false: event objects are preserved between callbacks. + * + * A source that stores events received from elsewhere (e.g. FifoQueue) + * is probably volatile. + * + * A source that remembers (in explicit memory) every event it produces + * is not volatile */ virtual bool is_volatile() const = 0; diff --git a/include/xo/reactor/DirectSource.hpp b/include/xo/reactor/DirectSource.hpp index 03710281..7187e56d 100644 --- a/include/xo/reactor/DirectSource.hpp +++ b/include/xo/reactor/DirectSource.hpp @@ -2,7 +2,7 @@ #pragma once -#include "time/Time.hpp" +//#include "time/Time.hpp" #include "reactor/Sink.hpp" #include "reactor/EventSource.hpp" #include "reactor/HeapReducer.hpp" @@ -16,4 +16,3 @@ namespace xo { } /*namespace xo*/ /* end DirectSource.hpp */ - diff --git a/include/xo/reactor/EventSource.hpp b/include/xo/reactor/EventSource.hpp index a60297f7..055c7d6b 100644 --- a/include/xo/reactor/EventSource.hpp +++ b/include/xo/reactor/EventSource.hpp @@ -2,24 +2,24 @@ #pragma once -#include "reactor/ReactorSource.hpp" -#include "callback/CallbackSet.hpp" +#include "ReactorSource.hpp" +#include "xo/callback/CallbackSet.hpp" namespace xo { - namespace reactor { - template - class EventSource : public ReactorSource { - public: - using CallbackId = fn::CallbackId; + namespace reactor { + template + class EventSource : public ReactorSource { + public: + using CallbackId = fn::CallbackId; - public: - virtual CallbackId add_callback(ref::rp const & cb) = 0; - virtual void remove_callback(CallbackId id) = 0; - }; /*EventSource*/ - - } /*namespace reactor*/ + public: + virtual CallbackId add_callback(ref::rp const & cb) = 0; + virtual void remove_callback(CallbackId id) = 0; + }; /*EventSource*/ + + } /*namespace reactor*/ } /*namespace xo*/ /* end EventSource.hpp */ diff --git a/include/xo/reactor/EventTimeFn.hpp b/include/xo/reactor/EventTimeFn.hpp index 28c36d0b..ee206d11 100644 --- a/include/xo/reactor/EventTimeFn.hpp +++ b/include/xo/reactor/EventTimeFn.hpp @@ -2,7 +2,8 @@ #pragma once -#include "time/Time.hpp" +//#include "time/Time.hpp" +#include "xo/indentlog/timeutil/timeutil.hpp" #include namespace xo { @@ -17,7 +18,7 @@ namespace xo { public: using event_t = Event; using utc_nanos = xo::time::utc_nanos; - + public: utc_nanos operator()(Event const & ev) const { return ev.tm(); } }; /*StructEventTimeFn*/ @@ -32,6 +33,15 @@ namespace xo { utc_nanos operator()(Event const & ev) const { return ev->tm(); } }; /*PtrEventTimeFn*/ + template + class PairEventTimeFn { + public: + using utc_nanos = xo::time::utc_nanos; + using event_t = std::pair; + + public: + utc_nanos operator()(event_t const & ev) const { return ev.first; } + }; /*PairEventTimeFn*/ } /*namespace reactor*/ } /*namespace xo*/ diff --git a/include/xo/reactor/HeapReducer.hpp b/include/xo/reactor/HeapReducer.hpp index 5355e056..7a7456ae 100644 --- a/include/xo/reactor/HeapReducer.hpp +++ b/include/xo/reactor/HeapReducer.hpp @@ -1,72 +1,72 @@ /* @file HeapReducer.hpp */ -#pragma once +#pragma once -#include "reactor/Reducer.hpp" +#include "Reducer.hpp" namespace xo { - namespace reactor { - /* collect incoming events in a heap, - * ordered by timestamp. - * output events in increasing timestamp order. - * Information preserving in all other respects - * - * Require: - * - Event is null-constructible - * - Event is copyable - * - EventTimeFn :: Event -> utc_nanos - */ - template> - class HeapReducer : public ReducerBase { - public: - using utc_nanos = xo::time::utc_nanos; - public: - HeapReducer() = default; - HeapReducer(EventTimeFn const & evtfn) : ReducerBase(evtfn) {} + namespace reactor { + /* collect incoming events in a heap, + * ordered by timestamp. + * output events in increasing timestamp order. + * Information preserving in all other respects + * + * Require: + * - Event is null-constructible + * - Event is copyable + * - EventTimeFn :: Event -> utc_nanos + */ + template> + class HeapReducer : public ReducerBase { + public: + using utc_nanos = xo::time::utc_nanos; + public: + HeapReducer() = default; + HeapReducer(EventTimeFn const & evtfn) : ReducerBase(evtfn) {} - bool is_empty() const { return this->event_heap_.empty(); } - /* require: .is_empty() = false */ - utc_nanos next_tm() const { return this->event_tm(this->event_heap_.front()); } - /* #of events stored in this reducer */ - uint32_t n_event() const { return this->event_heap_.size(); } + bool is_empty() const { return this->event_heap_.empty(); } + /* require: .is_empty() = false */ + utc_nanos next_tm() const { return this->event_tm(this->event_heap_.front()); } + /* #of events stored in this reducer */ + uint32_t n_event() const { return this->event_heap_.size(); } - Event const & last_annexed_ev() const { return this->annexed_ev_; } + Event const & last_annexed_ev() const { return this->annexed_ev_; } - void include_event(Event const & ev) { - this->event_heap_.push_back(ev); - std::push_heap(this->event_heap_.begin(), - this->event_heap_.end(), - std::greater()); - } /*include_event*/ + void include_event(Event const & ev) { + this->event_heap_.push_back(ev); + std::push_heap(this->event_heap_.begin(), + this->event_heap_.end(), + std::greater()); + } /*include_event*/ - void include_event(Event && ev) { - this->event_heap_.push_back(std::move(ev)); - std::push_heap(this->event_heap_.begin(), - this->event_heap_.end(), - std::greater()); - } /*include_event*/ + void include_event(Event && ev) { + this->event_heap_.push_back(std::move(ev)); + std::push_heap(this->event_heap_.begin(), + this->event_heap_.end(), + std::greater()); + } /*include_event*/ - Event & annex_one() { - this->annexed_ev_ = this->event_heap_.front(); - std::pop_heap(this->event_heap_.begin(), - this->event_heap_.end(), - std::greater()); - this->event_heap_.pop_back(); + Event & annex_one() { + this->annexed_ev_ = this->event_heap_.front(); + std::pop_heap(this->event_heap_.begin(), + this->event_heap_.end(), + std::greater()); + this->event_heap_.pop_back(); - return this->annexed_ev_; - } /*annex_one*/ + return this->annexed_ev_; + } /*annex_one*/ - // ----- Inherited from ReducerBase ----- + // ----- Inherited from ReducerBase ----- - // utc_nanos event_tm(Event const & x); - - private: - /* queued Events, in increasing timestamp order */ - std::vector event_heap_; - /* annexed event, removed from .event_heap */ - Event annexed_ev_; - }; /*HeapReducer*/ - } /*namespace reactor*/ + // utc_nanos event_tm(Event const & x); + + private: + /* queued Events, in increasing timestamp order */ + std::vector event_heap_; + /* annexed event, removed from .event_heap */ + Event annexed_ev_; + }; /*HeapReducer*/ + } /*namespace reactor*/ } /*namespace xo*/ /* end HeapReducer.hpp */ diff --git a/include/xo/reactor/PollingReactor.hpp b/include/xo/reactor/PollingReactor.hpp index 7dbad4ab..8cbe2c2f 100644 --- a/include/xo/reactor/PollingReactor.hpp +++ b/include/xo/reactor/PollingReactor.hpp @@ -8,37 +8,41 @@ #include namespace xo { - namespace reactor { - /* reactor that runs by polling an ordered set of sources */ - class PollingReactor : public Reactor { - public: - PollingReactor() = default; + namespace reactor { + /* reactor that runs by polling an ordered set of sources */ + class PollingReactor : public Reactor { + public: + /* named ctor idiom */ + static ref::rp make() { return new PollingReactor(); } - // ----- inherited from Reactor ----- + // ----- inherited from Reactor ----- - virtual bool add_source(ref::brw src) override; - virtual bool remove_source(ref::brw src) override; - virtual std::uint64_t run_one() override; + virtual bool add_source(ref::brw src) override; + virtual bool remove_source(ref::brw src) override; + virtual void notify_source_primed(ref::brw src) override; + virtual std::uint64_t run_one() override; - private: - /* find non-empty source, starting from .source_v_[start_ix], - * wrapping around to .source_v_[start_ix - 1]. - * - * return index of first available non-empty source, - * or -1 if all sources are empty - */ - std::int64_t find_nonempty_source(std::size_t start_ix); + private: + PollingReactor() = default; - private: - /* next source to poll will be .source_v_[.next_ix_] */ - std::size_t next_ix_ = 0; + /* find non-empty source, starting from .source_v_[start_ix], + * wrapping around to .source_v_[start_ix - 1]. + * + * return index of first available non-empty source, + * or -1 if all sources are empty + */ + std::int64_t find_nonempty_source(std::size_t start_ix); - /* ordered set of sources (see reactor::Source) - * reactor will poll sources in round-robin order - */ - std::vector source_v_; - }; /*PollingReactor*/ - } /*namespace reactor*/ + private: + /* next source to poll will be .source_v_[.next_ix_] */ + std::size_t next_ix_ = 0; + + /* ordered set of sources (see reactor::Source) + * reactor will poll sources in round-robin order + */ + std::vector source_v_; + }; /*PollingReactor*/ + } /*namespace reactor*/ } /*namespace xo*/ /* end PollingReactor.hpp */ diff --git a/include/xo/reactor/PolyAdapterSink.hpp b/include/xo/reactor/PolyAdapterSink.hpp index e8f68eb9..60ec4ccf 100644 --- a/include/xo/reactor/PolyAdapterSink.hpp +++ b/include/xo/reactor/PolyAdapterSink.hpp @@ -9,84 +9,84 @@ #include "xo/reflect/Reflect.hpp" namespace xo { - namespace reactor { - /* adapter between a source that delivers a particular event type T, - * and a sink that accepts arbitrarily-typed events via .notify_ev_tp() - * Use this to connect to a polymorphic sink. - * - * Require: - * - .poly_sink.allow_polymorphic_source() - * (ofc. otherwise no point in using PolyAdapterSink) - * - .poly_sink.allow_volatile_source() - * need this bc will be wrapping event with TaggedPtr, - * which doesn't manage event lifetime - */ - template - class PolyAdapterSink : public reactor::Sink1 { - public: - using Reflect = reflect::Reflect; - using TaggedPtr = reflect::TaggedPtr; + namespace reactor { + /* adapter between a source that delivers a particular event type T, + * and a sink that accepts arbitrarily-typed events via .notify_ev_tp() + * Use this to connect to a polymorphic sink. + * + * Require: + * - .poly_sink.allow_polymorphic_source() + * (ofc. otherwise no point in using PolyAdapterSink) + * - .poly_sink.allow_volatile_source() + * need this bc will be wrapping event with TaggedPtr, + * which doesn't manage event lifetime + */ + template + class PolyAdapterSink : public reactor::Sink1 { + public: + using Reflect = reflect::Reflect; + using TaggedPtr = reflect::TaggedPtr; - public: - /* named ctor idiom */ - static ref::rp make(ref::rp poly_sink) { - //xo::scope lscope("PolyAdapterSink::make"); + public: + /* named ctor idiom */ + static ref::rp make(ref::rp poly_sink) { + //xo::scope lscope("PolyAdapterSink::make"); - ref::rp retval(new PolyAdapterSink(poly_sink)); + ref::rp retval(new PolyAdapterSink(poly_sink)); - //lscope.log("adapter", (void*)retval.get()); + //lscope.log("adapter", (void*)retval.get()); - return retval; - } /*make*/ + return retval; + } /*make*/ - // ----- Inherited from Sink1 ----- + // ----- Inherited from Sink1 ----- - virtual void notify_ev(T const & ev) override { - //xo::scope lscope("PolyAdapterSink::notify_ev"); - //lscope.log(xo::xtag("ev", ev)); + virtual void notify_ev(T const & ev) override { + //xo::scope lscope("PolyAdapterSink::notify_ev"); + //lscope.log(xo::xtag("ev", ev)); - TaggedPtr ev_tp = Reflect::make_tp(const_cast(&ev)); + TaggedPtr ev_tp = Reflect::make_tp(const_cast(&ev)); - this->notify_ev_tp(ev_tp); - } /*notify_ev*/ + this->notify_ev_tp(ev_tp); + } /*notify_ev*/ - // ----- Inherited from AbstractSink ----- + // ----- Inherited from AbstractSink ----- - virtual bool allow_volatile_source() const override { return true; } - virtual uint32_t n_in_ev() const override { return this->poly_sink_->n_in_ev(); } - /* note: ok to do this, however if expecting to use this entry point, - * maybe don't need to interpose PolyAdapterSink ahead of .poly_sink - */ - virtual void notify_ev_tp(TaggedPtr const & ev_tp) override { - //xo::scope lscope("PolyAdapterSink::notify_ev_tp"); + virtual bool allow_volatile_source() const override { return true; } + virtual uint32_t n_in_ev() const override { return this->poly_sink_->n_in_ev(); } + /* note: ok to do this, however if expecting to use this entry point, + * maybe don't need to interpose PolyAdapterSink ahead of .poly_sink + */ + virtual void notify_ev_tp(TaggedPtr const & ev_tp) override { + //xo::scope lscope("PolyAdapterSink::notify_ev_tp"); - return this->poly_sink_->notify_ev_tp(ev_tp); - } + return this->poly_sink_->notify_ev_tp(ev_tp); + } - // ----- Inherited from AbstractEventProcessor ----- + // ----- Inherited from AbstractEventProcessor ----- - virtual std::string const & name() const override { return this->poly_sink_->name(); } - virtual void set_name(std::string const & x) override { this->poly_sink_->set_name(x); } - virtual void visit_direct_consumers(std::function ep)> const & fn) override { - this->poly_sink_->visit_direct_consumers(fn); - } - virtual void display(std::ostream & os) const override { - using xo::xtag; - os << "()) - << xtag("poly", this->poly_sink_) - << ">"; - } /*display*/ + virtual std::string const & name() const override { return this->poly_sink_->name(); } + virtual void set_name(std::string const & x) override { this->poly_sink_->set_name(x); } + virtual void visit_direct_consumers(std::function ep)> const & fn) override { + this->poly_sink_->visit_direct_consumers(fn); + } + virtual void display(std::ostream & os) const override { + using xo::xtag; + os << "()) + << xtag("poly", this->poly_sink_) + << ">"; + } /*display*/ - private: - PolyAdapterSink(ref::rp poly_sink) : poly_sink_{std::move(poly_sink)} {} + private: + PolyAdapterSink(ref::rp poly_sink) : poly_sink_{std::move(poly_sink)} {} - private: - /* mandate: .poly_sink.allow_polymorphic_source() is true */ - ref::rp poly_sink_; - }; /*PolyAdapterSink*/ - } /*namespace reactor*/ + private: + /* mandate: .poly_sink.allow_polymorphic_source() is true */ + ref::rp poly_sink_; + }; /*PolyAdapterSink*/ + } /*namespace reactor*/ } /*namespace xo*/ /* end PolyAdapterSink.hpp */ diff --git a/include/xo/reactor/Reactor.hpp b/include/xo/reactor/Reactor.hpp index c56a590c..453312a4 100644 --- a/include/xo/reactor/Reactor.hpp +++ b/include/xo/reactor/Reactor.hpp @@ -3,61 +3,69 @@ #pragma once #include "xo/refcnt/Refcounted.hpp" +#include "xo/indentlog/log_level.hpp" #include namespace xo { - namespace reactor { - class ReactorSource; + namespace reactor { + class ReactorSource; - /* abtract api for a reactor: - * something that arranges to have work done on a set of Sources. - */ - class Reactor : public ref::Refcount { - public: - virtual ~Reactor() = default; + /* abtract api for a reactor: + * something that arranges to have work done on a set of Sources. + */ + class Reactor : public ref::Refcount { + public: + virtual ~Reactor() = default; - /* add source src to this reactor. - * on success, invoke src.notify_reactor_add(this) - * - * returns true if source added; false if already present - */ - virtual bool add_source(ref::brw src) = 0; + log_level loglevel() const { return loglevel_; } + void set_loglevel(log_level loglevel) { loglevel_ = loglevel; } - /* remove source src from this reactor. - * source must previously have been added by - * .add_source(src). - * - * on success, invoke src.notify_reactor_remove(this) - * - * returns true if source removed; false if not present - */ - virtual bool remove_source(ref::brw src) = 0; + /* add source src to this reactor. + * on success, invoke src.notify_reactor_add(this) + * + * returns true if source added; false if already present + */ + virtual bool add_source(ref::brw src) = 0; - /* notification when non-primed source (source with no known events) - * becomes primed (source with at least one event) - */ - virtual void notify_source_primed(ref::brw src) = 0; + /* remove source src from this reactor. + * source must previously have been added by + * .add_source(src). + * + * on success, invoke src.notify_reactor_remove(this) + * + * returns true if source removed; false if not present + */ + virtual bool remove_source(ref::brw src) = 0; - /* dispatch one reactor event, borrowing the calling thread - * amount of work this represents is Source/Sink specific. - * - * returns #of events dispatched (0 or 1) - */ - virtual std::uint64_t run_one() = 0; + /* notification when non-primed source (source with no known events) + * becomes primed (source with at least one event) + */ + virtual void notify_source_primed(ref::brw src) = 0; - /* borrow calling thread to dispatch reactor events. - * if n is -1, run indefinitely - * otherwise dispatch up to n events. - * n = 0 is a noop - */ - void run_n(int32_t n); + /* dispatch one reactor event, borrowing the calling thread + * amount of work this represents is Source/Sink specific. + * + * returns #of events dispatched (0 or 1) + */ + virtual std::uint64_t run_one() = 0; - /* borrow calling thread to run indefinitely. - * suitable implementation for dedicated reactor threads - */ - void run() { this->run_n(-1); } - }; /*Reactor*/ - } /*namespace reactor*/ + /* borrow calling thread to dispatch reactor events. + * if n is -1, run indefinitely + * otherwise dispatch up to n events. + * n = 0 is a noop + */ + void run_n(int32_t n); + + /* borrow calling thread to run indefinitely. + * suitable implementation for dedicated reactor threads + */ + void run() { this->run_n(-1); } + + private: + /* control logging verbosity */ + log_level loglevel_; + }; /*Reactor*/ + } /*namespace reactor*/ } /*namespace xo*/ /* end Reactor.hpp */ diff --git a/include/xo/reactor/ReactorSource.hpp b/include/xo/reactor/ReactorSource.hpp index 92f275d4..8b217a4b 100644 --- a/include/xo/reactor/ReactorSource.hpp +++ b/include/xo/reactor/ReactorSource.hpp @@ -7,124 +7,124 @@ #include namespace xo { - namespace reactor { - class Reactor; + namespace reactor { + class Reactor; - /* abstract api for a source of events. - * Event representation is left open: Sources and Sinks - * need to have compatible event representations, - * and coordination is left to such (Source, Sink) pairs. - * - * Source->Sink activity may be expected to be mediated by a reactor, - * that implements the Reactor api. - * - * At any time, A Source can be associated with at most one reactor. - * Sources are informed of Reactor<->Source association being - * formed/broken by the - * .notify_reactor_add(), .notify_reactor_remove() - * methods - * - * The source api intends also to provide for simulation. - * There introduces two simulation-specific methods: - * .sim_current_tm() - * .sim_advance_until() - * - * A non-simulation source can implement these as calls to - * .online_current_tm(), .online_advance_until() respectively - * .online_current_tm() aborts since an online source is never exhausted - * .online_advance_until() is a no-op that returns 0 - * - * Loop for consuming from a primary simulation source: - * - * brw s = ...; - * while(!s->is_exhausted()) - * s->deliver_one(); - * - * Secondary sources (sources that depend on other sources) can be - * in a state where they don't know their next event, in which case: - * - * s->is_notprimed() == true - */ - class ReactorSource : public AbstractSource { - public: - using utc_nanos = xo::time::utc_nanos; + /* abstract api for a source of events. + * Event representation is left open: Sources and Sinks + * need to have compatible event representations, + * and coordination is left to such (Source, Sink) pairs. + * + * Source->Sink activity may be expected to be mediated by a reactor, + * that implements the Reactor api. + * + * At any time, A Source can be associated with at most one reactor. + * Sources are informed of Reactor<->Source association being + * formed/broken by the + * .notify_reactor_add(), .notify_reactor_remove() + * methods + * + * The source api intends also to provide for simulation. + * There introduces two simulation-specific methods: + * .sim_current_tm() + * .sim_advance_until() + * + * A non-simulation source can implement these as calls to + * .online_current_tm(), .online_advance_until() respectively + * .online_current_tm() aborts since an online source is never exhausted + * .online_advance_until() is a no-op that returns 0 + * + * Loop for consuming from a primary simulation source: + * + * brw s = ...; + * while(!s->is_exhausted()) + * s->deliver_one(); + * + * Secondary sources (sources that depend on other sources) can be + * in a state where they don't know their next event, in which case: + * + * s->is_notprimed() == true + */ + class ReactorSource : public AbstractSource { + public: + using utc_nanos = xo::time::utc_nanos; - public: - virtual ~ReactorSource() = default; + public: + virtual ~ReactorSource() = default; - /* true if source is currently empty (has 0 events to deliver) */ - virtual bool is_empty() const = 0; - bool is_nonempty() const { return !this->is_empty(); } + /* true if source is currently empty (has 0 events to deliver) */ + virtual bool is_empty() const = 0; + bool is_nonempty() const { return !this->is_empty(); } - /* true when source knows its next event - * A source that isn't primed is also excluded from simulation - * heap until it becomes primed. - * This make feasible simulation sources that - * depend on other simulation sources - */ - virtual bool is_primed() const { return !this->is_empty(); } - virtual bool is_notprimed() const { return this->is_empty(); } + /* true when source knows its next event + * A source that isn't primed is also excluded from simulation + * heap until it becomes primed. + * This make feasible simulation sources that + * depend on other simulation sources + */ + virtual bool is_primed() const { return !this->is_empty(); } + virtual bool is_notprimed() const { return this->is_empty(); } - /* if true, this source has no events, and will never publish more events - * - for sim, return true for a standalone source that has replayed all events - * - for rt, set during orderly - */ - virtual bool is_exhausted() const = 0; + /* if true, this source has no events, and will never publish more events + * - for sim, return true for a standalone source that has replayed all events + * - for rt, set during orderly + */ + virtual bool is_exhausted() const = 0; - /* if this is a simulation source and .is_exhausted is false: - * returns next event time; more precisely, no events exist prior to - * this time. - * - * if sim, and .is_primed = true, - * returns timestamp of next event - */ - virtual utc_nanos sim_current_tm() const = 0; + /* if this is a simulation source and .is_exhausted is false: + * returns next event time; more precisely, no events exist prior to + * this time. + * + * if sim, and .is_primed = true, + * returns timestamp of next event + */ + virtual utc_nanos sim_current_tm() const = 0; - /* promise: - * - .current_tm() > tm || .is_notprimed() || .is_exhausted() = true - * - if replay_flag is true, then any events between previous .current_tm() - * and new .current_tm() will have been published - * - * returns #of events delivered. - * does not count events that were skipped, so always returns 0 if - * replay_flag is false - */ - virtual std::uint64_t sim_advance_until(utc_nanos tm, bool replay_flag) = 0; + /* promise: + * - .current_tm() > tm || .is_notprimed() || .is_exhausted() = true + * - if replay_flag is true, then any events between previous .current_tm() + * and new .current_tm() will have been published + * + * returns #of events delivered. + * does not count events that were skipped, so always returns 0 if + * replay_flag is false + */ + virtual std::uint64_t sim_advance_until(utc_nanos tm, bool replay_flag) = 0; - /* informs source when it's added to a reactor + /* informs source when it's added to a reactor - * (see Reactor.add_source()) - */ - virtual void notify_reactor_add(Reactor * /*reactor*/) {} + * (see Reactor.add_source()) + */ + virtual void notify_reactor_add(Reactor * /*reactor*/) {} - /* informs source when it's removed from a reactor - * (see Reactor.remove_source()) - */ - virtual void notify_reactor_remove(Reactor * /*reactor*/) {} + /* informs source when it's removed from a reactor + * (see Reactor.remove_source()) + */ + virtual void notify_reactor_remove(Reactor * /*reactor*/) {} - // ----- Inherited from AbstractSource ----- + // ----- Inherited from AbstractSource ----- - /* deliver one event to attached sink - * interpretation of 'one event' is source-specific; - * could be a collapsed or batched event in practice. - * - * no-op if source is empty. - * - * if sim, promise: - * - new .current_tm >= old .current_tm() || .is_notprimed() || .is_exhausted() - * - * returns #of events delivered. Must be 0 or 1 in this context - */ - virtual std::uint64_t deliver_one() override = 0; + /* deliver one event to attached sink + * interpretation of 'one event' is source-specific; + * could be a collapsed or batched event in practice. + * + * no-op if source is empty. + * + * if sim, promise: + * - new .current_tm >= old .current_tm() || .is_notprimed() || .is_exhausted() + * + * returns #of events delivered. Must be 0 or 1 in this context + */ + virtual std::uint64_t deliver_one() override = 0; - protected: - /* default implementations for online sources */ - utc_nanos online_current_tm() const; - uint64_t online_advance_until(utc_nanos tm, bool replay_flag); - }; /*ReactorSource*/ + protected: + /* default implementations for online sources */ + utc_nanos online_current_tm() const; + uint64_t online_advance_until(utc_nanos tm, bool replay_flag); + }; /*ReactorSource*/ - using ReactorSourcePtr = ref::rp; - } /*namespace reactor*/ + using ReactorSourcePtr = ref::rp; + } /*namespace reactor*/ } /*namespace xo*/ /* end ReactorSource.hpp */ diff --git a/include/xo/reactor/SecondarySource.hpp b/include/xo/reactor/SecondarySource.hpp index 398a17bb..c58c2e92 100644 --- a/include/xo/reactor/SecondarySource.hpp +++ b/include/xo/reactor/SecondarySource.hpp @@ -2,358 +2,359 @@ #pragma once -#include "time/Time.hpp" -#include "reactor/Sink.hpp" -#include "reactor/DirectSource.hpp" -#include "reactor/Reactor.hpp" -#include "callback/CallbackSet.hpp" -#include "reflect/demangle.hpp" +//#include "time/Time.hpp" +#include "Sink.hpp" +//#include "xo/reactor/DirectSource.hpp" +#include "Reactor.hpp" +#include "HeapReducer.hpp" +#include "xo/callback/CallbackSet.hpp" +#include "xo/cxxutil/demangle.hpp" #include namespace xo { - namespace reactor { - /* A passive event source. - * Can use as backend publisher when implementating another - * event processor. - */ - template> - class SecondarySource : public EventSource> { - public: - using EventSink = Sink1; - template - using RpCallbackSet = fn::RpCallbackSet; - using CallbackId = fn::CallbackId; - using TypeDescr = xo::reflect::TypeDescr; - using utc_nanos = xo::time::utc_nanos; + namespace reactor { + /* A passive event source. + * Can use as backend publisher when implementating another + * event processor. + */ + template> + class SecondarySource : public EventSource> { + public: + using EventSink = Sink1; + template + using RpCallbackSet = fn::RpCallbackSet; + using CallbackId = fn::CallbackId; + using TypeDescr = xo::reflect::TypeDescr; + using utc_nanos = xo::time::utc_nanos; - public: - ~SecondarySource() = default; + public: + ~SecondarySource() = default; - static ref::rp make() { return new SecondarySource(); } + static ref::rp make() { return new SecondarySource(); } - /* last event delivered from this source -- - * i.e. event in most recent call to .deliver_one_aux() - */ - Event const & last_annexed_ev() const { return this->reducer_.last_annexed_ev(); } + /* last event delivered from this source -- + * i.e. event in most recent call to .deliver_one_aux() + */ + Event const & last_annexed_ev() const { return this->reducer_.last_annexed_ev(); } - void notify_upstream_exhausted() { this->upstream_exhausted_ = true; } + void notify_upstream_exhausted() { this->upstream_exhausted_ = true; } - /* make event available to reactor, by adding to internal reducer */ - void notify_secondary_event(Event const & ev) { - /* test if ev is priming, update .current_tm */ - bool is_priming = this->preprocess_secondary_event(ev); + /* make event available to reactor, by adding to internal reducer */ + void notify_secondary_event(Event const & ev) { + /* test if ev is priming, update .current_tm */ + bool is_priming = this->preprocess_secondary_event(ev); - this->reducer_.include_event(ev); + this->reducer_.include_event(ev); - this->postprocess_secondary_event(is_priming); - } /*notify_secondary_event*/ + this->postprocess_secondary_event(is_priming); + } /*notify_secondary_event*/ - void notify_secondary_event(Event && ev) { - bool is_priming = this->preprocess_secondary_event(ev); + void notify_secondary_event(Event && ev) { + bool is_priming = this->preprocess_secondary_event(ev); - this->reducer_.include_event(ev); + this->reducer_.include_event(ev); - this->postprocess_secondary_event(is_priming); - } /*notify_secondary_event*/ + this->postprocess_secondary_event(is_priming); + } /*notify_secondary_event*/ - template - void notify_secondary_event_v(T const & v) { - using xo::scope; - using xo::xtag; + template + void notify_secondary_event_v(T const & v) { + using xo::scope; + using xo::xtag; - if (v.empty()) - return; + if (v.empty()) + return; - scope log(XO_DEBUG(this->debug_sim_flag_)); + scope log(XO_DEBUG(this->debug_sim_flag_)); - log && log(xtag("name", this->name())); + log && log(xtag("name", this->name())); - if (this->upstream_exhausted_) { - throw std::runtime_error("SecondarySource::notify_secondary_event_v" - ": not allowed after upstream exhausted"); + if (this->upstream_exhausted_) { + throw std::runtime_error("SecondarySource::notify_secondary_event_v" + ": not allowed after upstream exhausted"); + } + + uint32_t n_ev = 0; + + for (Event const & ev : v) { + utc_nanos evtm = this->reducer_.event_tm(ev); + + if (this->current_tm_ < evtm) + this->current_tm_ = evtm; + + ++n_ev; + } + + log && log(xtag("T", reflect::type_name()), + xtag("n_ev", n_ev)); + + if (n_ev > 0) { + /* if reducer is empty when .notify_secondary_event_v() begins, + * then reactor/simulator needs to be notified that source is no longer empty + */ + bool is_priming = this->reducer_.is_empty(); + + for (Event const & ev : v) + this->reducer_.include_event(ev); + + Reactor * reactor = this->parent_reactor_; + + if (reactor) { + if (is_priming) { + /* reactor/simulator takes responsibility for delivering events */ + reactor->notify_source_primed(ref::brw::from_native(this)); + } + } else { + /* special case if no reactor: deliver immediately */ + + //this->deliver_one(); + this->deliver_all(); + } + } + } /*notify_secondary_event_v*/ + + // ----- inherited from EventSource ----- + + virtual CallbackId add_callback(ref::rp const & cb) override { + return this->cb_set_.add_callback(cb); + } /*add_callback*/ + + virtual void remove_callback(CallbackId id) override { + this->cb_set_.remove_callback(id); + } /*remove_callback*/ + + // ----- inherited from ReactorSource ----- + + virtual bool is_empty() const override { return this->reducer_.is_empty(); } + virtual bool is_exhausted() const override { return this->upstream_exhausted_ && this->is_empty(); } + + virtual utc_nanos sim_current_tm() const override { + using xo::scope; + using xo::xtag; + + if (this->reducer_.is_empty()) { + /* this is a tricky case. + * it means this source doesn't + * _know_ specific next event yet; however new events + * may appear at any time by way of .notify_event() + * + * If event doesn't know next event, then .current_tm isn't useful + * for establishing priority relative to other sources. + * rely on priming mechanism instead, + * which means that control should never come here. + */ + return this->current_tm_; + } else { + scope log(XO_DEBUG(false /*this->debug_sim_flag_*/), + xtag("name", this->name_), + xtag("next_tm", this->reducer_.next_tm())); + + return this->reducer_.next_tm(); + } + } /*sim_current_tm*/ + + virtual std::uint64_t deliver_one() override { + return this->deliver_one_aux(true /*replay_flag*/); } - uint32_t n_ev = 0; + virtual std::uint64_t sim_advance_until(utc_nanos target_tm, + bool replay_flag) override + { + uint64_t retval = 0; - for (Event const & ev : v) { - utc_nanos evtm = this->reducer_.event_tm(ev); + while (!this->reducer_.is_empty()) { + utc_nanos tm = this->sim_current_tm(); - if (this->current_tm_ < evtm) - this->current_tm_ = evtm; + if (tm < target_tm) { + retval += this->deliver_one_aux(replay_flag); + } else { + break; + } + } - ++n_ev; - } + return retval; + } /*sim_advance_until*/ - log && log(xtag("T", reflect::type_name()), - xtag("n_ev", n_ev)); + virtual void notify_reactor_add(Reactor * reactor) override { + assert(!this->parent_reactor_); - if (n_ev > 0) { - /* if reducer is empty when .notify_secondary_event_v() begins, - * then reactor/simulator needs to be notified that source is no longer empty - */ - bool is_priming = this->reducer_.is_empty(); + this->parent_reactor_ = reactor; + } /*notify_reactor_add*/ - for (Event const & ev : v) - this->reducer_.include_event(ev); + virtual void notify_reactor_remove(Reactor * /*reactor*/) override {} + + // ----- inherited from AbstractSource ----- + + virtual TypeDescr source_ev_type() const override { + return reflect::Reflect::require(); + } /*source_ev_type*/ + + virtual uint32_t n_out_ev() const override { return n_out_ev_; } + /* #of events queued for delivery */ + virtual uint32_t n_queued_out_ev() const override { return this->reducer_.n_event(); } + + virtual bool debug_sim_flag() const override { return debug_sim_flag_; } + virtual void set_debug_sim_flag(bool x) override { this->debug_sim_flag_ = x; } + + virtual CallbackId attach_sink(ref::rp const & sink) override { + ref::rp native_sink + = EventSink::require_native("SecondarySource::attach_sink", sink); + + if (native_sink) { + if (!this->is_volatile() + || native_sink->allow_volatile_source()) + { + return this->add_callback(native_sink); + } else { + throw std::runtime_error("SecondarySource::attach_sink" + ": sink requires non-volatile source " + + std::string(reflect::type_name())); + } + } else { + throw std::runtime_error("SecondarySource::attach_sink" + ": expected sink accepting " + + std::string(reflect::type_name())); + } + } /*attach_sink*/ + + virtual void detach_sink(CallbackId id) override { + this->remove_callback(id); + } /*detach_sink*/ + + // ----- Inherited from AbstractEventProcessor ----- + + virtual std::string const & name() const override { return name_; } + virtual void set_name(std::string const & x) override { this->name_ = x; } + + virtual void visit_direct_consumers(std::function ep)> const & fn) override { + + for(auto x : this->cb_set_) + fn(x.fn_.borrow()); + } /*visit_direct_consumers*/ + + private: + /* event book-keeping on receiving an event. + */ + bool preprocess_secondary_event(Event const & ev) + { + if (this->upstream_exhausted_) { + throw std::runtime_error("SecondarySource::notify_secondary_event" + ": not allowed after upstream exhausted"); + } + + utc_nanos evtm = this->reducer_.event_tm(ev); + + if (this->current_tm_ < evtm) + this->current_tm_ = evtm; + + /* if reducer is empty when .notify_event() begins, + * then reactor/simulator needs to be notified that source is no longer empty + */ + bool is_priming = this->reducer_.is_empty(); + + return is_priming; + } /*preprocess_secondary_event*/ + + /* event bookkeeping after receiving an event. + * + * Require: event has been propagated to .reducer + * + * is_priming. true if event causes source to + * become non-empty --> must notify reactor + */ + void postprocess_secondary_event(bool is_priming) { + using xo::scope; + using xo::xtag; Reactor * reactor = this->parent_reactor_; + scope log(XO_DEBUG(this->debug_sim_flag_), + xtag("name", name_), + xtag("reactor", (void*)reactor), + xtag("is_priming", is_priming)); + if (reactor) { if (is_priming) { /* reactor/simulator takes responsibility for delivering events */ reactor->notify_source_primed(ref::brw::from_native(this)); } } else { - /* special case if no reactor: deliver immediately */ - - //this->deliver_one(); - this->deliver_all(); + /* if no reactor, deliver immediately */ + this->deliver_one(); } - } - } /*notify_secondary_event_v*/ + } /*postprocess_secondary_event*/ - // ----- inherited from EventSource ----- - - CallbackId add_callback(ref::rp const & cb) override { - return this->cb_set_.add_callback(cb); - } /*add_callback*/ - - void remove_callback(CallbackId id) override { - this->cb_set_.remove_callback(id); - } /*remove_callback*/ - - // ----- inherited from ReactorSource ----- - - virtual bool is_empty() const override { return this->reducer_.is_empty(); } - virtual bool is_exhausted() const override { return this->upstream_exhausted_ && this->is_empty(); } - - virtual utc_nanos sim_current_tm() const override { - using xo::scope; - using xo::xtag; - - if (this->reducer_.is_empty()) { - /* this is a tricky case. - * it means this source doesn't - * _know_ specific next event yet; however new events - * may appear at any time by way of .notify_event() - * - * If event doesn't know next event, then .current_tm isn't useful - * for establishing priority relative to other sources. - * rely on priming mechanism instead, - * which means that control should never come here. - */ - return this->current_tm_; - } else { - scope log(XO_DEBUG(false /*this->debug_sim_flag_*/), + /* deliver one event from reducer; + * invoke callback whenever replay_flag is true + */ + std::uint64_t deliver_one_aux(bool replay_flag) { + scope log(XO_DEBUG(this->debug_sim_flag_), xtag("name", this->name_), - xtag("next_tm", this->reducer_.next_tm())); + xtag("reducer.empty", this->reducer_.is_empty()), + xtag("replay_flag", replay_flag)); - return this->reducer_.next_tm(); - } - } /*sim_current_tm*/ + if (this->reducer_.is_empty()) + return 0; - virtual std::uint64_t deliver_one() override { - return this->deliver_one_aux(true /*replay_flag*/); - } + /* need to remove event _before_ invoking callbacks; + * callbacks may indirectly call this->notify_secondary_event(), + * modifiying .reducer + * + * reducer may use double-buffering scheme or similar to + * mitigate copying, esp when Event objects are heavy + */ + Event & ev = this->reducer_.annex_one(); - virtual std::uint64_t sim_advance_until(utc_nanos target_tm, - bool replay_flag) override - { - uint64_t retval = 0; + /* if SecondarySource: + * Event ev = this->event_heap_.front(); + * std::pop_heap(this->event_heap_.begin(), + * this->event_heap_.end(), + * std::greater()); + * this->event_heap_.pop_back(); + */ - while (!this->reducer_.is_empty()) { - utc_nanos tm = this->sim_current_tm(); - - if (tm < target_tm) { - retval += this->deliver_one_aux(replay_flag); - } else { - break; - } - } - - return retval; - } /*sim_advance_until*/ - - virtual void notify_reactor_add(Reactor * reactor) override { - assert(!this->parent_reactor_); - - this->parent_reactor_ = reactor; - } /*notify_reactor_add*/ - - virtual void notify_reactor_remove(Reactor * /*reactor*/) override {} - - // ----- inherited from AbstractSource ----- - - virtual TypeDescr source_ev_type() const override { - return reflect::Reflect::require(); - } /*source_ev_type*/ - - virtual uint32_t n_out_ev() const override { return n_out_ev_; } - /* #of events queued for delivery */ - virtual uint32_t n_queued_out_ev() const override { return this->reducer_.n_event(); } - - virtual bool debug_sim_flag() const override { return debug_sim_flag_; } - virtual void set_debug_sim_flag(bool x) override { this->debug_sim_flag_ = x; } - - virtual CallbackId attach_sink(ref::rp const & sink) override { - ref::rp native_sink - = EventSink::require_native("SecondarySource::attach_sink", sink); - - if (native_sink) { - if (!this->is_volatile() - || native_sink->allow_volatile_source()) - { - return this->add_callback(native_sink); - } else { - throw std::runtime_error("SecondarySource::attach_sink" - ": sink requires non-volatile source " - + std::string(reflect::type_name())); - } - } else { - throw std::runtime_error("SecondarySource::attach_sink" - ": expected sink accepting " - + std::string(reflect::type_name())); - } - } /*attach_sink*/ - - virtual void detach_sink(CallbackId id) override { - this->remove_callback(id); - } /*detach_sink*/ - - // ----- Inherited from AbstractEventProcessor ----- - - virtual std::string const & name() const override { return name_; } - virtual void set_name(std::string const & x) override { this->name_ = x; } - - virtual void visit_direct_consumers(std::function ep)> const & fn) override { - - for(auto x : this->cb_set_) - fn(x.fn_.borrow()); - } /*visit_direct_consumers*/ - - private: - /* event book-keeping on receiving an event. - */ - bool preprocess_secondary_event(Event const & ev) - { - if (this->upstream_exhausted_) { - throw std::runtime_error("SecondarySource::notify_secondary_event" - ": not allowed after upstream exhausted"); - } - - utc_nanos evtm = this->reducer_.event_tm(ev); - - if (this->current_tm_ < evtm) - this->current_tm_ = evtm; - - /* if reducer is empty when .notify_event() begins, - * then reactor/simulator needs to be notified that source is no longer empty - */ - bool is_priming = this->reducer_.is_empty(); - - return is_priming; - } /*preprocess_secondary_event*/ - - /* event bookkeeping after receiving an event. - * - * Require: event has been propagated to .reducer - * - * is_priming. true if event causes source to - * become non-empty --> must notify reactor - */ - void postprocess_secondary_event(bool is_priming) { - using xo::scope; - using xo::xtag; - - Reactor * reactor = this->parent_reactor_; - - scope log(XO_DEBUG(this->debug_sim_flag_), - xtag("name", name_), - xtag("reactor", (void*)reactor), - xtag("is_priming", is_priming)); - - if (reactor) { - if (is_priming) { - /* reactor/simulator takes responsibility for delivering events */ - reactor->notify_source_primed(ref::brw::from_native(this)); + if (replay_flag) { + ++(this->n_out_ev_); + this->cb_set_.invoke(&EventSink::notify_ev, ev); } - } else { - /* if no reactor, deliver immediately */ - this->deliver_one(); - } - } /*postprocess_secondary_event*/ - /* deliver one event from reducer; - * invoke callback whenever replay_flag is true - */ - std::uint64_t deliver_one_aux(bool replay_flag) { - scope log(XO_DEBUG(this->debug_sim_flag_), - xtag("name", this->name_), - xtag("reducer.empty", this->reducer_.is_empty()), - xtag("replay_flag", replay_flag)); + return 1; + } /*deliver_one_aux*/ - if (this->reducer_.is_empty()) - return 0; + private: + /* current time for this source */ + utc_nanos current_tm_; - /* need to remove event _before_ invoking callbacks; - * callbacks may indirectly call this->notify_secondary_event(), - * modifiying .reducer - * - * reducer may use double-buffering scheme or similar to - * mitigate copying, esp when Event objects are heavy + /* reporting name for this source (use when .debug_sim_flag set) */ - Event & ev = this->reducer_.annex_one(); + std::string name_; - /* if SecondarySource: - * Event ev = this->event_heap_.front(); - * std::pop_heap(this->event_heap_.begin(), - * this->event_heap_.end(), - * std::greater()); - * this->event_heap_.pop_back(); + /* if true, reactor/simulator to log interaction with this source */ + bool debug_sim_flag_ = false; - if (replay_flag) { - ++(this->n_out_ev_); - this->cb_set_.invoke(&EventSink::notify_ev, ev); - } + /* count lifetime #of outgoing events */ + uint32_t n_out_ev_ = 0; - return 1; - } /*deliver_one_aux*/ + /* set this to true, once, to announce that upstream will send + * no more events. see .notify_upstream_exhausted() + */ + bool upstream_exhausted_ = false; - private: - /* current time for this source */ - utc_nanos current_tm_; + /* events to be delivered to callbacks. + * multiple events may be collapsed depending on Reducer implementation + */ + Reducer reducer_; - /* reporting name for this source (use when .debug_sim_flag set) - */ - std::string name_; + /* reactor/simulator being used to schedule consumption. if ommitted, + * will borrow thread calling .notify_secondary_event() + */ + Reactor * parent_reactor_ = nullptr; - /* if true, reactor/simulator to log interaction with this source - */ - bool debug_sim_flag_ = false; - - /* count lifetime #of outgoing events */ - uint32_t n_out_ev_ = 0; - - /* set this to true, once, to announce that upstream will send - * no more events. see .notify_upstream_exhausted() - */ - bool upstream_exhausted_ = false; - - /* events to be delivered to callbacks. - * multiple events may be collapsed depending on Reducer implementation - */ - Reducer reducer_; - - /* reactor/simulator being used to schedule consumption. if ommitted, - * will borrow thread calling .notify_secondary_event() - */ - Reactor * parent_reactor_ = nullptr; - - /* invoke callbacks in this set to send an outgoing event */ - RpCallbackSet cb_set_; - }; /*SecondarySource*/ - } /*namespace reactor*/ + /* invoke callbacks in this set to send an outgoing event */ + RpCallbackSet cb_set_; + }; /*SecondarySource*/ + } /*namespace reactor*/ } /*namespace xo*/ /* end SecondarySource.hpp */ diff --git a/src/reactor/PollingReactor.cpp b/src/reactor/PollingReactor.cpp index 348ca739..a9c52cb7 100644 --- a/src/reactor/PollingReactor.cpp +++ b/src/reactor/PollingReactor.cpp @@ -45,6 +45,11 @@ namespace xo { return false; } /*remove_source*/ + void + PollingReactor::notify_source_primed(brw) { + /* nothing to do here -- all sources always checked by polling loop */ + } /*notify_source_primed*/ + int64_t PollingReactor::find_nonempty_source(size_t start_ix) { @@ -74,13 +79,25 @@ namespace xo { { int64_t ix = this->find_nonempty_source(this->next_ix_); + scope log(XO_DEBUG(this->loglevel() == log_level::chatty)); + + log && log(xtag("self", this), xtag("src_ix", ix)); + + uint64_t retval = 0; + if(ix >= 0) { brw src = this->source_v_[ix]; - return src->deliver_one(); + log && log(xtag("src.name", src->name())); + + retval = src->deliver_one(); } else { - return 0; + retval = 0; } + + log.end_scope(xtag("retval", retval)); + + return retval; } /*run_one*/ } /*namespace reactor*/ } /*namespace xo*/ diff --git a/src/reactor/ReactorSource.cpp b/src/reactor/ReactorSource.cpp index 0bb29397..5f3735ea 100644 --- a/src/reactor/ReactorSource.cpp +++ b/src/reactor/ReactorSource.cpp @@ -1,4 +1,4 @@ -/* @file Source.cpp */ +/* @file ReactorSource.cpp */ #include "ReactorSource.hpp" #include "xo/indentlog/print/time.hpp"