initial implementation

This commit is contained in:
Roland Conybeare 2023-10-11 17:46:24 -04:00
commit 0157f8dc04
12 changed files with 703 additions and 658 deletions

View file

@ -38,10 +38,16 @@ namespace xo {
virtual TypeDescr source_ev_type() const = 0;
/* if true: event objects (see .source_ev_type())
* maybe overwritten between callbacks.
* may be overwritten between callbacks.
* A sink that wants to capture events
* (e.g. EventStore<>) will need to deep-copy them
* if false: event objects are preserved between callbacks.
*
* A source that stores events received from elsewhere (e.g. FifoQueue)
* is probably volatile.
*
* A source that remembers (in explicit memory) every event it produces
* is not volatile
*/
virtual bool is_volatile() const = 0;

View file

@ -2,7 +2,7 @@
#pragma once
#include "time/Time.hpp"
//#include "time/Time.hpp"
#include "reactor/Sink.hpp"
#include "reactor/EventSource.hpp"
#include "reactor/HeapReducer.hpp"
@ -16,4 +16,3 @@ namespace xo {
} /*namespace xo*/
/* end DirectSource.hpp */

View file

@ -2,24 +2,24 @@
#pragma once
#include "reactor/ReactorSource.hpp"
#include "callback/CallbackSet.hpp"
#include "ReactorSource.hpp"
#include "xo/callback/CallbackSet.hpp"
namespace xo {
namespace reactor {
template</*typename Event,*/
typename Callback
/*void (Callback::*member_fn)(Event const &)*/>
class EventSource : public ReactorSource {
public:
using CallbackId = fn::CallbackId;
namespace reactor {
template</*typename Event,*/
typename Callback
/*void (Callback::*member_fn)(Event const &)*/>
class EventSource : public ReactorSource {
public:
using CallbackId = fn::CallbackId;
public:
virtual CallbackId add_callback(ref::rp<Callback> const & cb) = 0;
virtual void remove_callback(CallbackId id) = 0;
}; /*EventSource*/
} /*namespace reactor*/
public:
virtual CallbackId add_callback(ref::rp<Callback> const & cb) = 0;
virtual void remove_callback(CallbackId id) = 0;
}; /*EventSource*/
} /*namespace reactor*/
} /*namespace xo*/
/* end EventSource.hpp */

View file

@ -2,7 +2,8 @@
#pragma once
#include "time/Time.hpp"
//#include "time/Time.hpp"
#include "xo/indentlog/timeutil/timeutil.hpp"
#include <concepts>
namespace xo {
@ -17,7 +18,7 @@ namespace xo {
public:
using event_t = Event;
using utc_nanos = xo::time::utc_nanos;
public:
utc_nanos operator()(Event const & ev) const { return ev.tm(); }
}; /*StructEventTimeFn*/
@ -32,6 +33,15 @@ namespace xo {
utc_nanos operator()(Event const & ev) const { return ev->tm(); }
}; /*PtrEventTimeFn*/
template<typename T>
class PairEventTimeFn {
public:
using utc_nanos = xo::time::utc_nanos;
using event_t = std::pair<utc_nanos, T>;
public:
utc_nanos operator()(event_t const & ev) const { return ev.first; }
}; /*PairEventTimeFn*/
} /*namespace reactor*/
} /*namespace xo*/

View file

@ -1,72 +1,72 @@
/* @file HeapReducer.hpp */
#pragma once
#pragma once
#include "reactor/Reducer.hpp"
#include "Reducer.hpp"
namespace xo {
namespace reactor {
/* collect incoming events in a heap,
* ordered by timestamp.
* output events in increasing timestamp order.
* Information preserving in all other respects
*
* Require:
* - Event is null-constructible
* - Event is copyable
* - EventTimeFn :: Event -> utc_nanos
*/
template<typename Event, typename EventTimeFn = StructEventTimeFn<Event>>
class HeapReducer : public ReducerBase<Event, EventTimeFn> {
public:
using utc_nanos = xo::time::utc_nanos;
public:
HeapReducer() = default;
HeapReducer(EventTimeFn const & evtfn) : ReducerBase<Event, EventTimeFn>(evtfn) {}
namespace reactor {
/* collect incoming events in a heap,
* ordered by timestamp.
* output events in increasing timestamp order.
* Information preserving in all other respects
*
* Require:
* - Event is null-constructible
* - Event is copyable
* - EventTimeFn :: Event -> utc_nanos
*/
template<typename Event, typename EventTimeFn = StructEventTimeFn<Event>>
class HeapReducer : public ReducerBase<Event, EventTimeFn> {
public:
using utc_nanos = xo::time::utc_nanos;
public:
HeapReducer() = default;
HeapReducer(EventTimeFn const & evtfn) : ReducerBase<Event, EventTimeFn>(evtfn) {}
bool is_empty() const { return this->event_heap_.empty(); }
/* require: .is_empty() = false */
utc_nanos next_tm() const { return this->event_tm(this->event_heap_.front()); }
/* #of events stored in this reducer */
uint32_t n_event() const { return this->event_heap_.size(); }
bool is_empty() const { return this->event_heap_.empty(); }
/* require: .is_empty() = false */
utc_nanos next_tm() const { return this->event_tm(this->event_heap_.front()); }
/* #of events stored in this reducer */
uint32_t n_event() const { return this->event_heap_.size(); }
Event const & last_annexed_ev() const { return this->annexed_ev_; }
Event const & last_annexed_ev() const { return this->annexed_ev_; }
void include_event(Event const & ev) {
this->event_heap_.push_back(ev);
std::push_heap(this->event_heap_.begin(),
this->event_heap_.end(),
std::greater<Event>());
} /*include_event*/
void include_event(Event const & ev) {
this->event_heap_.push_back(ev);
std::push_heap(this->event_heap_.begin(),
this->event_heap_.end(),
std::greater<Event>());
} /*include_event*/
void include_event(Event && ev) {
this->event_heap_.push_back(std::move(ev));
std::push_heap(this->event_heap_.begin(),
this->event_heap_.end(),
std::greater<Event>());
} /*include_event*/
void include_event(Event && ev) {
this->event_heap_.push_back(std::move(ev));
std::push_heap(this->event_heap_.begin(),
this->event_heap_.end(),
std::greater<Event>());
} /*include_event*/
Event & annex_one() {
this->annexed_ev_ = this->event_heap_.front();
std::pop_heap(this->event_heap_.begin(),
this->event_heap_.end(),
std::greater<Event>());
this->event_heap_.pop_back();
Event & annex_one() {
this->annexed_ev_ = this->event_heap_.front();
std::pop_heap(this->event_heap_.begin(),
this->event_heap_.end(),
std::greater<Event>());
this->event_heap_.pop_back();
return this->annexed_ev_;
} /*annex_one*/
return this->annexed_ev_;
} /*annex_one*/
// ----- Inherited from ReducerBase -----
// ----- Inherited from ReducerBase -----
// utc_nanos event_tm(Event const & x);
private:
/* queued Events, in increasing timestamp order */
std::vector<Event> event_heap_;
/* annexed event, removed from .event_heap */
Event annexed_ev_;
}; /*HeapReducer*/
} /*namespace reactor*/
// utc_nanos event_tm(Event const & x);
private:
/* queued Events, in increasing timestamp order */
std::vector<Event> event_heap_;
/* annexed event, removed from .event_heap */
Event annexed_ev_;
}; /*HeapReducer*/
} /*namespace reactor*/
} /*namespace xo*/
/* end HeapReducer.hpp */

