initial implementation

This commit is contained in:
Roland Conybeare 2023-10-10 15:20:32 -04:00
commit 532d48529f
29 changed files with 2329 additions and 0 deletions

51
CMakeLists.txt Normal file
View file

@ -0,0 +1,51 @@
# xo-reactor/CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(reactor VERSION 0.1)
enable_language(CXX)
# common XO cmake macros (see proj/xo-cmake)
include(xo_macros/xo_cxx)
include(xo_macros/code-coverage)
# ----------------------------------------------------------------
# unit test setup
enable_testing()
# activate code coverage for all executables + libraries (when configured with -DCODE_COVERAGE=ON)
add_code_coverage()
# 1. assuming that /nix/store/ prefixes .hpp files belonging to gcc, catch2 etc.
# we're not interested in code coverage for these sources.
# 2. exclude the utest/ subdir, we don't need coverage on the unit tests themselves;
# rather, want coverage on the code that the unit tests exercise.
#
# NOTE: this seems to work only with the 'ccov-all' target. In particular, doesn't seem to do anything with the 'ccov' target
#
add_code_coverage_all_targets(EXCLUDE /nix/store/* ${PROJECT_SOURCE_DIR}/utest/* ${PROJECT_BINARY_DIR}/local/* ${PROJECT_SOURCE_DIR}/repo/*)
# ----------------------------------------------------------------
# c++ settings
# one-time project-specific c++ flags. usually empty
set(PROJECT_CXX_FLAGS "")
#set(PROJECT_CXX_FLAGS "-fconcepts-diagnostics-depth=2")
add_definitions(${PROJECT_CXX_FLAGS})
xo_toplevel_compile_options()
# ----------------------------------------------------------------
add_subdirectory(src/reactor)
# ----------------------------------------------------------------
# provide find_pacakge() support for reactor customers
xo_export_cmake_config(${PROJECT_NAME} ${PROJECT_VERSION} ${PROJECT_NAME}Targets)
# ----------------------------------------------------------------
# install .hpp files
xo_install_include_tree()
# end CMakeLists.txt

42
README.md Normal file
View file

@ -0,0 +1,42 @@
# reactor library
in-memory queuing system
# dependencies
build+install these first
- xo-reflect [github.com/Rconybea/xo-reflect]
- xo-callback [github.com/Rconybea/xo-callback]
# build + install
# build
```
$ cd reactor
$ mkdir build
$ cd build
$ INSTALL_PREFIX=/usr/local # or wherever you prefer
$ cmake -DCMAKE_MODULE_PATH=${INSTALL_PREFIX}/share/cmake -DCMAKE_PREFIX_PATH=${INSTALL_PREFIX} -DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX} ..
$ make
$ make install
```
(also see .github/workflows/main.yml)
# build for unit test coverage
```
$ cd xo-reactor
$ mkdir ccov
$ cd ccov
$ cmake -DCMAKE_MODULE_PATH=${INSTALL_PREFIX}/share/cmake -DCMAKE_PREFIX_PATH=${INSTALL_PREFIX} -DCODE_COVERAGE=ON -DCMAKE_BUILD_TYPE=Debug ..
```
# LSP support
LSP looks for compile commands in the root of the source tree;
cmake creates them in the root of its build directory.
```
$ cd xo-reactor
$ ln -s build/compile_commands.json
```

View file

@ -0,0 +1,13 @@
@PACKAGE_INIT@
include(CMakeFindDependencyMacro)
# note: changes to find_dependency() calls here
# must coordinate with xo_dependency() calls
# in xo-reactor/src/reactor/CMakeLists.txt
#
find_dependency(reflect)
find_dependency(callback)
include("${CMAKE_CURRENT_LIST_DIR}/@PROJECT_NAME@Targets.cmake")
check_required_components("@PROJECT_NAME@")

View file

@ -0,0 +1,50 @@
/* @file AbstractEventProcessor.hpp */
#pragma once
#include "xo/refcnt/Refcounted.hpp"
#include <functional>
#include <vector>
#include <string>
namespace xo {
namespace reactor {
/* common base class for {AbstractSource, AbstractSink}.
* An event processor can be:
* 1. an event source (inherits AbstractSource)
* 2. an event sink (inherits AbstractSink)
* 3. both source+sink (inherits both)
*/
class AbstractEventProcessor : virtual public ref::Refcount {
public:
/* reporting name for this source. ideally unique, but not required */
virtual std::string const & name() const = 0;
/* set .name */
virtual void set_name(std::string const & x) = 0;
/* find all event processors ep reachable from x (i.e. downstream from x).
* report each such ep exactly once
*/
static std::vector<ref::rp<AbstractEventProcessor>> map_network(ref::rp<AbstractEventProcessor> const & x);
/* visit direct downstream consumers c[i] of this event processor.
* call ep(c[i]) for each such consumer.
*/
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor> ep)> const & fn) = 0;
/* write representation to stream */
virtual void display(std::ostream & os) const = 0;
/* human-readable string identifying this source */
virtual std::string display_string() const;
}; /*AbstractEventProcessor*/
inline std::ostream &
operator<<(std::ostream & os, AbstractEventProcessor const & src) {
src.display(os);
return os;
} /*operator<<*/
} /*namespace reactor*/
} /*namespace xo*/
/* end AbstractEventProcessor.hpp */

View file

@ -0,0 +1,71 @@
/* @file AbstractSink.hpp */
#pragma once
#include "AbstractSource.hpp"
#include "xo/reflect/TaggedPtr.hpp"
#include "xo/reflect/TypeDescr.hpp"
//#include "time/Time.hpp"
#include "xo/indentlog/print/tag.hpp"
#include "xo/cxxutil/demangle.hpp"
#include <typeinfo>
namespace xo {
namespace reactor {
/* an event consumer.
* note that event representation is not specified,
* this helps avoid mandating a type hierarchy for events
*/
class AbstractSink : public virtual AbstractEventProcessor {
public:
using TypeDescr = reflect::TypeDescr;
using TaggedPtr = reflect::TaggedPtr;
public:
virtual ~AbstractSink() = default;
/* if true: sources may produce events of any reflected type.
* sink will accept such events using .notify_ev_tp()
* for example see web_util/WebsocketSink
*
* if false (common): souce is expected to to produce events of
* a single type, specified by .sink_ev_type()
* .notify_ev_tp() will downcast to that type.
* for example see reactor/Sink1
*
* polymorphic sinks pay for runtime polymorphism
* (since WebsocketSink sends events in json format this is
* expected to be negligible compared to message formatting)
*/
virtual bool allow_polymorphic_source() const = 0;
/* identify datatype for items expected by this sink */
virtual TypeDescr sink_ev_type() const = 0;
/* true iff this sink accepts volatile events.
* volatile events are events that may be modified
* or destroyed after being delivered to this sink.
*
* For example KalmanFilterSvc accepts volatile events,
* but EventStore requires non-volatile events.
*/
virtual bool allow_volatile_source() const = 0;
/* counts lifetime #of incoming events for this sink */
virtual uint32_t n_in_ev() const = 0;
/* attach an input source.
* typically this means calling src.add_callback()
* with a function thats calls a .notify_xxx() method
* on this Sink
*/
virtual void attach_source(ref::rp<AbstractSource> const & src) = 0;
/* accept incoming event, given by tagged pointer */
virtual void notify_ev_tp(TaggedPtr const & ev_tp) = 0;
}; /*AbstractSink*/
} /*namespace reactor*/
} /*namespace xo*/
/* end AbstractSink.hpp */

View file