View file

@ -8,37 +8,41 @@
#include <cstdint>
namespace xo {
namespace reactor {
/* reactor that runs by polling an ordered set of sources */
class PollingReactor : public Reactor {
public:
PollingReactor() = default;
namespace reactor {
/* reactor that runs by polling an ordered set of sources */
class PollingReactor : public Reactor {
public:
/* named ctor idiom */
static ref::rp<PollingReactor> make() { return new PollingReactor(); }
// ----- inherited from Reactor -----
// ----- inherited from Reactor -----
virtual bool add_source(ref::brw<ReactorSource> src) override;
virtual bool remove_source(ref::brw<ReactorSource> src) override;
virtual std::uint64_t run_one() override;
virtual bool add_source(ref::brw<ReactorSource> src) override;
virtual bool remove_source(ref::brw<ReactorSource> src) override;
virtual void notify_source_primed(ref::brw<ReactorSource> src) override;
virtual std::uint64_t run_one() override;
private:
/* find non-empty source, starting from .source_v_[start_ix],
* wrapping around to .source_v_[start_ix - 1].
*
* return index of first available non-empty source,
* or -1 if all sources are empty
*/
std::int64_t find_nonempty_source(std::size_t start_ix);
private:
PollingReactor() = default;
private:
/* next source to poll will be .source_v_[.next_ix_] */
std::size_t next_ix_ = 0;
/* find non-empty source, starting from .source_v_[start_ix],
* wrapping around to .source_v_[start_ix - 1].
*
* return index of first available non-empty source,
* or -1 if all sources are empty
*/
std::int64_t find_nonempty_source(std::size_t start_ix);
/* ordered set of sources (see reactor::Source)
* reactor will poll sources in round-robin order
*/
std::vector<ReactorSourcePtr> source_v_;
}; /*PollingReactor*/
} /*namespace reactor*/
private:
/* next source to poll will be .source_v_[.next_ix_] */
std::size_t next_ix_ = 0;
/* ordered set of sources (see reactor::Source)
* reactor will poll sources in round-robin order
*/
std::vector<ReactorSourcePtr> source_v_;
}; /*PollingReactor*/
} /*namespace reactor*/
} /*namespace xo*/
/* end PollingReactor.hpp */

View file