@ -0,0 +1,94 @@
/* @file AbstractSource.hpp */
#pragma once
#include "AbstractEventProcessor.hpp"
#include "xo/reflect/TypeDescr.hpp"
#include "xo/callback/CallbackSet.hpp"
#include "xo/refcnt/Refcounted.hpp"
#include <string>
namespace xo {
namespace web { class StreamEndpointDescr; }
namespace reactor {
class AbstractSink;
template<typename T>
class Sink1;
/* abstract api for a source of events.
* Event representation is left open: Sources and Sinks
* need to have compatible event representations,
* and coordination is left to such (Source, Sink) pairs.
*
* See ReactorSource, for example
*
* Typically a Source will have one or more .add_callback()
* methods, for listening to source events
*/
class AbstractSource : public virtual AbstractEventProcessor {
public:
using StreamEndpointDescr = web::StreamEndpointDescr;
using TypeDescr = reflect::TypeDescr;
using CallbackId = fn::CallbackId;
public:
/* identify datatype for items delivered by this source */
virtual TypeDescr source_ev_type() const = 0;
/* if true: event objects (see .source_ev_type())
* maybe overwritten between callbacks.
* A sink that wants to capture events
* (e.g. EventStore<>) will need to deep-copy them
* if false: event objects are preserved between callbacks.
*/
virtual bool is_volatile() const = 0;
/* counts #of outbound events ready for delivery,
* but not yet sent */
virtual uint32_t n_queued_out_ev() const = 0;
/* counts lifetime #of events delivered.
* see also AbstractSink.n_in_ev
*/
virtual uint32_t n_out_ev() const = 0;
/* if true, simulator will report interaction with this source */
virtual bool debug_sim_flag() const = 0;
/* set .trace_sim_flag */
virtual void set_debug_sim_flag(bool x) = 0;
virtual CallbackId attach_sink(ref::rp<AbstractSink> const & sink) = 0;
virtual void detach_sink(CallbackId id) = 0;
/* endpoint for a websocket subscriber;
* subscriber delivers events produced by this source
*/
StreamEndpointDescr stream_endpoint_descr(std::string const & url_prefix);
/* typically expect events to be delivered using a reactor or simulator.
* (for example see reactor/Reactor, simulator/Simulator);
* reactor allocates cpu, and controls event ordering across sources
* when there are multiple sources.
*
* However, also possible for user code to invoke .deliver_one() directly.
* Beware, may get unpredictable results if attempt to do this on a source
* that's also attached to a reactor.
*/
virtual std::uint64_t deliver_one() = 0;
/* convenience: call .deliver_one() n times, return sum of results */
std::uint64_t deliver_n(uint64_t n);
/* convenience: call .deliver_one() until it returns 0
* (beware of inexhaustible sources!)
*/
std::uint64_t deliver_all();
}; /*AbstractSource*/
using AbstractSourcePtr = ref::rp<AbstractSource>;
} /*namespace reactor*/
} /*namespace xo*/
/* end AbstractSource.hpp */

View file

@ -0,0 +1,19 @@
/* @file DirectSource.hpp */
#pragma once
#include "time/Time.hpp"
#include "reactor/Sink.hpp"
#include "reactor/EventSource.hpp"
#include "reactor/HeapReducer.hpp"
//#include "reactor/LastReducer.hpp"
#include "reactor/Reactor.hpp"
#include "callback/CallbackSet.hpp"
namespace xo {
namespace reactor {
} /*namespace reactor*/
} /*namespace xo*/
/* end DirectSource.hpp */

View file

@ -0,0 +1,25 @@
/* @file DirectSourcePtr.hpp */
#pragma once
#include "reactor/SecondarySource.hpp"
#include "reactor/LastReducer.hpp"
#include "reactor/EventTimeFn.hpp"
namespace xo {
namespace reactor {
template<typename Event>
using DirectSource = SecondarySource<Event,
LastReducer<Event,
StructEventTimeFn<Event>>>;
/* use when Event is ref::rp<T> for some T */
template<typename Event>
using DirectSourcePtr = SecondarySource<Event,
LastReducer<Event,
PtrEventTimeFn<Event>>>;
} /*namespace reactor*/
} /*namespace xo*/
/* end DirectSourcePtr.hpp */

View file

@ -0,0 +1,25 @@
/* @file EventSource.hpp */
#pragma once
#include "reactor/ReactorSource.hpp"
#include "callback/CallbackSet.hpp"
namespace xo {
namespace reactor {
template</*typename Event,*/
typename Callback
/*void (Callback::*member_fn)(Event const &)*/>
class EventSource : public ReactorSource {
public:
using CallbackId = fn::CallbackId;
public:
virtual CallbackId add_callback(ref::rp<Callback> const & cb) = 0;
virtual void remove_callback(CallbackId id) = 0;
}; /*EventSource*/
} /*namespace reactor*/
} /*namespace xo*/
/* end EventSource.hpp */

View file