@ -9,84 +9,84 @@
#include "xo/reflect/Reflect.hpp"
namespace xo {
namespace reactor {
/* adapter between a source that delivers a particular event type T,
* and a sink that accepts arbitrarily-typed events via .notify_ev_tp()
* Use this to connect to a polymorphic sink.
*
* Require:
* - .poly_sink.allow_polymorphic_source()
* (ofc. otherwise no point in using PolyAdapterSink<T>)
* - .poly_sink.allow_volatile_source()
* need this bc will be wrapping event with TaggedPtr,
* which doesn't manage event lifetime
*/
template<typename T>
class PolyAdapterSink : public reactor::Sink1<T> {
public:
using Reflect = reflect::Reflect;
using TaggedPtr = reflect::TaggedPtr;
namespace reactor {
/* adapter between a source that delivers a particular event type T,
* and a sink that accepts arbitrarily-typed events via .notify_ev_tp()
* Use this to connect to a polymorphic sink.
*
* Require:
* - .poly_sink.allow_polymorphic_source()
* (ofc. otherwise no point in using PolyAdapterSink<T>)
* - .poly_sink.allow_volatile_source()
* need this bc will be wrapping event with TaggedPtr,
* which doesn't manage event lifetime
*/
template<typename T>
class PolyAdapterSink : public reactor::Sink1<T> {
public:
using Reflect = reflect::Reflect;
using TaggedPtr = reflect::TaggedPtr;
public:
/* named ctor idiom */
static ref::rp<PolyAdapterSink> make(ref::rp<AbstractSink> poly_sink) {
//xo::scope lscope("PolyAdapterSink::make");
public:
/* named ctor idiom */
static ref::rp<PolyAdapterSink> make(ref::rp<AbstractSink> poly_sink) {
//xo::scope lscope("PolyAdapterSink::make");
ref::rp<PolyAdapterSink> retval(new PolyAdapterSink(poly_sink));
ref::rp<PolyAdapterSink> retval(new PolyAdapterSink(poly_sink));
//lscope.log("adapter", (void*)retval.get());
//lscope.log("adapter", (void*)retval.get());
return retval;
} /*make*/
return retval;
} /*make*/
// ----- Inherited from Sink1<T> -----
// ----- Inherited from Sink1<T> -----
virtual void notify_ev(T const & ev) override {
//xo::scope lscope("PolyAdapterSink::notify_ev");
//lscope.log(xo::xtag("ev", ev));
virtual void notify_ev(T const & ev) override {
//xo::scope lscope("PolyAdapterSink::notify_ev");
//lscope.log(xo::xtag("ev", ev));
TaggedPtr ev_tp = Reflect::make_tp(const_cast<T *>(&ev));
TaggedPtr ev_tp = Reflect::make_tp(const_cast<T *>(&ev));
this->notify_ev_tp(ev_tp);
} /*notify_ev*/
this->notify_ev_tp(ev_tp);
} /*notify_ev*/
// ----- Inherited from AbstractSink -----
// ----- Inherited from AbstractSink -----
virtual bool allow_volatile_source() const override { return true; }
virtual uint32_t n_in_ev() const override { return this->poly_sink_->n_in_ev(); }
/* note: ok to do this, however if expecting to use this entry point,
* maybe don't need to interpose PolyAdapterSink<T> ahead of .poly_sink
*/
virtual void notify_ev_tp(TaggedPtr const & ev_tp) override {
//xo::scope lscope("PolyAdapterSink::notify_ev_tp");
virtual bool allow_volatile_source() const override { return true; }
virtual uint32_t n_in_ev() const override { return this->poly_sink_->n_in_ev(); }
/* note: ok to do this, however if expecting to use this entry point,
* maybe don't need to interpose PolyAdapterSink<T> ahead of .poly_sink
*/
virtual void notify_ev_tp(TaggedPtr const & ev_tp) override {
//xo::scope lscope("PolyAdapterSink::notify_ev_tp");
return this->poly_sink_->notify_ev_tp(ev_tp);
}
return this->poly_sink_->notify_ev_tp(ev_tp);
}
// ----- Inherited from AbstractEventProcessor -----
// ----- Inherited from AbstractEventProcessor -----
virtual std::string const & name() const override { return this->poly_sink_->name(); }
virtual void set_name(std::string const & x) override { this->poly_sink_->set_name(x); }
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor> ep)> const & fn) override {
this->poly_sink_->visit_direct_consumers(fn);
}
virtual void display(std::ostream & os) const override {
using xo::xtag;
os << "<PolyAdapterSink"
<< xtag("addr", (void*)this)
<< xtag("T", reflect::type_name<T>())
<< xtag("poly", this->poly_sink_)
<< ">";
} /*display*/
virtual std::string const & name() const override { return this->poly_sink_->name(); }
virtual void set_name(std::string const & x) override { this->poly_sink_->set_name(x); }
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor> ep)> const & fn) override {
this->poly_sink_->visit_direct_consumers(fn);
}
virtual void display(std::ostream & os) const override {
using xo::xtag;
os << "<PolyAdapterSink"
<< xtag("addr", (void*)this)
<< xtag("T", reflect::type_name<T>())
<< xtag("poly", this->poly_sink_)
<< ">";
} /*display*/
private:
PolyAdapterSink(ref::rp<AbstractSink> poly_sink) : poly_sink_{std::move(poly_sink)} {}
private:
PolyAdapterSink(ref::rp<AbstractSink> poly_sink) : poly_sink_{std::move(poly_sink)} {}
private:
/* mandate: .poly_sink.allow_polymorphic_source() is true */
ref::rp<AbstractSink> poly_sink_;
}; /*PolyAdapterSink*/
} /*namespace reactor*/
private:
/* mandate: .poly_sink.allow_polymorphic_source() is true */
ref::rp<AbstractSink> poly_sink_;
}; /*PolyAdapterSink*/
} /*namespace reactor*/
} /*namespace xo*/
/* end PolyAdapterSink.hpp */

View file

@ -3,61 +3,69 @@
#pragma once
#include "xo/refcnt/Refcounted.hpp"
#include "xo/indentlog/log_level.hpp"
#include <cstdint>
namespace xo {
namespace reactor {
class ReactorSource;
namespace reactor {
class ReactorSource;
/* abtract api for a reactor:
* something that arranges to have work done on a set of Sources.
*/
class Reactor : public ref::Refcount {
public:
virtual ~Reactor() = default;
/* abtract api for a reactor:
* something that arranges to have work done on a set of Sources.
*/
class Reactor : public ref::Refcount {
public:
virtual ~Reactor() = default;
/* add source src to this reactor.
* on success, invoke src.notify_reactor_add(this)
*
* returns true if source added; false if already present
*/
virtual bool add_source(ref::brw<ReactorSource> src) = 0;
log_level loglevel() const { return loglevel_; }
void set_loglevel(log_level loglevel) { loglevel_ = loglevel; }
/* remove source src from this reactor.
* source must previously have been added by
* .add_source(src).
*
* on success, invoke src.notify_reactor_remove(this)
*
* returns true if source removed; false if not present
*/
virtual bool remove_source(ref::brw<ReactorSource> src) = 0;
/* add source src to this reactor.
* on success, invoke src.notify_reactor_add(this)
*
* returns true if source added; false if already present
*/
virtual bool add_source(ref::brw<ReactorSource> src) = 0;
/* notification when non-primed source (source with no known events)
* becomes primed (source with at least one event)
*/
virtual void notify_source_primed(ref::brw<ReactorSource> src) = 0;
/* remove source src from this reactor.
* source must previously have been added by
* .add_source(src).
*
* on success, invoke src.notify_reactor_remove(this)
*
* returns true if source removed; false if not present
*/
virtual bool remove_source(ref::brw<ReactorSource> src) = 0;
/* dispatch one reactor event, borrowing the calling thread
* amount of work this represents is Source/Sink specific.
*
* returns #of events dispatched (0 or 1)
*/
virtual std::uint64_t run_one() = 0;
/* notification when non-primed source (source with no known events)
* becomes primed (source with at least one event)
*/
virtual void notify_source_primed(ref::brw<ReactorSource> src) = 0;
/* borrow calling thread to dispatch reactor events.
* if n is -1, run indefinitely
* otherwise dispatch up to n events.
* n = 0 is a noop
*/
void run_n(int32_t n);
/* dispatch one reactor event, borrowing the calling thread
* amount of work this represents is Source/Sink specific.
*
* returns #of events dispatched (0 or 1)
*/
virtual std::uint64_t run_one() = 0;
/* borrow calling thread to run indefinitely.
* suitable implementation for dedicated reactor threads
*/
void run() { this->run_n(-1); }
}; /*Reactor*/
} /*namespace reactor*/
/* borrow calling thread to dispatch reactor events.
* if n is -1, run indefinitely
* otherwise dispatch up to n events.
* n = 0 is a noop
*/
void run_n(int32_t n);
/* borrow calling thread to run indefinitely.
* suitable implementation for dedicated reactor threads
*/
void run() { this->run_n(-1); }
private:
/* control logging verbosity */
log_level loglevel_;
}; /*Reactor*/
} /*namespace reactor*/
} /*namespace xo*/
/* end Reactor.hpp */