@ -0,0 +1,317 @@
/* @file EventStore.hpp */
#pragma once
#include "reactor/Reducer.hpp"
#include "reactor/EventTimeFn.hpp"
#include "reactor/Sink.hpp"
#include "web_util/HttpEndpointDescr.hpp"
#include "printjson/PrintJson.hpp"
#include "reflect/Reflect.hpp"
#include "tree/RedBlackTree.hpp"
namespace xo {
namespace reactor {
/* abstract event store api */
class AbstractEventStore : virtual public ref::Refcount {
public:
using PrintJson = xo::json::PrintJson;
using TaggedPtr = xo::reflect::TaggedPtr;
using HttpEndpointDescr = xo::web::HttpEndpointDescr;
using Alist = xo::web::Alist;
public:
/* true iff .size() == 0 */
virtual bool empty() const = 0;
/* #of events currently held in this store */
virtual std::uint32_t size() const = 0;
/* TODO:
* 1. TaggedGptr = discriminated union of
* a. TaggedRcptr (i.e. refcounted semantics)
* b. Unique (i.e. unique_ptr semantics)
* c. Exptr (i.e. unowned_ptr semantics)
* d. compact (special-case -- value fits in pointer)
* will need mpl copy/assign stuff for TaggedUnique
* 2. provide .last_n(), .last_dt()
*/
virtual void http_snapshot(ref::rp<PrintJson> const & pjson,
std::ostream * p_os) const = 0;
/* http endpoint; generates http output for this eventstore */
virtual HttpEndpointDescr http_endpoint_descr(ref::rp<PrintJson> const & pjson,
std::string const & url_prefix) const {
/* important that lambda contains its own rp<PrintJson>;
* reference to stack will not do
*/
ref::rp<PrintJson> pjson_rp = pjson;
auto http_fn = ([this, pjson_rp]
(std::string const & /*uri*/,
Alist const & /*alist*/,
std::ostream * p_os)
{
/* WARNING: race condition here,
* given webserver runs from a separate thread
*/
this->http_snapshot(pjson_rp, p_os);
});
return HttpEndpointDescr(url_prefix + "/snap", http_fn);
} /*http_endpoint_descr*/
virtual void clear() = 0;
virtual void insert_tp(TaggedPtr const & ev_tp) = 0;
}; /*AbstractEventStore*/
/* in-memory storage for a set of events.
*
* Require:
* - Event is null-constructible
* - Event is copyable
* - EventTimeFn :: Event -> utc_nanos
*
* inheritance
* ref::Refcount
* ^
* isa
* |
* reactor::AbstractEventProcessor + req .visit_direct_consumers()
* ^
* isa
* |
* reactor::AbstractSink + req .sink_ev_type(), .notify_ev() etc.
* ^
* isa
* |
* reactor::Sink1<Event> + .attach_source(), .sink_ev_type(),
* ^ req .notify_ev() etc
* |
* isa
* |
* reactor::SinkEndpoint<Event> + impl .visit_direct_consumers()
* ^
* isa
* |
* reactor::StructEventStore<Event, ..> + .last_n() .last_dt() etc.
*/
template<typename Event,
typename EventTimeFn>
class EventStoreImpl : public SinkEndpoint<Event>,
public AbstractEventStore,
ReducerBase<Event, EventTimeFn>
{
static_assert(EventTimeConcept<Event, EventTimeFn>);
public:
using utc_nanos = xo::time::utc_nanos;
using nanos = xo::time::nanos;
using EventTree = xo::tree::RedBlackTree<utc_nanos, Event,
xo::tree::OrdinalReduce<Event>>;
using PrintJson = xo::json::PrintJson;
using Alist = xo::web::Alist;
using HttpEndpointDescr = xo::web::HttpEndpointDescr;
static ref::rp<EventStoreImpl> make() { return new EventStoreImpl(); }
/* visit most recent n events in this store.
* returns #of events actually visited
*
* if events visited are e1 .. en, then:
* (1) en is the most recent recorded event
* (.event_tm(en) is .tree.max_key())
* (2) there are no events between e(i) and e(i+1)
* (i.e. visit does not skip over any events)
* (3) if v < n, then v = .size(),
* where v is the #of events visited
*
* require:
* - Fn :: (Event -> )
*/
template <typename Fn>
std::uint32_t visit_last_n(std::uint32_t n, Fn && fn) const {
std::uint32_t z = this->size();
std::uint32_t lo = ((n >= z) ? 0 : z - n);
typename EventTree::const_iterator lo_ix = this->tree_.find_ith(lo);
typename EventTree::const_iterator hi_ix = this->tree_.cend();
return this->visit_range(lo_ix, hi_ix, fn);
} /*visit_last_n*/
/* visit suffix of events sufficient to cover interval of length dt.
* visit events in increasing timestamp order.
*
* if events visited are e1 .. en, then:
* (1) en is the most recent recorded event
* (.event_tm(en) is .tree.max_key())
* (2) there are no events between e(i) and e(i+1)
* (i.e. visit does not skip over any events)
* (3) if .event_tm(en) - .event_tm(e1) < dt,
* then e1 is the earliest recorded event
* (.event_tm(e1) is .tree.min_key())
* (4) if .event_tm(en) - .event_tm(e1) > dt,
* then (.event_tm(en) - .event_tm(e2)) < dt
*
* |<---------- dt ----------->|
* ^ ^ ^
* e1 e2 en
*/
template <typename Fn>
std::uint32_t visit_last_dt(nanos dt, Fn && fn) const {
if (tree_.empty())
return 0;
/* tree not empty -> has max key */
utc_nanos tn = this->tree_.max_key();
utc_nanos tk = tn - dt;
typename EventTree::const_iterator lo_ix = this->tree_.find_glb(tk, true /*closed*/);
typename EventTree::const_iterator hi_ix = this->tree_.end();
return this->visit_range(lo_ix, hi_ix, fn);
} /*visit_last_dt*/
std::vector<Event> last_n(std::uint32_t n) const {
std::vector<Event> retval;
auto fn = [&retval](Event const &ev) { retval.push_back(ev); };
this->visit_last_n(n, fn);
return retval;
} /*last_n*/
std::vector<Event> last_dt(nanos dt) const {
std::vector<Event> retval;
auto fn = [&retval](Event const &ev) { retval.push_back(ev); };
this->visit_last_dt(dt, fn);
return retval;
} /*last_dt*/
void insert(Event const & ev) { this->tree_.insert(typename EventTree::value_type(this->event_tm(ev), ev)); }
// ----- Inherited from AbstractEventStore -----
virtual bool empty() const override { return tree_.empty(); }
virtual std::uint32_t size() const override { return tree_.size(); }
/* write http snapshot of current state to *p_os */
virtual void http_snapshot(ref::rp<PrintJson> const & pjson, std::ostream * p_os) const override {
using xo::reflect::Reflect;
/* visit last 100 events;
* write them to *p_os in increasing time order
*/
auto ev_v = this->last_n(100);
pjson->print_tp(Reflect::make_tp(&ev_v), p_os);
} /*http_snapshot*/
virtual void clear() override { this->tree_.clear(); }
virtual void insert_tp(TaggedPtr const & ev_tp) override {
using xo::xtag;
Event * p_ev = ev_tp.recover_native<Event>();
if (p_ev) {
this->insert(*p_ev);
} else {
throw std::runtime_error(tostr("StructEventStore<Event>::insert_tp"
": unable to convert ev_tp to Event",
xtag("ev_tp.type", ev_tp.td()->canonical_name()),
xtag("Event", reflect::type_name<Event>())));
}
} /*insert_tp*/
// ----- Inherited from AbstractSink -----
virtual uint32_t n_in_ev() const override { return n_in_ev_; }
virtual bool allow_volatile_source() const override { return false; }
virtual void notify_ev(Event const & ev) override {
++(this->n_in_ev_);
this->insert(ev);
}
// ----- Inherited from AbstractSource -----
virtual void display(std::ostream & os) const override {
using xo::xtag;
os << "<EventStoreImpl"
<< xtag("name", this->name())
<< xtag("n_in_ev", this->n_in_ev())
<< ">";
} /*display*/
// ----- Inherited from AbstractEventProcessor -----
virtual std::string const & name() const override { return name_; }
virtual void set_name(std::string const & x) override { name_ = x; }
private:
EventStoreImpl() = default;
template <typename Fn>
std::uint32_t visit_range(typename EventTree::const_iterator lo_ix,
typename EventTree::const_iterator hi_ix,
Fn && fn) const {
std::uint32_t n = 0;
for (; lo_ix != hi_ix; ++lo_ix, ++n) {
fn(lo_ix->second);
}
return n;
} /*visit_range*/
private:
/* reporting name for this store */
std::string name_;
/* fetches per-event timestamp */
EventTimeFn event_tm_fn_;
/* counts lifetime #of incoming events (see .notify_ev()) */
uint32_t n_in_ev_ = 0;
/* events stored here */
EventTree tree_;
}; /*EventStoreImpl*/
template<typename Event>
using StructEventStore = EventStoreImpl<Event, StructEventTimeFn<Event>>;
template<typename Event>
using PtrEventStore = EventStoreImpl<Event, PtrEventTimeFn<Event>>;
/* Require:
* EventTimeConcept<T, StructEventTimeFn<T>>
*/
template <typename T>
class SinkToEventStore : public SinkEndpoint<T> {
public:
using EventStore = StructEventStore<T>;
public:
SinkToEventStore() = default;
virtual void notify_ev(T const & ev) override {
store_.insert(ev);
} /*notify_ev*/
private:
/* stash remembered events (all of them!) here */
EventStore store_;
}; /*SinkToEventStore*/
} /*namespace reactor*/
} /*namespace xo*/
/* end EventStore.hpp */

View file

@ -0,0 +1,38 @@
/* @file EventTimeFn.hpp */
#pragma once
#include "time/Time.hpp"
#include <concepts>
namespace xo {
namespace reactor {
template <typename Event, typename EventTimeFn>
concept EventTimeConcept = requires(EventTimeFn etfn, Event ev) {
{ etfn(ev) } -> std::same_as<xo::time::utc_nanos>;
};
template<typename Event>
class StructEventTimeFn {
public:
using event_t = Event;
using utc_nanos = xo::time::utc_nanos;
public:
utc_nanos operator()(Event const & ev) const { return ev.tm(); }
}; /*StructEventTimeFn*/
template<typename Event>
class PtrEventTimeFn {
public:
using event_t = Event;
using utc_nanos = xo::time::utc_nanos;
public:
utc_nanos operator()(Event const & ev) const { return ev->tm(); }
}; /*PtrEventTimeFn*/
} /*namespace reactor*/
} /*namespace xo*/
/* end EventTimeFn.hpp */

View file

@ -0,0 +1,72 @@
/* @file HeapReducer.hpp */
#pragma once
#include "reactor/Reducer.hpp"
namespace xo {
namespace reactor {
/* collect incoming events in a heap,
* ordered by timestamp.
* output events in increasing timestamp order.
* Information preserving in all other respects
*
* Require:
* - Event is null-constructible
* - Event is copyable
* - EventTimeFn :: Event -> utc_nanos
*/
template<typename Event, typename EventTimeFn = StructEventTimeFn<Event>>
class HeapReducer : public ReducerBase<Event, EventTimeFn> {
public:
using utc_nanos = xo::time::utc_nanos;
public:
HeapReducer() = default;
HeapReducer(EventTimeFn const & evtfn) : ReducerBase<Event, EventTimeFn>(evtfn) {}
bool is_empty() const { return this->event_heap_.empty(); }
/* require: .is_empty() = false */
utc_nanos next_tm() const { return this->event_tm(this->event_heap_.front()); }
/* #of events stored in this reducer */
uint32_t n_event() const { return this->event_heap_.size(); }
Event const & last_annexed_ev() const { return this->annexed_ev_; }
void include_event(Event const & ev) {
this->event_heap_.push_back(ev);
std::push_heap(this->event_heap_.begin(),
this->event_heap_.end(),
std::greater<Event>());
} /*include_event*/
void include_event(Event && ev) {
this->event_heap_.push_back(std::move(ev));
std::push_heap(this->event_heap_.begin(),
this->event_heap_.end(),
std::greater<Event>());
} /*include_event*/
Event & annex_one() {
this->annexed_ev_ = this->event_heap_.front();
std::pop_heap(this->event_heap_.begin(),
this->event_heap_.end(),
std::greater<Event>());
this->event_heap_.pop_back();
return this->annexed_ev_;
} /*annex_one*/
// ----- Inherited from ReducerBase -----
// utc_nanos event_tm(Event const & x);
private:
/* queued Events, in increasing timestamp order */
std::vector<Event> event_heap_;
/* annexed event, removed from .event_heap */
Event annexed_ev_;
}; /*HeapReducer*/
} /*namespace reactor*/
} /*namespace xo*/
/* end HeapReducer.hpp */

View file

@ -0,0 +1,154 @@
/* @file LastReducer.hpp */
#pragma once
#include "reactor/Reducer.hpp"
#include <array>
namespace xo {
namespace reactor {
/* implementation record used in LastReducer.
* LastReducer (see below) remembers a single event,
* + will be updated on successive calls to
* LastReducer.include_event()
*
* need to remember the _first_ (& therefore earliest)
* event timestamp in such a wave, since that establishes when simulator
* should deliver the event -- even if event is subsequently
* overwritten.
*
* once event is delivered, timestamp can reset
*
* otherwise if upstream producer sends events with
* future timestamps, can get indefinite postponement
* with simulation clock failing to catch up to event time.
*
*/
template<typename Event>
class EventRecd {
public:
using utc_nanos = xo::time::utc_nanos;
public:
EventRecd() = default;
EventRecd(utc_nanos tm, Event ev) : trigger_tm_{tm}, ev_{ev} {}
EventRecd(utc_nanos tm, Event && ev) : trigger_tm_{tm}, ev_{std::move(ev)} {}
public:
/* if sim, deliver event when simulation clock reaches
* .trigger_tm; .trigger_tm can be earlier than .ev time
*/
utc_nanos trigger_tm_;
/* event to deliver */
Event ev_;
};
/* reducer that just remembers the last event
*
* Require:
* - Event is null-contructible
* - Event is copyable
*
* LastReducer provides reentrancy support. This support doesn't operate
* if Event copy is not deep, e.g. for Event = rp<Foo>n
*
* .include_event()
* /-------\ -----------------> /------\
* | empty | | full |
* \-------/ <----------------- \------/
* . .annex_one() .
* . .
* .is_empty()=true .is_empty()=false
*/
template<typename Event, typename EventTimeFn = StructEventTimeFn<Event>>
class LastReducer : public ReducerBase<Event, EventTimeFn> {
public:
using utc_nanos = xo::time::utc_nanos;
public:
LastReducer() = default;
LastReducer(EventTimeFn const & evtfn) : ReducerBase<Event, EventTimeFn>(evtfn) {}
bool is_empty() const { return empty_flag_; }
/* require: .is_empty() = false */
utc_nanos next_tm() const {
return this->last_ev_[this->last_ix_].trigger_tm_;
//return this->event_tm(this->last_ev_[this->last_ix_]);
}
/* #of events stored in this reducer (0 or 1) */
uint32_t n_event() const { return this->empty_flag_ ? 0 : 1; }
Event const & last_annexed_ev() const {
return this->last_ev_[1 - this->last_ix_].ev_;
}
EventRecd<Event> & include_event_aux(Event const & ev) {
EventRecd<Event> & evr
= this->last_ev_[this->last_ix_];
if (this->empty_flag_) {
/* evr.trigger_tm will be preserved across
* successive calls to .include_event();
* until .annex_one()
*/
evr.trigger_tm_ = this->event_tm(ev);
this->empty_flag_ = false;
}
return evr;
} /*include_event_aux*/
void include_event(Event const & ev) {
EventRecd<Event> & evr
= this->include_event_aux(ev);
evr.ev_ = ev;
} /*include_event*/
void include_event(Event && ev) {
EventRecd<Event> & evr
= this->include_event_aux(ev);
evr.ev_ = std::move(ev);
} /*include_event*/
Event & annex_one() {
std::uint32_t annexed_ix = this->last_ix_;
/* since .empty_flag is true,
* next call to .include_event_aux() will
* capture new timestamp
*/
this->empty_flag_ = true;
this->last_ix_ = (1 - this->last_ix_);
return this->last_ev_[annexed_ix].ev_;
} /*annex_one*/
// ----- Inherited from ReducerBase -----
//utc_nanos event_tm(Event const & ev) const { return this->event_tm_fn_(ev); }
private:
/* true when reducer contains 0 queued events,
* not counting any annexed event
*/
bool empty_flag_ = true;
/* .last_ev[.last_ix] updated by .include_event()
*/
std::uint32_t last_ix_ = 0;
/* remember two events
* (a) a single queued event (updated by .include_event())
* (b) a single removed event (reported by .annex_one())
*
* roles of .last_ev[0], .last_ev[1] reverse each time .annex_one() runs
*/
std::array<EventRecd<Event>, 2> last_ev_;
}; /*LastReducer*/
} /*namespace reactor*/
} /*namespace xo*/
/* end LastReducer.hpp */

View file

@ -0,0 +1,44 @@
/* @file PollingReactor.hpp */
#pragma once
#include "Reactor.hpp"
#include "ReactorSource.hpp"
#include <vector>
#include <cstdint>
namespace xo {
namespace reactor {
/* reactor that runs by polling an ordered set of sources */
class PollingReactor : public Reactor {
public:
PollingReactor() = default;
// ----- inherited from Reactor -----
virtual bool add_source(ref::brw<ReactorSource> src) override;
virtual bool remove_source(ref::brw<ReactorSource> src) override;
virtual std::uint64_t run_one() override;
private:
/* find non-empty source, starting from .source_v_[start_ix],
* wrapping around to .source_v_[start_ix - 1].
*
* return index of first available non-empty source,
* or -1 if all sources are empty
*/
std::int64_t find_nonempty_source(std::size_t start_ix);
private:
/* next source to poll will be .source_v_[.next_ix_] */
std::size_t next_ix_ = 0;
/* ordered set of sources (see reactor::Source)
* reactor will poll sources in round-robin order
*/
std::vector<ReactorSourcePtr> source_v_;
}; /*PollingReactor*/
} /*namespace reactor*/
} /*namespace xo*/
/* end PollingReactor.hpp */

View file

@ -0,0 +1,92 @@
/* file PolyAdapterSink.hpp
*
* author: Roland Conybeare, Sep 2022
*/
#pragma once
#include "Sink.hpp"
#include "xo/reflect/Reflect.hpp"
namespace xo {
namespace reactor {
/* adapter between a source that delivers a particular event type T,
* and a sink that accepts arbitrarily-typed events via .notify_ev_tp()
* Use this to connect to a polymorphic sink.
*
* Require:
* - .poly_sink.allow_polymorphic_source()
* (ofc. otherwise no point in using PolyAdapterSink<T>)
* - .poly_sink.allow_volatile_source()
* need this bc will be wrapping event with TaggedPtr,
* which doesn't manage event lifetime
*/
template<typename T>
class PolyAdapterSink : public reactor::Sink1<T> {
public:
using Reflect = reflect::Reflect;
using TaggedPtr = reflect::TaggedPtr;
public:
/* named ctor idiom */
static ref::rp<PolyAdapterSink> make(ref::rp<AbstractSink> poly_sink) {
//xo::scope lscope("PolyAdapterSink::make");
ref::rp<PolyAdapterSink> retval(new PolyAdapterSink(poly_sink));
//lscope.log("adapter", (void*)retval.get());
return retval;
} /*make*/
// ----- Inherited from Sink1<T> -----
virtual void notify_ev(T const & ev) override {
//xo::scope lscope("PolyAdapterSink::notify_ev");
//lscope.log(xo::xtag("ev", ev));
TaggedPtr ev_tp = Reflect::make_tp(const_cast<T *>(&ev));
this->notify_ev_tp(ev_tp);
} /*notify_ev*/
// ----- Inherited from AbstractSink -----
virtual bool allow_volatile_source() const override { return true; }
virtual uint32_t n_in_ev() const override { return this->poly_sink_->n_in_ev(); }
/* note: ok to do this, however if expecting to use this entry point,
* maybe don't need to interpose PolyAdapterSink<T> ahead of .poly_sink
*/
virtual void notify_ev_tp(TaggedPtr const & ev_tp) override {
//xo::scope lscope("PolyAdapterSink::notify_ev_tp");
return this->poly_sink_->notify_ev_tp(ev_tp);
}
// ----- Inherited from AbstractEventProcessor -----
virtual std::string const & name() const override { return this->poly_sink_->name(); }
virtual void set_name(std::string const & x) override { this->poly_sink_->set_name(x); }
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor> ep)> const & fn) override {
this->poly_sink_->visit_direct_consumers(fn);
}
virtual void display(std::ostream & os) const override {
using xo::xtag;
os << "<PolyAdapterSink"
<< xtag("addr", (void*)this)
<< xtag("T", reflect::type_name<T>())
<< xtag("poly", this->poly_sink_)
<< ">";
} /*display*/
private:
PolyAdapterSink(ref::rp<AbstractSink> poly_sink) : poly_sink_{std::move(poly_sink)} {}
private:
/* mandate: .poly_sink.allow_polymorphic_source() is true */
ref::rp<AbstractSink> poly_sink_;
}; /*PolyAdapterSink*/
} /*namespace reactor*/
} /*namespace xo*/
/* end PolyAdapterSink.hpp */