View file

@ -7,124 +7,124 @@
#include <cstdint>
namespace xo {
namespace reactor {
class Reactor;
namespace reactor {
class Reactor;
/* abstract api for a source of events.
* Event representation is left open: Sources and Sinks
* need to have compatible event representations,
* and coordination is left to such (Source, Sink) pairs.
*
* Source->Sink activity may be expected to be mediated by a reactor,
* that implements the Reactor api.
*
* At any time, A Source can be associated with at most one reactor.
* Sources are informed of Reactor<->Source association being
* formed/broken by the
* .notify_reactor_add(), .notify_reactor_remove()
* methods
*
* The source api intends also to provide for simulation.
* There introduces two simulation-specific methods:
* .sim_current_tm()
* .sim_advance_until()
*
* A non-simulation source can implement these as calls to
* .online_current_tm(), .online_advance_until() respectively
* .online_current_tm() aborts since an online source is never exhausted
* .online_advance_until() is a no-op that returns 0
*
* Loop for consuming from a primary simulation source:
*
* brw<Source> s = ...;
* while(!s->is_exhausted())
* s->deliver_one();
*
* Secondary sources (sources that depend on other sources) can be
* in a state where they don't know their next event, in which case:
*
* s->is_notprimed() == true
*/
class ReactorSource : public AbstractSource {
public:
using utc_nanos = xo::time::utc_nanos;
/* abstract api for a source of events.
* Event representation is left open: Sources and Sinks
* need to have compatible event representations,
* and coordination is left to such (Source, Sink) pairs.
*
* Source->Sink activity may be expected to be mediated by a reactor,
* that implements the Reactor api.
*
* At any time, A Source can be associated with at most one reactor.
* Sources are informed of Reactor<->Source association being
* formed/broken by the
* .notify_reactor_add(), .notify_reactor_remove()
* methods
*
* The source api intends also to provide for simulation.
* There introduces two simulation-specific methods:
* .sim_current_tm()
* .sim_advance_until()
*
* A non-simulation source can implement these as calls to
* .online_current_tm(), .online_advance_until() respectively
* .online_current_tm() aborts since an online source is never exhausted
* .online_advance_until() is a no-op that returns 0
*
* Loop for consuming from a primary simulation source:
*
* brw<Source> s = ...;
* while(!s->is_exhausted())
* s->deliver_one();
*
* Secondary sources (sources that depend on other sources) can be
* in a state where they don't know their next event, in which case:
*
* s->is_notprimed() == true
*/
class ReactorSource : public AbstractSource {
public:
using utc_nanos = xo::time::utc_nanos;
public:
virtual ~ReactorSource() = default;
public:
virtual ~ReactorSource() = default;
/* true if source is currently empty (has 0 events to deliver) */
virtual bool is_empty() const = 0;
bool is_nonempty() const { return !this->is_empty(); }
/* true if source is currently empty (has 0 events to deliver) */
virtual bool is_empty() const = 0;
bool is_nonempty() const { return !this->is_empty(); }
/* true when source knows its next event
* A source that isn't primed is also excluded from simulation
* heap until it becomes primed.
* This make feasible simulation sources that
* depend on other simulation sources
*/
virtual bool is_primed() const { return !this->is_empty(); }
virtual bool is_notprimed() const { return this->is_empty(); }
/* true when source knows its next event
* A source that isn't primed is also excluded from simulation
* heap until it becomes primed.
* This make feasible simulation sources that
* depend on other simulation sources
*/
virtual bool is_primed() const { return !this->is_empty(); }
virtual bool is_notprimed() const { return this->is_empty(); }
/* if true, this source has no events, and will never publish more events
* - for sim, return true for a standalone source that has replayed all events
* - for rt, set during orderly
*/
virtual bool is_exhausted() const = 0;
/* if true, this source has no events, and will never publish more events
* - for sim, return true for a standalone source that has replayed all events
* - for rt, set during orderly
*/
virtual bool is_exhausted() const = 0;
/* if this is a simulation source and .is_exhausted is false:
* returns next event time; more precisely, no events exist prior to
* this time.
*
* if sim, and .is_primed = true,
* returns timestamp of next event
*/
virtual utc_nanos sim_current_tm() const = 0;
/* if this is a simulation source and .is_exhausted is false:
* returns next event time; more precisely, no events exist prior to
* this time.
*
* if sim, and .is_primed = true,
* returns timestamp of next event
*/
virtual utc_nanos sim_current_tm() const = 0;
/* promise:
* - .current_tm() > tm || .is_notprimed() || .is_exhausted() = true
* - if replay_flag is true, then any events between previous .current_tm()
* and new .current_tm() will have been published
*
* returns #of events delivered.
* does not count events that were skipped, so always returns 0 if
* replay_flag is false
*/
virtual std::uint64_t sim_advance_until(utc_nanos tm, bool replay_flag) = 0;
/* promise:
* - .current_tm() > tm || .is_notprimed() || .is_exhausted() = true
* - if replay_flag is true, then any events between previous .current_tm()
* and new .current_tm() will have been published
*
* returns #of events delivered.
* does not count events that were skipped, so always returns 0 if
* replay_flag is false
*/
virtual std::uint64_t sim_advance_until(utc_nanos tm, bool replay_flag) = 0;
/* informs source when it's added to a reactor
/* informs source when it's added to a reactor
* (see Reactor.add_source())
*/
virtual void notify_reactor_add(Reactor * /*reactor*/) {}
* (see Reactor.add_source())
*/
virtual void notify_reactor_add(Reactor * /*reactor*/) {}
/* informs source when it's removed from a reactor
* (see Reactor.remove_source())
*/
virtual void notify_reactor_remove(Reactor * /*reactor*/) {}
/* informs source when it's removed from a reactor
* (see Reactor.remove_source())
*/
virtual void notify_reactor_remove(Reactor * /*reactor*/) {}
// ----- Inherited from AbstractSource -----
// ----- Inherited from AbstractSource -----
/* deliver one event to attached sink
* interpretation of 'one event' is source-specific;
* could be a collapsed or batched event in practice.
*
* no-op if source is empty.
*
* if sim, promise:
* - new .current_tm >= old .current_tm() || .is_notprimed() || .is_exhausted()
*
* returns #of events delivered. Must be 0 or 1 in this context
*/
virtual std::uint64_t deliver_one() override = 0;
/* deliver one event to attached sink
* interpretation of 'one event' is source-specific;
* could be a collapsed or batched event in practice.
*
* no-op if source is empty.
*
* if sim, promise:
* - new .current_tm >= old .current_tm() || .is_notprimed() || .is_exhausted()
*
* returns #of events delivered. Must be 0 or 1 in this context
*/
virtual std::uint64_t deliver_one() override = 0;
protected:
/* default implementations for online sources */
utc_nanos online_current_tm() const;
uint64_t online_advance_until(utc_nanos tm, bool replay_flag);
}; /*ReactorSource*/
protected:
/* default implementations for online sources */
utc_nanos online_current_tm() const;
uint64_t online_advance_until(utc_nanos tm, bool replay_flag);
}; /*ReactorSource*/
using ReactorSourcePtr = ref::rp<ReactorSource>;
} /*namespace reactor*/
using ReactorSourcePtr = ref::rp<ReactorSource>;
} /*namespace reactor*/
} /*namespace xo*/
/* end ReactorSource.hpp */