View file

@ -0,0 +1,63 @@
/* @file Reactor.hpp */
#pragma once
#include "xo/refcnt/Refcounted.hpp"
#include <cstdint>
namespace xo {
namespace reactor {
class ReactorSource;
/* abtract api for a reactor:
* something that arranges to have work done on a set of Sources.
*/
class Reactor : public ref::Refcount {
public:
virtual ~Reactor() = default;
/* add source src to this reactor.
* on success, invoke src.notify_reactor_add(this)
*
* returns true if source added; false if already present
*/
virtual bool add_source(ref::brw<ReactorSource> src) = 0;
/* remove source src from this reactor.
* source must previously have been added by
* .add_source(src).
*
* on success, invoke src.notify_reactor_remove(this)
*
* returns true if source removed; false if not present
*/
virtual bool remove_source(ref::brw<ReactorSource> src) = 0;
/* notification when non-primed source (source with no known events)
* becomes primed (source with at least one event)
*/
virtual void notify_source_primed(ref::brw<ReactorSource> src) = 0;
/* dispatch one reactor event, borrowing the calling thread
* amount of work this represents is Source/Sink specific.
*
* returns #of events dispatched (0 or 1)
*/
virtual std::uint64_t run_one() = 0;
/* borrow calling thread to dispatch reactor events.
* if n is -1, run indefinitely
* otherwise dispatch up to n events.
* n = 0 is a noop
*/
void run_n(int32_t n);
/* borrow calling thread to run indefinitely.
* suitable implementation for dedicated reactor threads
*/
void run() { this->run_n(-1); }
}; /*Reactor*/
} /*namespace reactor*/
} /*namespace xo*/
/* end Reactor.hpp */

View file

@ -0,0 +1,130 @@
/* @file ReactorSource.hpp */
#pragma once
#include "AbstractSource.hpp"
//#include "time/Time.hpp"
#include <cstdint>
namespace xo {
namespace reactor {
class Reactor;
/* abstract api for a source of events.
* Event representation is left open: Sources and Sinks
* need to have compatible event representations,
* and coordination is left to such (Source, Sink) pairs.
*
* Source->Sink activity may be expected to be mediated by a reactor,
* that implements the Reactor api.
*
* At any time, A Source can be associated with at most one reactor.
* Sources are informed of Reactor<->Source association being
* formed/broken by the
* .notify_reactor_add(), .notify_reactor_remove()
* methods
*
* The source api intends also to provide for simulation.
* There introduces two simulation-specific methods:
* .sim_current_tm()
* .sim_advance_until()
*
* A non-simulation source can implement these as calls to
* .online_current_tm(), .online_advance_until() respectively
* .online_current_tm() aborts since an online source is never exhausted
* .online_advance_until() is a no-op that returns 0
*
* Loop for consuming from a primary simulation source:
*
* brw<Source> s = ...;
* while(!s->is_exhausted())
* s->deliver_one();
*
* Secondary sources (sources that depend on other sources) can be
* in a state where they don't know their next event, in which case:
*
* s->is_notprimed() == true
*/
class ReactorSource : public AbstractSource {
public:
using utc_nanos = xo::time::utc_nanos;
public:
virtual ~ReactorSource() = default;
/* true if source is currently empty (has 0 events to deliver) */
virtual bool is_empty() const = 0;
bool is_nonempty() const { return !this->is_empty(); }
/* true when source knows its next event
* A source that isn't primed is also excluded from simulation
* heap until it becomes primed.
* This make feasible simulation sources that
* depend on other simulation sources
*/
virtual bool is_primed() const { return !this->is_empty(); }
virtual bool is_notprimed() const { return this->is_empty(); }
/* if true, this source has no events, and will never publish more events
* - for sim, return true for a standalone source that has replayed all events
* - for rt, set during orderly
*/
virtual bool is_exhausted() const = 0;
/* if this is a simulation source and .is_exhausted is false:
* returns next event time; more precisely, no events exist prior to
* this time.
*
* if sim, and .is_primed = true,
* returns timestamp of next event
*/
virtual utc_nanos sim_current_tm() const = 0;
/* promise:
* - .current_tm() > tm || .is_notprimed() || .is_exhausted() = true
* - if replay_flag is true, then any events between previous .current_tm()
* and new .current_tm() will have been published
*
* returns #of events delivered.
* does not count events that were skipped, so always returns 0 if
* replay_flag is false
*/
virtual std::uint64_t sim_advance_until(utc_nanos tm, bool replay_flag) = 0;
/* informs source when it's added to a reactor
* (see Reactor.add_source())
*/
virtual void notify_reactor_add(Reactor * /*reactor*/) {}
/* informs source when it's removed from a reactor
* (see Reactor.remove_source())
*/
virtual void notify_reactor_remove(Reactor * /*reactor*/) {}
// ----- Inherited from AbstractSource -----
/* deliver one event to attached sink
* interpretation of 'one event' is source-specific;
* could be a collapsed or batched event in practice.
*
* no-op if source is empty.
*
* if sim, promise:
* - new .current_tm >= old .current_tm() || .is_notprimed() || .is_exhausted()
*
* returns #of events delivered. Must be 0 or 1 in this context
*/
virtual std::uint64_t deliver_one() override = 0;
protected:
/* default implementations for online sources */
utc_nanos online_current_tm() const;
uint64_t online_advance_until(utc_nanos tm, bool replay_flag);
}; /*ReactorSource*/
using ReactorSourcePtr = ref::rp<ReactorSource>;
} /*namespace reactor*/
} /*namespace xo*/
/* end ReactorSource.hpp */

View file

@ -0,0 +1,33 @@
/* @file Reducer.hpp */
#pragma once
#include "reactor/EventTimeFn.hpp"
namespace xo {
namespace reactor {
/* LastReducer, HeapReducer inherit ReducerBase */
template<typename Event, typename EventTimeFn>
class ReducerBase {
static_assert(EventTimeConcept<Event, EventTimeFn>);
public:
using utc_nanos = xo::time::utc_nanos;
public:
ReducerBase() = default;
ReducerBase(EventTimeFn const & evtfn) : event_tm_fn_{evtfn} {}
utc_nanos event_tm(Event const & ev) const { return this->event_tm_fn_(ev); }
private:
/* Event ev = ...;
* .event_tm_fn(ev) -> utc_nanos
* reports event time associated with ev
*/
EventTimeFn event_tm_fn_;
}; /*ReducerBase*/
} /*namespace reactor*/
} /*namespace xo*/
/* end Reducer.hpp */

View file

@ -0,0 +1,359 @@
/* @file SecondarySource.hpp */
#pragma once
#include "time/Time.hpp"
#include "reactor/Sink.hpp"
#include "reactor/DirectSource.hpp"
#include "reactor/Reactor.hpp"
#include "callback/CallbackSet.hpp"
#include "reflect/demangle.hpp"
#include <vector>
namespace xo {
namespace reactor {
/* A passive event source.
* Can use as backend publisher when implementating another
* event processor.
*/
template<typename Event, typename Reducer = HeapReducer<Event>>
class SecondarySource : public EventSource<Sink1<Event>> {
public:
using EventSink = Sink1<Event>;
template<typename Fn>
using RpCallbackSet = fn::RpCallbackSet<Fn>;
using CallbackId = fn::CallbackId;
using TypeDescr = xo::reflect::TypeDescr;
using utc_nanos = xo::time::utc_nanos;
public:
~SecondarySource() = default;
static ref::rp<SecondarySource> make() { return new SecondarySource(); }
/* last event delivered from this source --
* i.e. event in most recent call to .deliver_one_aux()
*/
Event const & last_annexed_ev() const { return this->reducer_.last_annexed_ev(); }
void notify_upstream_exhausted() { this->upstream_exhausted_ = true; }
/* make event available to reactor, by adding to internal reducer */
void notify_secondary_event(Event const & ev) {
/* test if ev is priming, update .current_tm */
bool is_priming = this->preprocess_secondary_event(ev);
this->reducer_.include_event(ev);
this->postprocess_secondary_event(is_priming);
} /*notify_secondary_event*/
void notify_secondary_event(Event && ev) {
bool is_priming = this->preprocess_secondary_event(ev);
this->reducer_.include_event(ev);
this->postprocess_secondary_event(is_priming);
} /*notify_secondary_event*/
template<typename T>
void notify_secondary_event_v(T const & v) {
using xo::scope;
using xo::xtag;
if (v.empty())
return;
scope log(XO_DEBUG(this->debug_sim_flag_));
log && log(xtag("name", this->name()));
if (this->upstream_exhausted_) {
throw std::runtime_error("SecondarySource::notify_secondary_event_v"
": not allowed after upstream exhausted");
}
uint32_t n_ev = 0;
for (Event const & ev : v) {
utc_nanos evtm = this->reducer_.event_tm(ev);
if (this->current_tm_ < evtm)
this->current_tm_ = evtm;
++n_ev;
}
log && log(xtag("T", reflect::type_name<T>()),
xtag("n_ev", n_ev));
if (n_ev > 0) {
/* if reducer is empty when .notify_secondary_event_v() begins,
* then reactor/simulator needs to be notified that source is no longer empty
*/
bool is_priming = this->reducer_.is_empty();
for (Event const & ev : v)
this->reducer_.include_event(ev);
Reactor * reactor = this->parent_reactor_;
if (reactor) {
if (is_priming) {
/* reactor/simulator takes responsibility for delivering events */
reactor->notify_source_primed(ref::brw<ReactorSource>::from_native(this));
}
} else {
/* special case if no reactor: deliver immediately */
//this->deliver_one();
this->deliver_all();
}
}
} /*notify_secondary_event_v*/
// ----- inherited from EventSource -----
CallbackId add_callback(ref::rp<EventSink> const & cb) override {
return this->cb_set_.add_callback(cb);
} /*add_callback*/
void remove_callback(CallbackId id) override {
this->cb_set_.remove_callback(id);
} /*remove_callback*/
// ----- inherited from ReactorSource -----
virtual bool is_empty() const override { return this->reducer_.is_empty(); }
virtual bool is_exhausted() const override { return this->upstream_exhausted_ && this->is_empty(); }
virtual utc_nanos sim_current_tm() const override {
using xo::scope;
using xo::xtag;
if (this->reducer_.is_empty()) {
/* this is a tricky case.
* it means this source doesn't
* _know_ specific next event yet; however new events
* may appear at any time by way of .notify_event()
*
* If event doesn't know next event, then .current_tm isn't useful
* for establishing priority relative to other sources.
* rely on priming mechanism instead,
* which means that control should never come here.
*/
return this->current_tm_;
} else {
scope log(XO_DEBUG(false /*this->debug_sim_flag_*/),
xtag("name", this->name_),
xtag("next_tm", this->reducer_.next_tm()));
return this->reducer_.next_tm();
}
} /*sim_current_tm*/
virtual std::uint64_t deliver_one() override {
return this->deliver_one_aux(true /*replay_flag*/);
}
virtual std::uint64_t sim_advance_until(utc_nanos target_tm,
bool replay_flag) override
{
uint64_t retval = 0;
while (!this->reducer_.is_empty()) {
utc_nanos tm = this->sim_current_tm();
if (tm < target_tm) {
retval += this->deliver_one_aux(replay_flag);
} else {
break;
}
}
return retval;
} /*sim_advance_until*/
virtual void notify_reactor_add(Reactor * reactor) override {
assert(!this->parent_reactor_);
this->parent_reactor_ = reactor;
} /*notify_reactor_add*/
virtual void notify_reactor_remove(Reactor * /*reactor*/) override {}
// ----- inherited from AbstractSource -----
virtual TypeDescr source_ev_type() const override {
return reflect::Reflect::require<Event>();
} /*source_ev_type*/
virtual uint32_t n_out_ev() const override { return n_out_ev_; }
/* #of events queued for delivery */
virtual uint32_t n_queued_out_ev() const override { return this->reducer_.n_event(); }
virtual bool debug_sim_flag() const override { return debug_sim_flag_; }
virtual void set_debug_sim_flag(bool x) override { this->debug_sim_flag_ = x; }
virtual CallbackId attach_sink(ref::rp<AbstractSink> const & sink) override {
ref::rp<EventSink> native_sink
= EventSink::require_native("SecondarySource::attach_sink", sink);
if (native_sink) {
if (!this->is_volatile()
|| native_sink->allow_volatile_source())
{
return this->add_callback(native_sink);
} else {
throw std::runtime_error("SecondarySource::attach_sink"
": sink requires non-volatile source "
+ std::string(reflect::type_name<Event>()));
}
} else {
throw std::runtime_error("SecondarySource::attach_sink"
": expected sink accepting "
+ std::string(reflect::type_name<Event>()));
}
} /*attach_sink*/
virtual void detach_sink(CallbackId id) override {
this->remove_callback(id);
} /*detach_sink*/
// ----- Inherited from AbstractEventProcessor -----
virtual std::string const & name() const override { return name_; }
virtual void set_name(std::string const & x) override { this->name_ = x; }
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor> ep)> const & fn) override {
for(auto x : this->cb_set_)
fn(x.fn_.borrow());
} /*visit_direct_consumers*/
private:
/* event book-keeping on receiving an event.
*/
bool preprocess_secondary_event(Event const & ev)
{
if (this->upstream_exhausted_) {
throw std::runtime_error("SecondarySource::notify_secondary_event"
": not allowed after upstream exhausted");
}
utc_nanos evtm = this->reducer_.event_tm(ev);
if (this->current_tm_ < evtm)
this->current_tm_ = evtm;
/* if reducer is empty when .notify_event() begins,
* then reactor/simulator needs to be notified that source is no longer empty
*/
bool is_priming = this->reducer_.is_empty();
return is_priming;
} /*preprocess_secondary_event*/
/* event bookkeeping after receiving an event.
*
* Require: event has been propagated to .reducer
*
* is_priming. true if event causes source to
* become non-empty --> must notify reactor
*/
void postprocess_secondary_event(bool is_priming) {
using xo::scope;
using xo::xtag;
Reactor * reactor = this->parent_reactor_;
scope log(XO_DEBUG(this->debug_sim_flag_),
xtag("name", name_),
xtag("reactor", (void*)reactor),
xtag("is_priming", is_priming));
if (reactor) {
if (is_priming) {
/* reactor/simulator takes responsibility for delivering events */
reactor->notify_source_primed(ref::brw<ReactorSource>::from_native(this));
}
} else {
/* if no reactor, deliver immediately */
this->deliver_one();
}
} /*postprocess_secondary_event*/
/* deliver one event from reducer;
* invoke callback whenever replay_flag is true
*/
std::uint64_t deliver_one_aux(bool replay_flag) {
scope log(XO_DEBUG(this->debug_sim_flag_),
xtag("name", this->name_),
xtag("reducer.empty", this->reducer_.is_empty()),
xtag("replay_flag", replay_flag));
if (this->reducer_.is_empty())
return 0;
/* need to remove event _before_ invoking callbacks;
* callbacks may indirectly call this->notify_secondary_event(),
* modifiying .reducer
*
* reducer may use double-buffering scheme or similar to
* mitigate copying, esp when Event objects are heavy
*/
Event & ev = this->reducer_.annex_one();
/* if SecondarySource:
* Event ev = this->event_heap_.front();
* std::pop_heap(this->event_heap_.begin(),
* this->event_heap_.end(),
* std::greater<Event>());
* this->event_heap_.pop_back();
*/
if (replay_flag) {
++(this->n_out_ev_);
this->cb_set_.invoke(&EventSink::notify_ev, ev);
}
return 1;
} /*deliver_one_aux*/
private:
/* current time for this source */
utc_nanos current_tm_;
/* reporting name for this source (use when .debug_sim_flag set)
*/
std::string name_;
/* if true, reactor/simulator to log interaction with this source
*/
bool debug_sim_flag_ = false;
/* count lifetime #of outgoing events */
uint32_t n_out_ev_ = 0;
/* set this to true, once, to announce that upstream will send
* no more events. see .notify_upstream_exhausted()
*/
bool upstream_exhausted_ = false;
/* events to be delivered to callbacks.
* multiple events may be collapsed depending on Reducer implementation
*/
Reducer reducer_;
/* reactor/simulator being used to schedule consumption. if ommitted,
* will borrow thread calling .notify_secondary_event()
*/
Reactor * parent_reactor_ = nullptr;
/* invoke callbacks in this set to send an outgoing event */
RpCallbackSet<EventSink> cb_set_;
}; /*SecondarySource*/
} /*namespace reactor*/
} /*namespace xo*/
/* end SecondarySource.hpp */