View file

@ -2,358 +2,359 @@
#pragma once
#include "time/Time.hpp"
#include "reactor/Sink.hpp"
#include "reactor/DirectSource.hpp"
#include "reactor/Reactor.hpp"
#include "callback/CallbackSet.hpp"
#include "reflect/demangle.hpp"
//#include "time/Time.hpp"
#include "Sink.hpp"
//#include "xo/reactor/DirectSource.hpp"
#include "Reactor.hpp"
#include "HeapReducer.hpp"
#include "xo/callback/CallbackSet.hpp"
#include "xo/cxxutil/demangle.hpp"
#include <vector>
namespace xo {
namespace reactor {
/* A passive event source.
* Can use as backend publisher when implementating another
* event processor.
*/
template<typename Event, typename Reducer = HeapReducer<Event>>
class SecondarySource : public EventSource<Sink1<Event>> {
public:
using EventSink = Sink1<Event>;
template<typename Fn>
using RpCallbackSet = fn::RpCallbackSet<Fn>;
using CallbackId = fn::CallbackId;
using TypeDescr = xo::reflect::TypeDescr;
using utc_nanos = xo::time::utc_nanos;
namespace reactor {
/* A passive event source.
* Can use as backend publisher when implementating another
* event processor.
*/
template<typename Event, typename Reducer = HeapReducer<Event>>
class SecondarySource : public EventSource<Sink1<Event>> {
public:
using EventSink = Sink1<Event>;
template<typename Fn>
using RpCallbackSet = fn::RpCallbackSet<Fn>;
using CallbackId = fn::CallbackId;
using TypeDescr = xo::reflect::TypeDescr;
using utc_nanos = xo::time::utc_nanos;
public:
~SecondarySource() = default;
public:
~SecondarySource() = default;
static ref::rp<SecondarySource> make() { return new SecondarySource(); }
static ref::rp<SecondarySource> make() { return new SecondarySource(); }
/* last event delivered from this source --
* i.e. event in most recent call to .deliver_one_aux()
*/
Event const & last_annexed_ev() const { return this->reducer_.last_annexed_ev(); }
/* last event delivered from this source --
* i.e. event in most recent call to .deliver_one_aux()
*/
Event const & last_annexed_ev() const { return this->reducer_.last_annexed_ev(); }
void notify_upstream_exhausted() { this->upstream_exhausted_ = true; }
void notify_upstream_exhausted() { this->upstream_exhausted_ = true; }
/* make event available to reactor, by adding to internal reducer */
void notify_secondary_event(Event const & ev) {
/* test if ev is priming, update .current_tm */
bool is_priming = this->preprocess_secondary_event(ev);
/* make event available to reactor, by adding to internal reducer */
void notify_secondary_event(Event const & ev) {
/* test if ev is priming, update .current_tm */
bool is_priming = this->preprocess_secondary_event(ev);
this->reducer_.include_event(ev);
this->reducer_.include_event(ev);
this->postprocess_secondary_event(is_priming);
} /*notify_secondary_event*/
this->postprocess_secondary_event(is_priming);
} /*notify_secondary_event*/
void notify_secondary_event(Event && ev) {
bool is_priming = this->preprocess_secondary_event(ev);
void notify_secondary_event(Event && ev) {
bool is_priming = this->preprocess_secondary_event(ev);
this->reducer_.include_event(ev);
this->reducer_.include_event(ev);
this->postprocess_secondary_event(is_priming);
} /*notify_secondary_event*/
this->postprocess_secondary_event(is_priming);
} /*notify_secondary_event*/
template<typename T>
void notify_secondary_event_v(T const & v) {
using xo::scope;
using xo::xtag;
template<typename T>
void notify_secondary_event_v(T const & v) {
using xo::scope;
using xo::xtag;
if (v.empty())
return;
if (v.empty())
return;
scope log(XO_DEBUG(this->debug_sim_flag_));
scope log(XO_DEBUG(this->debug_sim_flag_));
log && log(xtag("name", this->name()));
log && log(xtag("name", this->name()));
if (this->upstream_exhausted_) {
throw std::runtime_error("SecondarySource::notify_secondary_event_v"
": not allowed after upstream exhausted");
if (this->upstream_exhausted_) {
throw std::runtime_error("SecondarySource::notify_secondary_event_v"
": not allowed after upstream exhausted");
}
uint32_t n_ev = 0;
for (Event const & ev : v) {
utc_nanos evtm = this->reducer_.event_tm(ev);
if (this->current_tm_ < evtm)
this->current_tm_ = evtm;
++n_ev;
}
log && log(xtag("T", reflect::type_name<T>()),
xtag("n_ev", n_ev));
if (n_ev > 0) {
/* if reducer is empty when .notify_secondary_event_v() begins,
* then reactor/simulator needs to be notified that source is no longer empty
*/
bool is_priming = this->reducer_.is_empty();
for (Event const & ev : v)
this->reducer_.include_event(ev);
Reactor * reactor = this->parent_reactor_;
if (reactor) {
if (is_priming) {
/* reactor/simulator takes responsibility for delivering events */
reactor->notify_source_primed(ref::brw<ReactorSource>::from_native(this));
}
} else {
/* special case if no reactor: deliver immediately */
//this->deliver_one();
this->deliver_all();
}
}
} /*notify_secondary_event_v*/
// ----- inherited from EventSource -----
virtual CallbackId add_callback(ref::rp<EventSink> const & cb) override {
return this->cb_set_.add_callback(cb);
} /*add_callback*/
virtual void remove_callback(CallbackId id) override {
this->cb_set_.remove_callback(id);
} /*remove_callback*/
// ----- inherited from ReactorSource -----
virtual bool is_empty() const override { return this->reducer_.is_empty(); }
virtual bool is_exhausted() const override { return this->upstream_exhausted_ && this->is_empty(); }
virtual utc_nanos sim_current_tm() const override {
using xo::scope;
using xo::xtag;
if (this->reducer_.is_empty()) {
/* this is a tricky case.
* it means this source doesn't
* _know_ specific next event yet; however new events
* may appear at any time by way of .notify_event()
*
* If event doesn't know next event, then .current_tm isn't useful
* for establishing priority relative to other sources.
* rely on priming mechanism instead,
* which means that control should never come here.
*/
return this->current_tm_;
} else {
scope log(XO_DEBUG(false /*this->debug_sim_flag_*/),
xtag("name", this->name_),
xtag("next_tm", this->reducer_.next_tm()));
return this->reducer_.next_tm();
}
} /*sim_current_tm*/
virtual std::uint64_t deliver_one() override {
return this->deliver_one_aux(true /*replay_flag*/);
}
uint32_t n_ev = 0;
virtual std::uint64_t sim_advance_until(utc_nanos target_tm,
bool replay_flag) override
{
uint64_t retval = 0;
for (Event const & ev : v) {
utc_nanos evtm = this->reducer_.event_tm(ev);
while (!this->reducer_.is_empty()) {
utc_nanos tm = this->sim_current_tm();
if (this->current_tm_ < evtm)
this->current_tm_ = evtm;
if (tm < target_tm) {
retval += this->deliver_one_aux(replay_flag);
} else {
break;
}
}
++n_ev;
}
return retval;
} /*sim_advance_until*/
log && log(xtag("T", reflect::type_name<T>()),
xtag("n_ev", n_ev));
virtual void notify_reactor_add(Reactor * reactor) override {
assert(!this->parent_reactor_);
if (n_ev > 0) {
/* if reducer is empty when .notify_secondary_event_v() begins,
* then reactor/simulator needs to be notified that source is no longer empty
*/
bool is_priming = this->reducer_.is_empty();
this->parent_reactor_ = reactor;
} /*notify_reactor_add*/
for (Event const & ev : v)
this->reducer_.include_event(ev);
virtual void notify_reactor_remove(Reactor * /*reactor*/) override {}
// ----- inherited from AbstractSource -----
virtual TypeDescr source_ev_type() const override {
return reflect::Reflect::require<Event>();
} /*source_ev_type*/
virtual uint32_t n_out_ev() const override { return n_out_ev_; }
/* #of events queued for delivery */
virtual uint32_t n_queued_out_ev() const override { return this->reducer_.n_event(); }
virtual bool debug_sim_flag() const override { return debug_sim_flag_; }
virtual void set_debug_sim_flag(bool x) override { this->debug_sim_flag_ = x; }
virtual CallbackId attach_sink(ref::rp<AbstractSink> const & sink) override {
ref::rp<EventSink> native_sink
= EventSink::require_native("SecondarySource::attach_sink", sink);
if (native_sink) {
if (!this->is_volatile()
|| native_sink->allow_volatile_source())
{
return this->add_callback(native_sink);
} else {
throw std::runtime_error("SecondarySource::attach_sink"
": sink requires non-volatile source "
+ std::string(reflect::type_name<Event>()));
}
} else {
throw std::runtime_error("SecondarySource::attach_sink"
": expected sink accepting "
+ std::string(reflect::type_name<Event>()));
}
} /*attach_sink*/
virtual void detach_sink(CallbackId id) override {
this->remove_callback(id);
} /*detach_sink*/
// ----- Inherited from AbstractEventProcessor -----
virtual std::string const & name() const override { return name_; }
virtual void set_name(std::string const & x) override { this->name_ = x; }
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor> ep)> const & fn) override {
for(auto x : this->cb_set_)
fn(x.fn_.borrow());
} /*visit_direct_consumers*/
private:
/* event book-keeping on receiving an event.
*/
bool preprocess_secondary_event(Event const & ev)
{
if (this->upstream_exhausted_) {
throw std::runtime_error("SecondarySource::notify_secondary_event"
": not allowed after upstream exhausted");
}
utc_nanos evtm = this->reducer_.event_tm(ev);
if (this->current_tm_ < evtm)
this->current_tm_ = evtm;
/* if reducer is empty when .notify_event() begins,
* then reactor/simulator needs to be notified that source is no longer empty
*/
bool is_priming = this->reducer_.is_empty();
return is_priming;
} /*preprocess_secondary_event*/
/* event bookkeeping after receiving an event.
*
* Require: event has been propagated to .reducer
*
* is_priming. true if event causes source to
* become non-empty --> must notify reactor
*/
void postprocess_secondary_event(bool is_priming) {
using xo::scope;
using xo::xtag;
Reactor * reactor = this->parent_reactor_;
scope log(XO_DEBUG(this->debug_sim_flag_),
xtag("name", name_),
xtag("reactor", (void*)reactor),
xtag("is_priming", is_priming));
if (reactor) {
if (is_priming) {
/* reactor/simulator takes responsibility for delivering events */
reactor->notify_source_primed(ref::brw<ReactorSource>::from_native(this));
}
} else {
/* special case if no reactor: deliver immediately */
//this->deliver_one();
this->deliver_all();
/* if no reactor, deliver immediately */
this->deliver_one();
}
}
} /*notify_secondary_event_v*/
} /*postprocess_secondary_event*/
// ----- inherited from EventSource -----
CallbackId add_callback(ref::rp<EventSink> const & cb) override {
return this->cb_set_.add_callback(cb);
} /*add_callback*/
void remove_callback(CallbackId id) override {
this->cb_set_.remove_callback(id);
} /*remove_callback*/
// ----- inherited from ReactorSource -----
virtual bool is_empty() const override { return this->reducer_.is_empty(); }
virtual bool is_exhausted() const override { return this->upstream_exhausted_ && this->is_empty(); }
virtual utc_nanos sim_current_tm() const override {
using xo::scope;
using xo::xtag;
if (this->reducer_.is_empty()) {
/* this is a tricky case.
* it means this source doesn't
* _know_ specific next event yet; however new events
* may appear at any time by way of .notify_event()
*
* If event doesn't know next event, then .current_tm isn't useful
* for establishing priority relative to other sources.
* rely on priming mechanism instead,
* which means that control should never come here.
*/
return this->current_tm_;
} else {
scope log(XO_DEBUG(false /*this->debug_sim_flag_*/),
/* deliver one event from reducer;
* invoke callback whenever replay_flag is true
*/
std::uint64_t deliver_one_aux(bool replay_flag) {
scope log(XO_DEBUG(this->debug_sim_flag_),
xtag("name", this->name_),
xtag("next_tm", this->reducer_.next_tm()));
xtag("reducer.empty", this->reducer_.is_empty()),
xtag("replay_flag", replay_flag));
return this->reducer_.next_tm();
}
} /*sim_current_tm*/
if (this->reducer_.is_empty())
return 0;
virtual std::uint64_t deliver_one() override {
return this->deliver_one_aux(true /*replay_flag*/);
}
/* need to remove event _before_ invoking callbacks;
* callbacks may indirectly call this->notify_secondary_event(),
* modifiying .reducer
*
* reducer may use double-buffering scheme or similar to
* mitigate copying, esp when Event objects are heavy
*/
Event & ev = this->reducer_.annex_one();
virtual std::uint64_t sim_advance_until(utc_nanos target_tm,
bool replay_flag) override
{
uint64_t retval = 0;
/* if SecondarySource:
* Event ev = this->event_heap_.front();
* std::pop_heap(this->event_heap_.begin(),
* this->event_heap_.end(),
* std::greater<Event>());
* this->event_heap_.pop_back();
*/
while (!this->reducer_.is_empty()) {
utc_nanos tm = this->sim_current_tm();
if (tm < target_tm) {
retval += this->deliver_one_aux(replay_flag);
} else {
break;
}
}
return retval;
} /*sim_advance_until*/
virtual void notify_reactor_add(Reactor * reactor) override {
assert(!this->parent_reactor_);
this->parent_reactor_ = reactor;
} /*notify_reactor_add*/
virtual void notify_reactor_remove(Reactor * /*reactor*/) override {}
// ----- inherited from AbstractSource -----
virtual TypeDescr source_ev_type() const override {
return reflect::Reflect::require<Event>();
} /*source_ev_type*/
virtual uint32_t n_out_ev() const override { return n_out_ev_; }
/* #of events queued for delivery */
virtual uint32_t n_queued_out_ev() const override { return this->reducer_.n_event(); }
virtual bool debug_sim_flag() const override { return debug_sim_flag_; }
virtual void set_debug_sim_flag(bool x) override { this->debug_sim_flag_ = x; }
virtual CallbackId attach_sink(ref::rp<AbstractSink> const & sink) override {
ref::rp<EventSink> native_sink
= EventSink::require_native("SecondarySource::attach_sink", sink);
if (native_sink) {
if (!this->is_volatile()
|| native_sink->allow_volatile_source())
{
return this->add_callback(native_sink);
} else {
throw std::runtime_error("SecondarySource::attach_sink"
": sink requires non-volatile source "
+ std::string(reflect::type_name<Event>()));
}
} else {
throw std::runtime_error("SecondarySource::attach_sink"
": expected sink accepting "
+ std::string(reflect::type_name<Event>()));
}
} /*attach_sink*/
virtual void detach_sink(CallbackId id) override {
this->remove_callback(id);
} /*detach_sink*/
// ----- Inherited from AbstractEventProcessor -----
virtual std::string const & name() const override { return name_; }
virtual void set_name(std::string const & x) override { this->name_ = x; }
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor> ep)> const & fn) override {
for(auto x : this->cb_set_)
fn(x.fn_.borrow());
} /*visit_direct_consumers*/
private:
/* event book-keeping on receiving an event.
*/
bool preprocess_secondary_event(Event const & ev)
{
if (this->upstream_exhausted_) {
throw std::runtime_error("SecondarySource::notify_secondary_event"
": not allowed after upstream exhausted");
}
utc_nanos evtm = this->reducer_.event_tm(ev);
if (this->current_tm_ < evtm)
this->current_tm_ = evtm;
/* if reducer is empty when .notify_event() begins,
* then reactor/simulator needs to be notified that source is no longer empty
*/
bool is_priming = this->reducer_.is_empty();
return is_priming;
} /*preprocess_secondary_event*/
/* event bookkeeping after receiving an event.
*
* Require: event has been propagated to .reducer
*
* is_priming. true if event causes source to
* become non-empty --> must notify reactor
*/
void postprocess_secondary_event(bool is_priming) {
using xo::scope;
using xo::xtag;
Reactor * reactor = this->parent_reactor_;
scope log(XO_DEBUG(this->debug_sim_flag_),
xtag("name", name_),
xtag("reactor", (void*)reactor),
xtag("is_priming", is_priming));
if (reactor) {
if (is_priming) {
/* reactor/simulator takes responsibility for delivering events */
reactor->notify_source_primed(ref::brw<ReactorSource>::from_native(this));
if (replay_flag) {
++(this->n_out_ev_);
this->cb_set_.invoke(&EventSink::notify_ev, ev);
}
} else {
/* if no reactor, deliver immediately */
this->deliver_one();
}
} /*postprocess_secondary_event*/
/* deliver one event from reducer;
* invoke callback whenever replay_flag is true
*/
std::uint64_t deliver_one_aux(bool replay_flag) {
scope log(XO_DEBUG(this->debug_sim_flag_),
xtag("name", this->name_),
xtag("reducer.empty", this->reducer_.is_empty()),
xtag("replay_flag", replay_flag));
return 1;
} /*deliver_one_aux*/
if (this->reducer_.is_empty())
return 0;
private:
/* current time for this source */
utc_nanos current_tm_;
/* need to remove event _before_ invoking callbacks;
* callbacks may indirectly call this->notify_secondary_event(),
* modifiying .reducer
*
* reducer may use double-buffering scheme or similar to
* mitigate copying, esp when Event objects are heavy
/* reporting name for this source (use when .debug_sim_flag set)
*/
Event & ev = this->reducer_.annex_one();
std::string name_;
/* if SecondarySource:
* Event ev = this->event_heap_.front();
* std::pop_heap(this->event_heap_.begin(),
* this->event_heap_.end(),
* std::greater<Event>());
* this->event_heap_.pop_back();
/* if true, reactor/simulator to log interaction with this source
*/
bool debug_sim_flag_ = false;
if (replay_flag) {
++(this->n_out_ev_);
this->cb_set_.invoke(&EventSink::notify_ev, ev);
}
/* count lifetime #of outgoing events */
uint32_t n_out_ev_ = 0;
return 1;
} /*deliver_one_aux*/
/* set this to true, once, to announce that upstream will send
* no more events. see .notify_upstream_exhausted()
*/
bool upstream_exhausted_ = false;
private:
/* current time for this source */
utc_nanos current_tm_;
/* events to be delivered to callbacks.
* multiple events may be collapsed depending on Reducer implementation
*/
Reducer reducer_;
/* reporting name for this source (use when .debug_sim_flag set)
*/
std::string name_;
/* reactor/simulator being used to schedule consumption. if ommitted,
* will borrow thread calling .notify_secondary_event()
*/
Reactor * parent_reactor_ = nullptr;
/* if true, reactor/simulator to log interaction with this source
*/
bool debug_sim_flag_ = false;
/* count lifetime #of outgoing events */
uint32_t n_out_ev_ = 0;
/* set this to true, once, to announce that upstream will send
* no more events. see .notify_upstream_exhausted()
*/
bool upstream_exhausted_ = false;
/* events to be delivered to callbacks.
* multiple events may be collapsed depending on Reducer implementation
*/
Reducer reducer_;
/* reactor/simulator being used to schedule consumption. if ommitted,
* will borrow thread calling .notify_secondary_event()
*/
Reactor * parent_reactor_ = nullptr;
/* invoke callbacks in this set to send an outgoing event */
RpCallbackSet<EventSink> cb_set_;
}; /*SecondarySource*/
} /*namespace reactor*/
/* invoke callbacks in this set to send an outgoing event */
RpCallbackSet<EventSink> cb_set_;
}; /*SecondarySource*/
} /*namespace reactor*/
} /*namespace xo*/
/* end SecondarySource.hpp */

View file

@ -45,6 +45,11 @@ namespace xo {
return false;
} /*remove_source*/
void
PollingReactor::notify_source_primed(brw<ReactorSource>) {
/* nothing to do here -- all sources always checked by polling loop */
} /*notify_source_primed*/
int64_t
PollingReactor::find_nonempty_source(size_t start_ix)
{
@ -74,13 +79,25 @@ namespace xo {
{
int64_t ix = this->find_nonempty_source(this->next_ix_);
scope log(XO_DEBUG(this->loglevel() == log_level::chatty));
log && log(xtag("self", this), xtag("src_ix", ix));
uint64_t retval = 0;
if(ix >= 0) {
brw<ReactorSource> src = this->source_v_[ix];
return src->deliver_one();
log && log(xtag("src.name", src->name()));
retval = src->deliver_one();
} else {
return 0;
retval = 0;
}
log.end_scope(xtag("retval", retval));
return retval;
} /*run_one*/
} /*namespace reactor*/
} /*namespace xo*/

View file

@ -1,4 +1,4 @@
/* @file Source.cpp */
/* @file ReactorSource.cpp */
#include "ReactorSource.hpp"
#include "xo/indentlog/print/time.hpp"