222
include/xo/reactor/Sink.hpp Normal file
View file

@ -0,0 +1,222 @@
/* @file Sink.hpp */
#pragma once
#include "AbstractSink.hpp"
#include "AbstractSource.hpp"
#include "PolyAdapterSink.hpp"
#include "xo/reflect/Reflect.hpp"
#include "xo/indentlog/print/time.hpp"
#include "xo/indentlog/print/tag.hpp"
#include "xo/cxxutil/demangle.hpp"
#include <typeinfo>
namespace xo {
namespace reactor {
/* Sink for events of type T
*
* inheritance:
* ref::Refcount
* ^
* isa
* |
* reactor::AbstractEventProcessor
* ^
* isa
* |
* reactor::AbstractSink
* ^
* isa
* |
* reactor::Sink1<T>
*/
template<typename T>
class Sink1 : public AbstractSink {
public:
using Reflect = reflect::Reflect;
using TypeDescr = reflect::TypeDescr;
public:
/* convenience: convert abstract sink to Sink1<T>*,
* or throw
*/
static ref::rp<Sink1<T>> require_native(std::string_view caller,
ref::rp<AbstractSink> const & sink)
{
using xo::scope;
using xo::xtag;
/* 1. if sink expects events of type T,
* make direct connection
*/
Sink1<T> * native_sink = nullptr;
native_sink = dynamic_cast<Sink1<T> *>(sink.get());
if (native_sink)
return native_sink;
/* 2. if sink is polymorphic,
* make type-erasing adapter
*/
if (sink->allow_polymorphic_source()) {
#ifdef DEBUG_NOT_USING
scope lscope("Sink1<T>::require_native: create PolyAdapterSink");
lscope.log(xtag("caller", caller));
#endif
return PolyAdapterSink<T>::make(sink);
}
if (!native_sink) {
#ifdef DEBUG_EVENT_TYPEINFO
std::type_info const * sink_parent_typeinfo
= sink->parent_typeinfo();
#endif
std::size_t src_hashcode = typeid(T).hash_code();
throw std::runtime_error(tostr("Sink1<T>::require_native"
": wanted to sink S, but sink expects T",
xtag("caller", caller),
xtag("T", sink->sink_ev_type()->canonical_name()),
xtag("S", reflect::type_name<T>()),
xtag("required_hashcode", typeid(Sink1<T>).hash_code()),
xtag("required_name", typeid(Sink1<T>).name()),
xtag("src_hashcode", src_hashcode),
xtag("sink_hashcode", sink->sink_ev_type()->typeinfo()->hash_code())
#ifdef DEBUG_EVENT_TYPEINFO
, xtag("sink_hashcode", sink->item_typeinfo()->hash_code())
, xtag("sink_parent_hashcode", sink_parent_typeinfo->hash_code())
, xtag("sink_parent_name", sink_parent_typeinfo->name())
, xtag("sink.type", sink->self_typename())
, xtag("sink.parent_type", sink->parent_typename())
#endif
));
}
return native_sink;
} /*require_native*/
virtual TypeDescr sink_ev_type() const override { return reflect::Reflect::require<T>(); }
/* accept incoming event */
virtual void notify_ev(T const & ev) = 0;
/* invoke these when this sink added to, or removed from, a source */
virtual void notify_add_callback() {}
virtual void notify_remove_callback() {}
// ----- inherited from AbstractSink -----
/* Sink1<T> only allows source providing T */
virtual bool allow_polymorphic_source() const override { return false; }
virtual void attach_source(ref::rp<AbstractSource> const & src) override {
src->attach_sink(this);
} /*attach_source*/
virtual void notify_ev_tp(TaggedPtr const & ev_tp) override {
using xo::xtag;
T * p_ev = ev_tp.recover_native<T>();
if (p_ev) {
this->notify_ev(*p_ev);
} else {
throw std::runtime_error(tostr("Sink1<T>::notify_ev_tp"
": unable to convert ev_tp to T",
xtag("ev_tp.type", ev_tp.td()->canonical_name()),
xtag("T", reflect::type_name<T>())));
}
} /*notify_ev_tp*/
}; /*Sink1*/
/* a sink with no further downstream processors */
template<typename T>
class SinkEndpoint : public Sink1<T> {
public:
// ----- Inherited from AbstractEventProcessor -----
virtual std::string const & name() const override { return name_; }
virtual void set_name(std::string const & x) override { name_ = x; }
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor>)> const &) override {
/* *this is not an event source */
} /*visit_direct_consumers*/
private:
/* reporting name for this sink */
std::string name_;
}; /*SinkEndpoint*/
template<typename T, typename Fn>
class SinkToFunction : public SinkEndpoint<T> {
public:
SinkToFunction(Fn fn) : fn_{std::move(fn)} {}
/* NOTE: conservative choice here, could templatize on this */
virtual bool allow_volatile_source() const override { return false; }
virtual uint32_t n_in_ev() const override { return n_in_ev_; }
virtual void notify_ev(T const & ev) override {
++(this->n_in_ev_);
fn_(ev);
} /*notify_ev*/
virtual void display(std::ostream & os) const override {
using xo::xtag;
os << "<SinkToFunction"
<< xtag("name", this->name())
<< xtag("n_in_ev", this->n_in_ev())
<< ">";
} /*display*/
private:
Fn fn_;
/* counts lifetime #of incoming events (see .notify_ev()) */
uint32_t n_in_ev_ = 0;
}; /*SinkToFunction*/
/* sink that prints to console */
template<typename T>
class SinkToConsole : public SinkEndpoint<T> {
public:
SinkToConsole() {}
virtual bool allow_volatile_source() const override { return true; }
virtual uint32_t n_in_ev() const override { return n_in_ev_; }
virtual void notify_ev(T const & ev) override {
//using logutil::operator<<;
++(this->n_in_ev_);
std::cout << ev << std::endl;
} /*notify_ev*/
virtual void display(std::ostream & os) const override {
using xo::xtag;
os << "<SinkToConsole"
<< xtag("name", this->name())
<< xtag("n_in_ev", this->n_in_ev())
<< ">";
} /*display*/
private:
/* reporting name for this sink */
std::string name_;
/* counts lifetime #of incoming events (see .notify_ev()) */
uint32_t n_in_ev_ = 0;
}; /*SinkToConsole*/
#ifdef NOT_USING
class TemporaryTest {
public:
static ref::rp<SinkToConsole<std::pair<xo::time::utc_nanos, double>>> realization_printer();
}; /*TemporaryTest*/
#endif
} /*namespace reactor*/
} /*namespace xo*/
/* end Sink.hpp */

View file

@ -0,0 +1,20 @@
/* file init_reactor.hpp
*
* author: Roland Conybeare, Aug 2022
*/
#pragma once
#include "xo/subsys/Subsystem.hpp"
namespace xo {
enum S_reactor_tag {};
template<>
struct InitSubsys<S_reactor_tag> {
static void init();
static InitEvidence require();
};
} /*namespace xo*/
/* end init_reactor.hpp */

View file

@ -0,0 +1,93 @@
/* @file AbstractEventProcessor.cp */
#include "AbstractEventProcessor.hpp"
#include "xo/indentlog/print/tostr.hpp"
#include <unordered_map>
#include <map>
namespace xo {
using ref::rp;
using ref::brw;
using xo::tostr;
using std::uint32_t;
namespace reactor {
namespace {
/* search all event processors ep reachable (dowstream) from x,
* add to *m;
*/
void
map_network_helper(brw<AbstractEventProcessor> x,
uint32_t * tsort_ix,
std::unordered_map<AbstractEventProcessor*, uint32_t> * m)
{
if (m->contains(x.get()))
return;
auto fn = [tsort_ix, m]
(brw<AbstractEventProcessor> ep)
{
map_network_helper(ep, tsort_ix, m);
};
x->visit_direct_consumers(fn);
/* postorder! */
(*m)[x.get()] = ++(*tsort_ix);
} /*map_network_helper*/
} /*namespace*/
std::vector<rp<AbstractEventProcessor>>
AbstractEventProcessor::map_network(rp<AbstractEventProcessor> const & x)
{
std::unordered_map<AbstractEventProcessor *, std::uint32_t> network_map;
/* index event processors in reverse topological order:
* if B is (directly or indirectly) downstream from A,
* then tsort_ix(B) < tsort_ix(A)
*/
uint32_t tsort_ix = 0;
/* depth-first traversal, detect and short-circuit on dup paths */
map_network_helper(x.borrow(), &tsort_ix, &network_map);
/* invariant: tsort_ix = #of event processors in network */
uint32_t n = tsort_ix;
/* network_map, now in a topologically sorted order */
std::map<uint32_t, AbstractEventProcessor *> tsorted_map;
{
for(auto const & x : network_map) {
uint32_t tsort_ix = x.second;
AbstractEventProcessor * ep = x.first;
tsorted_map[n - tsort_ix] = ep;
}
}
std::vector<rp<AbstractEventProcessor>> retval;
{
for(auto const & x : tsorted_map)
retval.push_back(x.second);
}
return retval;
} /*map_network*/
void
AbstractEventProcessor::display(std::ostream & os) const
{
os << "<AbstractSource>";
} /*display*/
std::string
AbstractEventProcessor::display_string() const
{
return tostr(*this);
} /*display_string*/
} /*namespace reactor*/
} /*namespace xo*/
/* end AbstractEventProcessor.cpp */

View file

@ -0,0 +1,84 @@
/* @file AbstractSource.cpp */
#include "AbstractSource.hpp"
#include "xo/indentlog/scope.hpp"
#include "xo/webutil/StreamEndpointDescr.hpp"
//#include "indentlog/scope.hpp"
namespace xo {
using xo::web::StreamEndpointDescr;
using xo::reactor::AbstractSink;
using xo::ref::rp;
//using xo::scope;
//using xo::tostr;
namespace reactor {
StreamEndpointDescr
AbstractSource::stream_endpoint_descr(std::string const & url_prefix)
{
auto subscribe_fn
= ([this]
(rp<AbstractSink> const & ws_sink)
{
//scope lscope("AbstractSource::stream_endpoint_descr.subscribe_fn");
/* ws_sink created by websocket, sends events to websocket as json
* see [websock/WebsocketSink]
*/
return this->attach_sink(ws_sink);
});
auto unsubscribe_fn
= ([this]
(CallbackId id)
{
this->detach_sink(id);
});
return StreamEndpointDescr(url_prefix,
subscribe_fn,
unsubscribe_fn);
} /*stream_endpoint_descr*/
uint64_t
AbstractSource::deliver_n(uint64_t n)
{
uint64_t retval = 0;
for (uint64_t i=0; i<n; ++i) {
uint64_t n1 = this->deliver_one();
if (n1 == 0) {
/* short-circuit if source has less than n
* events available
*/
break;
}
retval += n1;
}
return retval;
} /*deliver_n*/
uint64_t
AbstractSource::deliver_all()
{
uint64_t retval = 0;
for (;;) {
uint64_t n1 = this->deliver_one();
if (n1 == 0)
break;
retval += n1;
}
return retval;
} /*deliver_all*/
} /*namespace reactor*/
} /*namespace xo*/
/* end AbstractSource.cpp */

View file

@ -0,0 +1,21 @@
# xo-reactor/src/reactor/CMakeLists.txt
set(SELF_LIB reactor)
set(SELF_SRCS
AbstractEventProcessor.cpp AbstractSource.cpp ReactorSource.cpp
Sink.cpp
Reactor.cpp PollingReactor.cpp
init_reactor.cpp)
xo_add_shared_library(${SELF_LIB} ${PROJECT_VERSION} 1 ${SELF_SRCS})
# ----------------------------------------------------------------
# external dependencies
# note: changes to xo_dependency() calls here
# must coordinate with find_dependency() calls
# in xo-reactor/cmake/reactorConfig.cmake.in
#
xo_dependency(${SELF_LIB} reflect)
xo_dependency(${SELF_LIB} webutil)
xo_dependency(${SELF_LIB} callback)

View file

@ -0,0 +1,88 @@
/* @file PollingReactor.cpp */
#include "PollingReactor.hpp"
namespace xo {
using ref::brw;
using std::size_t;
using std::uint64_t;
using std::int64_t;
namespace reactor {
bool
PollingReactor::add_source(brw<ReactorSource> src)
{
/* make sure src does not already appear in .source_v[] */
for(ReactorSourcePtr const & x : this->source_v_) {
if(x.get() == src.get()) {
throw std::runtime_error("PollingReactor::add_source; source already present");
return false;
}
}
src->notify_reactor_add(this);
this->source_v_.push_back(src.get());
return true;
} /*add_source*/
bool
PollingReactor::remove_source(brw<ReactorSource> src)
{
auto ix = std::find(this->source_v_.begin(),
this->source_v_.end(),
src);
if(ix != this->source_v_.end()) {
src->notify_reactor_remove(this);
this->source_v_.erase(ix);
return true;
}
return false;
} /*remove_source*/
int64_t
PollingReactor::find_nonempty_source(size_t start_ix)
{
size_t z = this->source_v_.size();
/* search sources [ix .. z) */
for(size_t ix = start_ix; ix < z; ++ix) {
brw<ReactorSource> src = this->source_v_[ix];
if(src->is_nonempty())
return ix;
}
/* search source [0 .. ix) */
for(size_t ix = 0, n = std::min(start_ix, z); ix < n; ++ix) {
brw<ReactorSource> src = this->source_v_[ix];
if(src->is_nonempty())
return ix;
}
return -1;
} /*find_nonempty_source*/
uint64_t
PollingReactor::run_one()
{
int64_t ix = this->find_nonempty_source(this->next_ix_);
if(ix >= 0) {
brw<ReactorSource> src = this->source_v_[ix];
return src->deliver_one();
} else {
return 0;
}
} /*run_one*/
} /*namespace reactor*/
} /*namespace xo*/
/* end PollingReactor.cpp */

26
src/reactor/Reactor.cpp Normal file
View file

@ -0,0 +1,26 @@
/* file Reactor.cpp
*
* author: Roland Conybeare, Sep 2022
*/
#include "Reactor.hpp"
namespace xo {
namespace reactor {
void
Reactor::run_n(int32_t n)
{
if (n == -1) {
for (;;) {
this->run_one();
}
} else {
for (int32_t i=0; i<n; ++i) {
this->run_one();
}
}
} /*run_n*/
} /*namespace reactor*/
} /*namespace xo*/
/* end Reactor.cpp */

View file

@ -0,0 +1,34 @@
/* @file Source.cpp */
#include "ReactorSource.hpp"
#include "xo/indentlog/print/time.hpp"
#include <cstdint>
namespace xo {
using xo::time::utc_nanos;
namespace reactor {
utc_nanos
ReactorSource::online_current_tm() const
{
/* for an online source:
* .is_exhausted() must always be false;
* this implies that .sim_current_tm() should
* not be called in the first place
*/
assert(false);
return time::timeutil::epoch();
} /*online_current_tm*/
std::uint64_t
ReactorSource::online_advance_until(utc_nanos /*tm*/,
bool /*replay_flag*/)
{
return 0;
} /*online_advance_until*/
} /*namespace reactor*/
} /*namespace xo*/
/* end Source.cpp */

18
src/reactor/Sink.cpp Normal file
View file

@ -0,0 +1,18 @@
/* @file Sink.cpp */
#include "Sink.hpp"
#include "xo/refcnt/Refcounted.hpp"
namespace xo {
namespace reactor {
#ifdef NOT_USING
ref::rp<SinkToConsole<std::pair<xo::time::utc_nanos, double>>>
TemporaryTest::realization_printer()
{
return new SinkToConsole<std::pair<xo::time::utc_nanos, double>>();
} /*realization_printer*/
#endif
} /*namespace reactor*/
} /*namespace xo*/
/* end Sink.cpp */

View file

@ -0,0 +1,31 @@
/* file init_reactor.cpp
*
* author: Roland Conybeare, Aug 2022
*/
#include "init_reactor.hpp"
#include "xo/reflect/init_reflect.hpp"
namespace xo {
void
InitSubsys<S_reactor_tag>::init()
{
/* TODO: reflect reactor types */
} /*init*/
InitEvidence
InitSubsys<S_reactor_tag>::require()
{
InitEvidence retval;
/* subsystem dependencies for reactor/ */
retval ^= InitSubsys<S_reflect_tag>::require();
/* reactor/'s own initialization code */
retval ^= Subsystem::provide<S_reactor_tag>("reactor", &init);
return retval;
} /*require*/
} /*namespace xo*/
/* end init_reactor.cpp */