commit 532d48529f708234d3aac178ab4dc3d51b684147 Author: Roland Conybeare Date: Tue Oct 10 15:20:32 2023 -0400 initial implementation diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 00000000..9bbe8050 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,51 @@ +# xo-reactor/CMakeLists.txt + +cmake_minimum_required(VERSION 3.10) + +project(reactor VERSION 0.1) +enable_language(CXX) + +# common XO cmake macros (see proj/xo-cmake) +include(xo_macros/xo_cxx) +include(xo_macros/code-coverage) + +# ---------------------------------------------------------------- +# unit test setup + +enable_testing() +# activate code coverage for all executables + libraries (when configured with -DCODE_COVERAGE=ON) +add_code_coverage() +# 1. assuming that /nix/store/ prefixes .hpp files belonging to gcc, catch2 etc. +# we're not interested in code coverage for these sources. +# 2. exclude the utest/ subdir, we don't need coverage on the unit tests themselves; +# rather, want coverage on the code that the unit tests exercise. +# +# NOTE: this seems to work only with the 'ccov-all' target. In particular, doesn't seem to do anything with the 'ccov' target +# +add_code_coverage_all_targets(EXCLUDE /nix/store/* ${PROJECT_SOURCE_DIR}/utest/* ${PROJECT_BINARY_DIR}/local/* ${PROJECT_SOURCE_DIR}/repo/*) + +# ---------------------------------------------------------------- +# c++ settings + +# one-time project-specific c++ flags. usually empty +set(PROJECT_CXX_FLAGS "") +#set(PROJECT_CXX_FLAGS "-fconcepts-diagnostics-depth=2") +add_definitions(${PROJECT_CXX_FLAGS}) + +xo_toplevel_compile_options() + +# ---------------------------------------------------------------- + +add_subdirectory(src/reactor) + +# ---------------------------------------------------------------- +# provide find_pacakge() support for reactor customers + +xo_export_cmake_config(${PROJECT_NAME} ${PROJECT_VERSION} ${PROJECT_NAME}Targets) + +# ---------------------------------------------------------------- +# install .hpp files + +xo_install_include_tree() + +# end CMakeLists.txt diff --git a/README.md b/README.md new file mode 100644 index 00000000..02b7e7a2 --- /dev/null +++ b/README.md @@ -0,0 +1,42 @@ +# reactor library + +in-memory queuing system + +# dependencies + +build+install these first + +- xo-reflect [github.com/Rconybea/xo-reflect] +- xo-callback [github.com/Rconybea/xo-callback] + +# build + install + +# build +``` +$ cd reactor +$ mkdir build +$ cd build +$ INSTALL_PREFIX=/usr/local # or wherever you prefer +$ cmake -DCMAKE_MODULE_PATH=${INSTALL_PREFIX}/share/cmake -DCMAKE_PREFIX_PATH=${INSTALL_PREFIX} -DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX} .. +$ make +$ make install +``` +(also see .github/workflows/main.yml) + +# build for unit test coverage +``` +$ cd xo-reactor +$ mkdir ccov +$ cd ccov +$ cmake -DCMAKE_MODULE_PATH=${INSTALL_PREFIX}/share/cmake -DCMAKE_PREFIX_PATH=${INSTALL_PREFIX} -DCODE_COVERAGE=ON -DCMAKE_BUILD_TYPE=Debug .. +``` + +# LSP support + +LSP looks for compile commands in the root of the source tree; +cmake creates them in the root of its build directory. + +``` +$ cd xo-reactor +$ ln -s build/compile_commands.json +``` diff --git a/cmake/reactorConfig.cmake.in b/cmake/reactorConfig.cmake.in new file mode 100644 index 00000000..5456d16b --- /dev/null +++ b/cmake/reactorConfig.cmake.in @@ -0,0 +1,13 @@ +@PACKAGE_INIT@ + +include(CMakeFindDependencyMacro) + +# note: changes to find_dependency() calls here +# must coordinate with xo_dependency() calls +# in xo-reactor/src/reactor/CMakeLists.txt +# +find_dependency(reflect) +find_dependency(callback) + +include("${CMAKE_CURRENT_LIST_DIR}/@PROJECT_NAME@Targets.cmake") +check_required_components("@PROJECT_NAME@") diff --git a/include/xo/reactor/AbstractEventProcessor.hpp b/include/xo/reactor/AbstractEventProcessor.hpp new file mode 100644 index 00000000..820ab0ae --- /dev/null +++ b/include/xo/reactor/AbstractEventProcessor.hpp @@ -0,0 +1,50 @@ +/* @file AbstractEventProcessor.hpp */ + +#pragma once + +#include "xo/refcnt/Refcounted.hpp" +#include +#include +#include + +namespace xo { + namespace reactor { + /* common base class for {AbstractSource, AbstractSink}. + * An event processor can be: + * 1. an event source (inherits AbstractSource) + * 2. an event sink (inherits AbstractSink) + * 3. both source+sink (inherits both) + */ + class AbstractEventProcessor : virtual public ref::Refcount { + public: + /* reporting name for this source. ideally unique, but not required */ + virtual std::string const & name() const = 0; + /* set .name */ + virtual void set_name(std::string const & x) = 0; + + /* find all event processors ep reachable from x (i.e. downstream from x). + * report each such ep exactly once + */ + static std::vector> map_network(ref::rp const & x); + + /* visit direct downstream consumers c[i] of this event processor. + * call ep(c[i]) for each such consumer. + */ + virtual void visit_direct_consumers(std::function ep)> const & fn) = 0; + + /* write representation to stream */ + virtual void display(std::ostream & os) const = 0; + /* human-readable string identifying this source */ + virtual std::string display_string() const; + }; /*AbstractEventProcessor*/ + + inline std::ostream & + operator<<(std::ostream & os, AbstractEventProcessor const & src) { + src.display(os); + return os; + } /*operator<<*/ + + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end AbstractEventProcessor.hpp */ diff --git a/include/xo/reactor/AbstractSink.hpp b/include/xo/reactor/AbstractSink.hpp new file mode 100644 index 00000000..e855eda8 --- /dev/null +++ b/include/xo/reactor/AbstractSink.hpp @@ -0,0 +1,71 @@ +/* @file AbstractSink.hpp */ + +#pragma once + +#include "AbstractSource.hpp" +#include "xo/reflect/TaggedPtr.hpp" +#include "xo/reflect/TypeDescr.hpp" +//#include "time/Time.hpp" +#include "xo/indentlog/print/tag.hpp" +#include "xo/cxxutil/demangle.hpp" +#include + +namespace xo { + namespace reactor { + /* an event consumer. + * note that event representation is not specified, + * this helps avoid mandating a type hierarchy for events + */ + class AbstractSink : public virtual AbstractEventProcessor { + public: + using TypeDescr = reflect::TypeDescr; + using TaggedPtr = reflect::TaggedPtr; + + public: + virtual ~AbstractSink() = default; + + /* if true: sources may produce events of any reflected type. + * sink will accept such events using .notify_ev_tp() + * for example see web_util/WebsocketSink + * + * if false (common): souce is expected to to produce events of + * a single type, specified by .sink_ev_type() + * .notify_ev_tp() will downcast to that type. + * for example see reactor/Sink1 + * + * polymorphic sinks pay for runtime polymorphism + * (since WebsocketSink sends events in json format this is + * expected to be negligible compared to message formatting) + */ + virtual bool allow_polymorphic_source() const = 0; + + /* identify datatype for items expected by this sink */ + virtual TypeDescr sink_ev_type() const = 0; + + /* true iff this sink accepts volatile events. + * volatile events are events that may be modified + * or destroyed after being delivered to this sink. + * + * For example KalmanFilterSvc accepts volatile events, + * but EventStore requires non-volatile events. + */ + virtual bool allow_volatile_source() const = 0; + + /* counts lifetime #of incoming events for this sink */ + virtual uint32_t n_in_ev() const = 0; + + /* attach an input source. + * typically this means calling src.add_callback() + * with a function thats calls a .notify_xxx() method + * on this Sink + */ + virtual void attach_source(ref::rp const & src) = 0; + + /* accept incoming event, given by tagged pointer */ + virtual void notify_ev_tp(TaggedPtr const & ev_tp) = 0; + }; /*AbstractSink*/ + + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end AbstractSink.hpp */ diff --git a/include/xo/reactor/AbstractSource.hpp b/include/xo/reactor/AbstractSource.hpp new file mode 100644 index 00000000..ed31d04f --- /dev/null +++ b/include/xo/reactor/AbstractSource.hpp @@ -0,0 +1,94 @@ +/* @file AbstractSource.hpp */ + +#pragma once + +#include "AbstractEventProcessor.hpp" +#include "xo/reflect/TypeDescr.hpp" +#include "xo/callback/CallbackSet.hpp" +#include "xo/refcnt/Refcounted.hpp" +#include + +namespace xo { + namespace web { class StreamEndpointDescr; } + + namespace reactor { + class AbstractSink; + + template + class Sink1; + + /* abstract api for a source of events. + * Event representation is left open: Sources and Sinks + * need to have compatible event representations, + * and coordination is left to such (Source, Sink) pairs. + * + * See ReactorSource, for example + * + * Typically a Source will have one or more .add_callback() + * methods, for listening to source events + */ + class AbstractSource : public virtual AbstractEventProcessor { + public: + using StreamEndpointDescr = web::StreamEndpointDescr; + using TypeDescr = reflect::TypeDescr; + using CallbackId = fn::CallbackId; + + public: + /* identify datatype for items delivered by this source */ + virtual TypeDescr source_ev_type() const = 0; + + /* if true: event objects (see .source_ev_type()) + * maybe overwritten between callbacks. + * A sink that wants to capture events + * (e.g. EventStore<>) will need to deep-copy them + * if false: event objects are preserved between callbacks. + */ + virtual bool is_volatile() const = 0; + + /* counts #of outbound events ready for delivery, + * but not yet sent */ + virtual uint32_t n_queued_out_ev() const = 0; + /* counts lifetime #of events delivered. + * see also AbstractSink.n_in_ev + */ + virtual uint32_t n_out_ev() const = 0; + + /* if true, simulator will report interaction with this source */ + virtual bool debug_sim_flag() const = 0; + /* set .trace_sim_flag */ + virtual void set_debug_sim_flag(bool x) = 0; + + virtual CallbackId attach_sink(ref::rp const & sink) = 0; + virtual void detach_sink(CallbackId id) = 0; + + /* endpoint for a websocket subscriber; + * subscriber delivers events produced by this source + */ + StreamEndpointDescr stream_endpoint_descr(std::string const & url_prefix); + + /* typically expect events to be delivered using a reactor or simulator. + * (for example see reactor/Reactor, simulator/Simulator); + * reactor allocates cpu, and controls event ordering across sources + * when there are multiple sources. + * + * However, also possible for user code to invoke .deliver_one() directly. + * Beware, may get unpredictable results if attempt to do this on a source + * that's also attached to a reactor. + */ + virtual std::uint64_t deliver_one() = 0; + + /* convenience: call .deliver_one() n times, return sum of results */ + std::uint64_t deliver_n(uint64_t n); + + /* convenience: call .deliver_one() until it returns 0 + * (beware of inexhaustible sources!) + */ + std::uint64_t deliver_all(); + }; /*AbstractSource*/ + + using AbstractSourcePtr = ref::rp; + + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end AbstractSource.hpp */ diff --git a/include/xo/reactor/DirectSource.hpp b/include/xo/reactor/DirectSource.hpp new file mode 100644 index 00000000..03710281 --- /dev/null +++ b/include/xo/reactor/DirectSource.hpp @@ -0,0 +1,19 @@ +/* @file DirectSource.hpp */ + +#pragma once + +#include "time/Time.hpp" +#include "reactor/Sink.hpp" +#include "reactor/EventSource.hpp" +#include "reactor/HeapReducer.hpp" +//#include "reactor/LastReducer.hpp" +#include "reactor/Reactor.hpp" +#include "callback/CallbackSet.hpp" + +namespace xo { + namespace reactor { + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end DirectSource.hpp */ + diff --git a/include/xo/reactor/DirectSourcePtr.hpp b/include/xo/reactor/DirectSourcePtr.hpp new file mode 100644 index 00000000..db9c7734 --- /dev/null +++ b/include/xo/reactor/DirectSourcePtr.hpp @@ -0,0 +1,25 @@ +/* @file DirectSourcePtr.hpp */ + +#pragma once + +#include "reactor/SecondarySource.hpp" +#include "reactor/LastReducer.hpp" +#include "reactor/EventTimeFn.hpp" + +namespace xo { + namespace reactor { + template + using DirectSource = SecondarySource>>; + + /* use when Event is ref::rp for some T */ + template + using DirectSourcePtr = SecondarySource>>; + + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end DirectSourcePtr.hpp */ diff --git a/include/xo/reactor/EventSource.hpp b/include/xo/reactor/EventSource.hpp new file mode 100644 index 00000000..a60297f7 --- /dev/null +++ b/include/xo/reactor/EventSource.hpp @@ -0,0 +1,25 @@ +/* @file EventSource.hpp */ + +#pragma once + +#include "reactor/ReactorSource.hpp" +#include "callback/CallbackSet.hpp" + +namespace xo { + namespace reactor { + template + class EventSource : public ReactorSource { + public: + using CallbackId = fn::CallbackId; + + public: + virtual CallbackId add_callback(ref::rp const & cb) = 0; + virtual void remove_callback(CallbackId id) = 0; + }; /*EventSource*/ + + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end EventSource.hpp */ diff --git a/include/xo/reactor/EventStore.hpp b/include/xo/reactor/EventStore.hpp new file mode 100644 index 00000000..62ab7f9a --- /dev/null +++ b/include/xo/reactor/EventStore.hpp @@ -0,0 +1,317 @@ +/* @file EventStore.hpp */ + +#pragma once + +#include "reactor/Reducer.hpp" +#include "reactor/EventTimeFn.hpp" +#include "reactor/Sink.hpp" +#include "web_util/HttpEndpointDescr.hpp" +#include "printjson/PrintJson.hpp" +#include "reflect/Reflect.hpp" +#include "tree/RedBlackTree.hpp" + +namespace xo { + namespace reactor { + + /* abstract event store api */ + class AbstractEventStore : virtual public ref::Refcount { + public: + using PrintJson = xo::json::PrintJson; + using TaggedPtr = xo::reflect::TaggedPtr; + using HttpEndpointDescr = xo::web::HttpEndpointDescr; + using Alist = xo::web::Alist; + + public: + /* true iff .size() == 0 */ + virtual bool empty() const = 0; + + /* #of events currently held in this store */ + virtual std::uint32_t size() const = 0; + + /* TODO: + * 1. TaggedGptr = discriminated union of + * a. TaggedRcptr (i.e. refcounted semantics) + * b. Unique (i.e. unique_ptr semantics) + * c. Exptr (i.e. unowned_ptr semantics) + * d. compact (special-case -- value fits in pointer) + * will need mpl copy/assign stuff for TaggedUnique + * 2. provide .last_n(), .last_dt() + */ + + virtual void http_snapshot(ref::rp const & pjson, + std::ostream * p_os) const = 0; + + /* http endpoint; generates http output for this eventstore */ + virtual HttpEndpointDescr http_endpoint_descr(ref::rp const & pjson, + std::string const & url_prefix) const { + + /* important that lambda contains its own rp; + * reference to stack will not do + */ + ref::rp pjson_rp = pjson; + + auto http_fn = ([this, pjson_rp] + (std::string const & /*uri*/, + Alist const & /*alist*/, + std::ostream * p_os) + { + /* WARNING: race condition here, + * given webserver runs from a separate thread + */ + + this->http_snapshot(pjson_rp, p_os); + }); + + return HttpEndpointDescr(url_prefix + "/snap", http_fn); + } /*http_endpoint_descr*/ + + virtual void clear() = 0; + + virtual void insert_tp(TaggedPtr const & ev_tp) = 0; + }; /*AbstractEventStore*/ + + /* in-memory storage for a set of events. + * + * Require: + * - Event is null-constructible + * - Event is copyable + * - EventTimeFn :: Event -> utc_nanos + * + * inheritance + * ref::Refcount + * ^ + * isa + * | + * reactor::AbstractEventProcessor + req .visit_direct_consumers() + * ^ + * isa + * | + * reactor::AbstractSink + req .sink_ev_type(), .notify_ev() etc. + * ^ + * isa + * | + * reactor::Sink1 + .attach_source(), .sink_ev_type(), + * ^ req .notify_ev() etc + * | + * isa + * | + * reactor::SinkEndpoint + impl .visit_direct_consumers() + * ^ + * isa + * | + * reactor::StructEventStore + .last_n() .last_dt() etc. + */ + template + class EventStoreImpl : public SinkEndpoint, + public AbstractEventStore, + ReducerBase + { + static_assert(EventTimeConcept); + + public: + using utc_nanos = xo::time::utc_nanos; + using nanos = xo::time::nanos; + using EventTree = xo::tree::RedBlackTree>; + using PrintJson = xo::json::PrintJson; + using Alist = xo::web::Alist; + using HttpEndpointDescr = xo::web::HttpEndpointDescr; + + static ref::rp make() { return new EventStoreImpl(); } + + /* visit most recent n events in this store. + * returns #of events actually visited + * + * if events visited are e1 .. en, then: + * (1) en is the most recent recorded event + * (.event_tm(en) is .tree.max_key()) + * (2) there are no events between e(i) and e(i+1) + * (i.e. visit does not skip over any events) + * (3) if v < n, then v = .size(), + * where v is the #of events visited + * + * require: + * - Fn :: (Event -> ) + */ + template + std::uint32_t visit_last_n(std::uint32_t n, Fn && fn) const { + std::uint32_t z = this->size(); + std::uint32_t lo = ((n >= z) ? 0 : z - n); + + typename EventTree::const_iterator lo_ix = this->tree_.find_ith(lo); + typename EventTree::const_iterator hi_ix = this->tree_.cend(); + + return this->visit_range(lo_ix, hi_ix, fn); + } /*visit_last_n*/ + + /* visit suffix of events sufficient to cover interval of length dt. + * visit events in increasing timestamp order. + * + * if events visited are e1 .. en, then: + * (1) en is the most recent recorded event + * (.event_tm(en) is .tree.max_key()) + * (2) there are no events between e(i) and e(i+1) + * (i.e. visit does not skip over any events) + * (3) if .event_tm(en) - .event_tm(e1) < dt, + * then e1 is the earliest recorded event + * (.event_tm(e1) is .tree.min_key()) + * (4) if .event_tm(en) - .event_tm(e1) > dt, + * then (.event_tm(en) - .event_tm(e2)) < dt + * + * |<---------- dt ----------->| + * ^ ^ ^ + * e1 e2 en + */ + template + std::uint32_t visit_last_dt(nanos dt, Fn && fn) const { + if (tree_.empty()) + return 0; + + /* tree not empty -> has max key */ + utc_nanos tn = this->tree_.max_key(); + utc_nanos tk = tn - dt; + + typename EventTree::const_iterator lo_ix = this->tree_.find_glb(tk, true /*closed*/); + typename EventTree::const_iterator hi_ix = this->tree_.end(); + + return this->visit_range(lo_ix, hi_ix, fn); + } /*visit_last_dt*/ + + std::vector last_n(std::uint32_t n) const { + std::vector retval; + + auto fn = [&retval](Event const &ev) { retval.push_back(ev); }; + + this->visit_last_n(n, fn); + + return retval; + } /*last_n*/ + + std::vector last_dt(nanos dt) const { + std::vector retval; + + auto fn = [&retval](Event const &ev) { retval.push_back(ev); }; + + this->visit_last_dt(dt, fn); + + return retval; + } /*last_dt*/ + + void insert(Event const & ev) { this->tree_.insert(typename EventTree::value_type(this->event_tm(ev), ev)); } + + // ----- Inherited from AbstractEventStore ----- + + virtual bool empty() const override { return tree_.empty(); } + virtual std::uint32_t size() const override { return tree_.size(); } + + /* write http snapshot of current state to *p_os */ + virtual void http_snapshot(ref::rp const & pjson, std::ostream * p_os) const override { + using xo::reflect::Reflect; + + /* visit last 100 events; + * write them to *p_os in increasing time order + */ + auto ev_v = this->last_n(100); + + pjson->print_tp(Reflect::make_tp(&ev_v), p_os); + } /*http_snapshot*/ + + virtual void clear() override { this->tree_.clear(); } + + virtual void insert_tp(TaggedPtr const & ev_tp) override { + using xo::xtag; + + Event * p_ev = ev_tp.recover_native(); + + if (p_ev) { + this->insert(*p_ev); + } else { + throw std::runtime_error(tostr("StructEventStore::insert_tp" + ": unable to convert ev_tp to Event", + xtag("ev_tp.type", ev_tp.td()->canonical_name()), + xtag("Event", reflect::type_name()))); + } + } /*insert_tp*/ + + // ----- Inherited from AbstractSink ----- + + virtual uint32_t n_in_ev() const override { return n_in_ev_; } + virtual bool allow_volatile_source() const override { return false; } + virtual void notify_ev(Event const & ev) override { + ++(this->n_in_ev_); + this->insert(ev); + } + + // ----- Inherited from AbstractSource ----- + + virtual void display(std::ostream & os) const override { + using xo::xtag; + + os << "name()) + << xtag("n_in_ev", this->n_in_ev()) + << ">"; + } /*display*/ + + // ----- Inherited from AbstractEventProcessor ----- + + virtual std::string const & name() const override { return name_; } + virtual void set_name(std::string const & x) override { name_ = x; } + + private: + EventStoreImpl() = default; + + template + std::uint32_t visit_range(typename EventTree::const_iterator lo_ix, + typename EventTree::const_iterator hi_ix, + Fn && fn) const { + std::uint32_t n = 0; + for (; lo_ix != hi_ix; ++lo_ix, ++n) { + fn(lo_ix->second); + } + + return n; + } /*visit_range*/ + + private: + /* reporting name for this store */ + std::string name_; + /* fetches per-event timestamp */ + EventTimeFn event_tm_fn_; + /* counts lifetime #of incoming events (see .notify_ev()) */ + uint32_t n_in_ev_ = 0; + /* events stored here */ + EventTree tree_; + }; /*EventStoreImpl*/ + + template + using StructEventStore = EventStoreImpl>; + + template + using PtrEventStore = EventStoreImpl>; + + /* Require: + * EventTimeConcept> + */ + template + class SinkToEventStore : public SinkEndpoint { + public: + using EventStore = StructEventStore; + + public: + SinkToEventStore() = default; + + virtual void notify_ev(T const & ev) override { + store_.insert(ev); + } /*notify_ev*/ + + private: + /* stash remembered events (all of them!) here */ + EventStore store_; + }; /*SinkToEventStore*/ + + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end EventStore.hpp */ diff --git a/include/xo/reactor/EventTimeFn.hpp b/include/xo/reactor/EventTimeFn.hpp new file mode 100644 index 00000000..28c36d0b --- /dev/null +++ b/include/xo/reactor/EventTimeFn.hpp @@ -0,0 +1,38 @@ +/* @file EventTimeFn.hpp */ + +#pragma once + +#include "time/Time.hpp" +#include + +namespace xo { + namespace reactor { + template + concept EventTimeConcept = requires(EventTimeFn etfn, Event ev) { + { etfn(ev) } -> std::same_as; + }; + + template + class StructEventTimeFn { + public: + using event_t = Event; + using utc_nanos = xo::time::utc_nanos; + + public: + utc_nanos operator()(Event const & ev) const { return ev.tm(); } + }; /*StructEventTimeFn*/ + + template + class PtrEventTimeFn { + public: + using event_t = Event; + using utc_nanos = xo::time::utc_nanos; + + public: + utc_nanos operator()(Event const & ev) const { return ev->tm(); } + }; /*PtrEventTimeFn*/ + + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end EventTimeFn.hpp */ diff --git a/include/xo/reactor/HeapReducer.hpp b/include/xo/reactor/HeapReducer.hpp new file mode 100644 index 00000000..5355e056 --- /dev/null +++ b/include/xo/reactor/HeapReducer.hpp @@ -0,0 +1,72 @@ +/* @file HeapReducer.hpp */ + +#pragma once + +#include "reactor/Reducer.hpp" + +namespace xo { + namespace reactor { + /* collect incoming events in a heap, + * ordered by timestamp. + * output events in increasing timestamp order. + * Information preserving in all other respects + * + * Require: + * - Event is null-constructible + * - Event is copyable + * - EventTimeFn :: Event -> utc_nanos + */ + template> + class HeapReducer : public ReducerBase { + public: + using utc_nanos = xo::time::utc_nanos; + public: + HeapReducer() = default; + HeapReducer(EventTimeFn const & evtfn) : ReducerBase(evtfn) {} + + bool is_empty() const { return this->event_heap_.empty(); } + /* require: .is_empty() = false */ + utc_nanos next_tm() const { return this->event_tm(this->event_heap_.front()); } + /* #of events stored in this reducer */ + uint32_t n_event() const { return this->event_heap_.size(); } + + Event const & last_annexed_ev() const { return this->annexed_ev_; } + + void include_event(Event const & ev) { + this->event_heap_.push_back(ev); + std::push_heap(this->event_heap_.begin(), + this->event_heap_.end(), + std::greater()); + } /*include_event*/ + + void include_event(Event && ev) { + this->event_heap_.push_back(std::move(ev)); + std::push_heap(this->event_heap_.begin(), + this->event_heap_.end(), + std::greater()); + } /*include_event*/ + + Event & annex_one() { + this->annexed_ev_ = this->event_heap_.front(); + std::pop_heap(this->event_heap_.begin(), + this->event_heap_.end(), + std::greater()); + this->event_heap_.pop_back(); + + return this->annexed_ev_; + } /*annex_one*/ + + // ----- Inherited from ReducerBase ----- + + // utc_nanos event_tm(Event const & x); + + private: + /* queued Events, in increasing timestamp order */ + std::vector event_heap_; + /* annexed event, removed from .event_heap */ + Event annexed_ev_; + }; /*HeapReducer*/ + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end HeapReducer.hpp */ diff --git a/include/xo/reactor/LastReducer.hpp b/include/xo/reactor/LastReducer.hpp new file mode 100644 index 00000000..86a5913c --- /dev/null +++ b/include/xo/reactor/LastReducer.hpp @@ -0,0 +1,154 @@ +/* @file LastReducer.hpp */ + +#pragma once + +#include "reactor/Reducer.hpp" +#include + +namespace xo { + namespace reactor { + /* implementation record used in LastReducer. + * LastReducer (see below) remembers a single event, + * + will be updated on successive calls to + * LastReducer.include_event() + * + * need to remember the _first_ (& therefore earliest) + * event timestamp in such a wave, since that establishes when simulator + * should deliver the event -- even if event is subsequently + * overwritten. + * + * once event is delivered, timestamp can reset + * + * otherwise if upstream producer sends events with + * future timestamps, can get indefinite postponement + * with simulation clock failing to catch up to event time. + * + */ + + template + class EventRecd { + public: + using utc_nanos = xo::time::utc_nanos; + + public: + EventRecd() = default; + EventRecd(utc_nanos tm, Event ev) : trigger_tm_{tm}, ev_{ev} {} + EventRecd(utc_nanos tm, Event && ev) : trigger_tm_{tm}, ev_{std::move(ev)} {} + + public: + /* if sim, deliver event when simulation clock reaches + * .trigger_tm; .trigger_tm can be earlier than .ev time + */ + utc_nanos trigger_tm_; + /* event to deliver */ + Event ev_; + }; + + /* reducer that just remembers the last event + * + * Require: + * - Event is null-contructible + * - Event is copyable + * + * LastReducer provides reentrancy support. This support doesn't operate + * if Event copy is not deep, e.g. for Event = rpn + * + * .include_event() + * /-------\ -----------------> /------\ + * | empty | | full | + * \-------/ <----------------- \------/ + * . .annex_one() . + * . . + * .is_empty()=true .is_empty()=false + */ + template> + class LastReducer : public ReducerBase { + public: + using utc_nanos = xo::time::utc_nanos; + + public: + LastReducer() = default; + LastReducer(EventTimeFn const & evtfn) : ReducerBase(evtfn) {} + + bool is_empty() const { return empty_flag_; } + /* require: .is_empty() = false */ + utc_nanos next_tm() const { + return this->last_ev_[this->last_ix_].trigger_tm_; + //return this->event_tm(this->last_ev_[this->last_ix_]); + } + /* #of events stored in this reducer (0 or 1) */ + uint32_t n_event() const { return this->empty_flag_ ? 0 : 1; } + + Event const & last_annexed_ev() const { + return this->last_ev_[1 - this->last_ix_].ev_; + } + + EventRecd & include_event_aux(Event const & ev) { + EventRecd & evr + = this->last_ev_[this->last_ix_]; + + if (this->empty_flag_) { + /* evr.trigger_tm will be preserved across + * successive calls to .include_event(); + * until .annex_one() + */ + evr.trigger_tm_ = this->event_tm(ev); + + this->empty_flag_ = false; + } + + return evr; + } /*include_event_aux*/ + + void include_event(Event const & ev) { + EventRecd & evr + = this->include_event_aux(ev); + + evr.ev_ = ev; + } /*include_event*/ + + void include_event(Event && ev) { + EventRecd & evr + = this->include_event_aux(ev); + + evr.ev_ = std::move(ev); + } /*include_event*/ + + Event & annex_one() { + std::uint32_t annexed_ix = this->last_ix_; + + /* since .empty_flag is true, + * next call to .include_event_aux() will + * capture new timestamp + */ + this->empty_flag_ = true; + this->last_ix_ = (1 - this->last_ix_); + + return this->last_ev_[annexed_ix].ev_; + } /*annex_one*/ + + // ----- Inherited from ReducerBase ----- + + //utc_nanos event_tm(Event const & ev) const { return this->event_tm_fn_(ev); } + + private: + /* true when reducer contains 0 queued events, + * not counting any annexed event + */ + bool empty_flag_ = true; + + /* .last_ev[.last_ix] updated by .include_event() + */ + std::uint32_t last_ix_ = 0; + /* remember two events + * (a) a single queued event (updated by .include_event()) + * (b) a single removed event (reported by .annex_one()) + * + * roles of .last_ev[0], .last_ev[1] reverse each time .annex_one() runs + */ + std::array, 2> last_ev_; + }; /*LastReducer*/ + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end LastReducer.hpp */ diff --git a/include/xo/reactor/PollingReactor.hpp b/include/xo/reactor/PollingReactor.hpp new file mode 100644 index 00000000..7dbad4ab --- /dev/null +++ b/include/xo/reactor/PollingReactor.hpp @@ -0,0 +1,44 @@ +/* @file PollingReactor.hpp */ + +#pragma once + +#include "Reactor.hpp" +#include "ReactorSource.hpp" +#include +#include + +namespace xo { + namespace reactor { + /* reactor that runs by polling an ordered set of sources */ + class PollingReactor : public Reactor { + public: + PollingReactor() = default; + + // ----- inherited from Reactor ----- + + virtual bool add_source(ref::brw src) override; + virtual bool remove_source(ref::brw src) override; + virtual std::uint64_t run_one() override; + + private: + /* find non-empty source, starting from .source_v_[start_ix], + * wrapping around to .source_v_[start_ix - 1]. + * + * return index of first available non-empty source, + * or -1 if all sources are empty + */ + std::int64_t find_nonempty_source(std::size_t start_ix); + + private: + /* next source to poll will be .source_v_[.next_ix_] */ + std::size_t next_ix_ = 0; + + /* ordered set of sources (see reactor::Source) + * reactor will poll sources in round-robin order + */ + std::vector source_v_; + }; /*PollingReactor*/ + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end PollingReactor.hpp */ diff --git a/include/xo/reactor/PolyAdapterSink.hpp b/include/xo/reactor/PolyAdapterSink.hpp new file mode 100644 index 00000000..e8f68eb9 --- /dev/null +++ b/include/xo/reactor/PolyAdapterSink.hpp @@ -0,0 +1,92 @@ +/* file PolyAdapterSink.hpp + * + * author: Roland Conybeare, Sep 2022 + */ + +#pragma once + +#include "Sink.hpp" +#include "xo/reflect/Reflect.hpp" + +namespace xo { + namespace reactor { + /* adapter between a source that delivers a particular event type T, + * and a sink that accepts arbitrarily-typed events via .notify_ev_tp() + * Use this to connect to a polymorphic sink. + * + * Require: + * - .poly_sink.allow_polymorphic_source() + * (ofc. otherwise no point in using PolyAdapterSink) + * - .poly_sink.allow_volatile_source() + * need this bc will be wrapping event with TaggedPtr, + * which doesn't manage event lifetime + */ + template + class PolyAdapterSink : public reactor::Sink1 { + public: + using Reflect = reflect::Reflect; + using TaggedPtr = reflect::TaggedPtr; + + public: + /* named ctor idiom */ + static ref::rp make(ref::rp poly_sink) { + //xo::scope lscope("PolyAdapterSink::make"); + + ref::rp retval(new PolyAdapterSink(poly_sink)); + + //lscope.log("adapter", (void*)retval.get()); + + return retval; + } /*make*/ + + // ----- Inherited from Sink1 ----- + + virtual void notify_ev(T const & ev) override { + //xo::scope lscope("PolyAdapterSink::notify_ev"); + //lscope.log(xo::xtag("ev", ev)); + + TaggedPtr ev_tp = Reflect::make_tp(const_cast(&ev)); + + this->notify_ev_tp(ev_tp); + } /*notify_ev*/ + + // ----- Inherited from AbstractSink ----- + + virtual bool allow_volatile_source() const override { return true; } + virtual uint32_t n_in_ev() const override { return this->poly_sink_->n_in_ev(); } + /* note: ok to do this, however if expecting to use this entry point, + * maybe don't need to interpose PolyAdapterSink ahead of .poly_sink + */ + virtual void notify_ev_tp(TaggedPtr const & ev_tp) override { + //xo::scope lscope("PolyAdapterSink::notify_ev_tp"); + + return this->poly_sink_->notify_ev_tp(ev_tp); + } + + // ----- Inherited from AbstractEventProcessor ----- + + virtual std::string const & name() const override { return this->poly_sink_->name(); } + virtual void set_name(std::string const & x) override { this->poly_sink_->set_name(x); } + virtual void visit_direct_consumers(std::function ep)> const & fn) override { + this->poly_sink_->visit_direct_consumers(fn); + } + virtual void display(std::ostream & os) const override { + using xo::xtag; + os << "()) + << xtag("poly", this->poly_sink_) + << ">"; + } /*display*/ + + private: + PolyAdapterSink(ref::rp poly_sink) : poly_sink_{std::move(poly_sink)} {} + + private: + /* mandate: .poly_sink.allow_polymorphic_source() is true */ + ref::rp poly_sink_; + }; /*PolyAdapterSink*/ + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end PolyAdapterSink.hpp */ diff --git a/include/xo/reactor/Reactor.hpp b/include/xo/reactor/Reactor.hpp new file mode 100644 index 00000000..c56a590c --- /dev/null +++ b/include/xo/reactor/Reactor.hpp @@ -0,0 +1,63 @@ +/* @file Reactor.hpp */ + +#pragma once + +#include "xo/refcnt/Refcounted.hpp" +#include + +namespace xo { + namespace reactor { + class ReactorSource; + + /* abtract api for a reactor: + * something that arranges to have work done on a set of Sources. + */ + class Reactor : public ref::Refcount { + public: + virtual ~Reactor() = default; + + /* add source src to this reactor. + * on success, invoke src.notify_reactor_add(this) + * + * returns true if source added; false if already present + */ + virtual bool add_source(ref::brw src) = 0; + + /* remove source src from this reactor. + * source must previously have been added by + * .add_source(src). + * + * on success, invoke src.notify_reactor_remove(this) + * + * returns true if source removed; false if not present + */ + virtual bool remove_source(ref::brw src) = 0; + + /* notification when non-primed source (source with no known events) + * becomes primed (source with at least one event) + */ + virtual void notify_source_primed(ref::brw src) = 0; + + /* dispatch one reactor event, borrowing the calling thread + * amount of work this represents is Source/Sink specific. + * + * returns #of events dispatched (0 or 1) + */ + virtual std::uint64_t run_one() = 0; + + /* borrow calling thread to dispatch reactor events. + * if n is -1, run indefinitely + * otherwise dispatch up to n events. + * n = 0 is a noop + */ + void run_n(int32_t n); + + /* borrow calling thread to run indefinitely. + * suitable implementation for dedicated reactor threads + */ + void run() { this->run_n(-1); } + }; /*Reactor*/ + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end Reactor.hpp */ diff --git a/include/xo/reactor/ReactorSource.hpp b/include/xo/reactor/ReactorSource.hpp new file mode 100644 index 00000000..92f275d4 --- /dev/null +++ b/include/xo/reactor/ReactorSource.hpp @@ -0,0 +1,130 @@ +/* @file ReactorSource.hpp */ + +#pragma once + +#include "AbstractSource.hpp" +//#include "time/Time.hpp" +#include + +namespace xo { + namespace reactor { + class Reactor; + + /* abstract api for a source of events. + * Event representation is left open: Sources and Sinks + * need to have compatible event representations, + * and coordination is left to such (Source, Sink) pairs. + * + * Source->Sink activity may be expected to be mediated by a reactor, + * that implements the Reactor api. + * + * At any time, A Source can be associated with at most one reactor. + * Sources are informed of Reactor<->Source association being + * formed/broken by the + * .notify_reactor_add(), .notify_reactor_remove() + * methods + * + * The source api intends also to provide for simulation. + * There introduces two simulation-specific methods: + * .sim_current_tm() + * .sim_advance_until() + * + * A non-simulation source can implement these as calls to + * .online_current_tm(), .online_advance_until() respectively + * .online_current_tm() aborts since an online source is never exhausted + * .online_advance_until() is a no-op that returns 0 + * + * Loop for consuming from a primary simulation source: + * + * brw s = ...; + * while(!s->is_exhausted()) + * s->deliver_one(); + * + * Secondary sources (sources that depend on other sources) can be + * in a state where they don't know their next event, in which case: + * + * s->is_notprimed() == true + */ + class ReactorSource : public AbstractSource { + public: + using utc_nanos = xo::time::utc_nanos; + + public: + virtual ~ReactorSource() = default; + + /* true if source is currently empty (has 0 events to deliver) */ + virtual bool is_empty() const = 0; + bool is_nonempty() const { return !this->is_empty(); } + + /* true when source knows its next event + * A source that isn't primed is also excluded from simulation + * heap until it becomes primed. + * This make feasible simulation sources that + * depend on other simulation sources + */ + virtual bool is_primed() const { return !this->is_empty(); } + virtual bool is_notprimed() const { return this->is_empty(); } + + /* if true, this source has no events, and will never publish more events + * - for sim, return true for a standalone source that has replayed all events + * - for rt, set during orderly + */ + virtual bool is_exhausted() const = 0; + + /* if this is a simulation source and .is_exhausted is false: + * returns next event time; more precisely, no events exist prior to + * this time. + * + * if sim, and .is_primed = true, + * returns timestamp of next event + */ + virtual utc_nanos sim_current_tm() const = 0; + + /* promise: + * - .current_tm() > tm || .is_notprimed() || .is_exhausted() = true + * - if replay_flag is true, then any events between previous .current_tm() + * and new .current_tm() will have been published + * + * returns #of events delivered. + * does not count events that were skipped, so always returns 0 if + * replay_flag is false + */ + virtual std::uint64_t sim_advance_until(utc_nanos tm, bool replay_flag) = 0; + + /* informs source when it's added to a reactor + + * (see Reactor.add_source()) + */ + virtual void notify_reactor_add(Reactor * /*reactor*/) {} + + /* informs source when it's removed from a reactor + * (see Reactor.remove_source()) + */ + virtual void notify_reactor_remove(Reactor * /*reactor*/) {} + + // ----- Inherited from AbstractSource ----- + + /* deliver one event to attached sink + * interpretation of 'one event' is source-specific; + * could be a collapsed or batched event in practice. + * + * no-op if source is empty. + * + * if sim, promise: + * - new .current_tm >= old .current_tm() || .is_notprimed() || .is_exhausted() + * + * returns #of events delivered. Must be 0 or 1 in this context + */ + virtual std::uint64_t deliver_one() override = 0; + + protected: + /* default implementations for online sources */ + utc_nanos online_current_tm() const; + uint64_t online_advance_until(utc_nanos tm, bool replay_flag); + }; /*ReactorSource*/ + + using ReactorSourcePtr = ref::rp; + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end ReactorSource.hpp */ diff --git a/include/xo/reactor/Reducer.hpp b/include/xo/reactor/Reducer.hpp new file mode 100644 index 00000000..907d9d04 --- /dev/null +++ b/include/xo/reactor/Reducer.hpp @@ -0,0 +1,33 @@ +/* @file Reducer.hpp */ + +#pragma once + +#include "reactor/EventTimeFn.hpp" + +namespace xo { + namespace reactor { + /* LastReducer, HeapReducer inherit ReducerBase */ + template + class ReducerBase { + static_assert(EventTimeConcept); + + public: + using utc_nanos = xo::time::utc_nanos; + + public: + ReducerBase() = default; + ReducerBase(EventTimeFn const & evtfn) : event_tm_fn_{evtfn} {} + + utc_nanos event_tm(Event const & ev) const { return this->event_tm_fn_(ev); } + + private: + /* Event ev = ...; + * .event_tm_fn(ev) -> utc_nanos + * reports event time associated with ev + */ + EventTimeFn event_tm_fn_; + }; /*ReducerBase*/ + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end Reducer.hpp */ diff --git a/include/xo/reactor/SecondarySource.hpp b/include/xo/reactor/SecondarySource.hpp new file mode 100644 index 00000000..398a17bb --- /dev/null +++ b/include/xo/reactor/SecondarySource.hpp @@ -0,0 +1,359 @@ +/* @file SecondarySource.hpp */ + +#pragma once + +#include "time/Time.hpp" +#include "reactor/Sink.hpp" +#include "reactor/DirectSource.hpp" +#include "reactor/Reactor.hpp" +#include "callback/CallbackSet.hpp" +#include "reflect/demangle.hpp" +#include + +namespace xo { + namespace reactor { + /* A passive event source. + * Can use as backend publisher when implementating another + * event processor. + */ + template> + class SecondarySource : public EventSource> { + public: + using EventSink = Sink1; + template + using RpCallbackSet = fn::RpCallbackSet; + using CallbackId = fn::CallbackId; + using TypeDescr = xo::reflect::TypeDescr; + using utc_nanos = xo::time::utc_nanos; + + public: + ~SecondarySource() = default; + + static ref::rp make() { return new SecondarySource(); } + + /* last event delivered from this source -- + * i.e. event in most recent call to .deliver_one_aux() + */ + Event const & last_annexed_ev() const { return this->reducer_.last_annexed_ev(); } + + void notify_upstream_exhausted() { this->upstream_exhausted_ = true; } + + /* make event available to reactor, by adding to internal reducer */ + void notify_secondary_event(Event const & ev) { + /* test if ev is priming, update .current_tm */ + bool is_priming = this->preprocess_secondary_event(ev); + + this->reducer_.include_event(ev); + + this->postprocess_secondary_event(is_priming); + } /*notify_secondary_event*/ + + void notify_secondary_event(Event && ev) { + bool is_priming = this->preprocess_secondary_event(ev); + + this->reducer_.include_event(ev); + + this->postprocess_secondary_event(is_priming); + } /*notify_secondary_event*/ + + template + void notify_secondary_event_v(T const & v) { + using xo::scope; + using xo::xtag; + + if (v.empty()) + return; + + scope log(XO_DEBUG(this->debug_sim_flag_)); + + log && log(xtag("name", this->name())); + + if (this->upstream_exhausted_) { + throw std::runtime_error("SecondarySource::notify_secondary_event_v" + ": not allowed after upstream exhausted"); + } + + uint32_t n_ev = 0; + + for (Event const & ev : v) { + utc_nanos evtm = this->reducer_.event_tm(ev); + + if (this->current_tm_ < evtm) + this->current_tm_ = evtm; + + ++n_ev; + } + + log && log(xtag("T", reflect::type_name()), + xtag("n_ev", n_ev)); + + if (n_ev > 0) { + /* if reducer is empty when .notify_secondary_event_v() begins, + * then reactor/simulator needs to be notified that source is no longer empty + */ + bool is_priming = this->reducer_.is_empty(); + + for (Event const & ev : v) + this->reducer_.include_event(ev); + + Reactor * reactor = this->parent_reactor_; + + if (reactor) { + if (is_priming) { + /* reactor/simulator takes responsibility for delivering events */ + reactor->notify_source_primed(ref::brw::from_native(this)); + } + } else { + /* special case if no reactor: deliver immediately */ + + //this->deliver_one(); + this->deliver_all(); + } + } + } /*notify_secondary_event_v*/ + + // ----- inherited from EventSource ----- + + CallbackId add_callback(ref::rp const & cb) override { + return this->cb_set_.add_callback(cb); + } /*add_callback*/ + + void remove_callback(CallbackId id) override { + this->cb_set_.remove_callback(id); + } /*remove_callback*/ + + // ----- inherited from ReactorSource ----- + + virtual bool is_empty() const override { return this->reducer_.is_empty(); } + virtual bool is_exhausted() const override { return this->upstream_exhausted_ && this->is_empty(); } + + virtual utc_nanos sim_current_tm() const override { + using xo::scope; + using xo::xtag; + + if (this->reducer_.is_empty()) { + /* this is a tricky case. + * it means this source doesn't + * _know_ specific next event yet; however new events + * may appear at any time by way of .notify_event() + * + * If event doesn't know next event, then .current_tm isn't useful + * for establishing priority relative to other sources. + * rely on priming mechanism instead, + * which means that control should never come here. + */ + return this->current_tm_; + } else { + scope log(XO_DEBUG(false /*this->debug_sim_flag_*/), + xtag("name", this->name_), + xtag("next_tm", this->reducer_.next_tm())); + + return this->reducer_.next_tm(); + } + } /*sim_current_tm*/ + + virtual std::uint64_t deliver_one() override { + return this->deliver_one_aux(true /*replay_flag*/); + } + + virtual std::uint64_t sim_advance_until(utc_nanos target_tm, + bool replay_flag) override + { + uint64_t retval = 0; + + while (!this->reducer_.is_empty()) { + utc_nanos tm = this->sim_current_tm(); + + if (tm < target_tm) { + retval += this->deliver_one_aux(replay_flag); + } else { + break; + } + } + + return retval; + } /*sim_advance_until*/ + + virtual void notify_reactor_add(Reactor * reactor) override { + assert(!this->parent_reactor_); + + this->parent_reactor_ = reactor; + } /*notify_reactor_add*/ + + virtual void notify_reactor_remove(Reactor * /*reactor*/) override {} + + // ----- inherited from AbstractSource ----- + + virtual TypeDescr source_ev_type() const override { + return reflect::Reflect::require(); + } /*source_ev_type*/ + + virtual uint32_t n_out_ev() const override { return n_out_ev_; } + /* #of events queued for delivery */ + virtual uint32_t n_queued_out_ev() const override { return this->reducer_.n_event(); } + + virtual bool debug_sim_flag() const override { return debug_sim_flag_; } + virtual void set_debug_sim_flag(bool x) override { this->debug_sim_flag_ = x; } + + virtual CallbackId attach_sink(ref::rp const & sink) override { + ref::rp native_sink + = EventSink::require_native("SecondarySource::attach_sink", sink); + + if (native_sink) { + if (!this->is_volatile() + || native_sink->allow_volatile_source()) + { + return this->add_callback(native_sink); + } else { + throw std::runtime_error("SecondarySource::attach_sink" + ": sink requires non-volatile source " + + std::string(reflect::type_name())); + } + } else { + throw std::runtime_error("SecondarySource::attach_sink" + ": expected sink accepting " + + std::string(reflect::type_name())); + } + } /*attach_sink*/ + + virtual void detach_sink(CallbackId id) override { + this->remove_callback(id); + } /*detach_sink*/ + + // ----- Inherited from AbstractEventProcessor ----- + + virtual std::string const & name() const override { return name_; } + virtual void set_name(std::string const & x) override { this->name_ = x; } + + virtual void visit_direct_consumers(std::function ep)> const & fn) override { + + for(auto x : this->cb_set_) + fn(x.fn_.borrow()); + } /*visit_direct_consumers*/ + + private: + /* event book-keeping on receiving an event. + */ + bool preprocess_secondary_event(Event const & ev) + { + if (this->upstream_exhausted_) { + throw std::runtime_error("SecondarySource::notify_secondary_event" + ": not allowed after upstream exhausted"); + } + + utc_nanos evtm = this->reducer_.event_tm(ev); + + if (this->current_tm_ < evtm) + this->current_tm_ = evtm; + + /* if reducer is empty when .notify_event() begins, + * then reactor/simulator needs to be notified that source is no longer empty + */ + bool is_priming = this->reducer_.is_empty(); + + return is_priming; + } /*preprocess_secondary_event*/ + + /* event bookkeeping after receiving an event. + * + * Require: event has been propagated to .reducer + * + * is_priming. true if event causes source to + * become non-empty --> must notify reactor + */ + void postprocess_secondary_event(bool is_priming) { + using xo::scope; + using xo::xtag; + + Reactor * reactor = this->parent_reactor_; + + scope log(XO_DEBUG(this->debug_sim_flag_), + xtag("name", name_), + xtag("reactor", (void*)reactor), + xtag("is_priming", is_priming)); + + if (reactor) { + if (is_priming) { + /* reactor/simulator takes responsibility for delivering events */ + reactor->notify_source_primed(ref::brw::from_native(this)); + } + } else { + /* if no reactor, deliver immediately */ + this->deliver_one(); + } + } /*postprocess_secondary_event*/ + + /* deliver one event from reducer; + * invoke callback whenever replay_flag is true + */ + std::uint64_t deliver_one_aux(bool replay_flag) { + scope log(XO_DEBUG(this->debug_sim_flag_), + xtag("name", this->name_), + xtag("reducer.empty", this->reducer_.is_empty()), + xtag("replay_flag", replay_flag)); + + if (this->reducer_.is_empty()) + return 0; + + /* need to remove event _before_ invoking callbacks; + * callbacks may indirectly call this->notify_secondary_event(), + * modifiying .reducer + * + * reducer may use double-buffering scheme or similar to + * mitigate copying, esp when Event objects are heavy + */ + Event & ev = this->reducer_.annex_one(); + + /* if SecondarySource: + * Event ev = this->event_heap_.front(); + * std::pop_heap(this->event_heap_.begin(), + * this->event_heap_.end(), + * std::greater()); + * this->event_heap_.pop_back(); + */ + + if (replay_flag) { + ++(this->n_out_ev_); + this->cb_set_.invoke(&EventSink::notify_ev, ev); + } + + return 1; + } /*deliver_one_aux*/ + + private: + /* current time for this source */ + utc_nanos current_tm_; + + /* reporting name for this source (use when .debug_sim_flag set) + */ + std::string name_; + + /* if true, reactor/simulator to log interaction with this source + */ + bool debug_sim_flag_ = false; + + /* count lifetime #of outgoing events */ + uint32_t n_out_ev_ = 0; + + /* set this to true, once, to announce that upstream will send + * no more events. see .notify_upstream_exhausted() + */ + bool upstream_exhausted_ = false; + + /* events to be delivered to callbacks. + * multiple events may be collapsed depending on Reducer implementation + */ + Reducer reducer_; + + /* reactor/simulator being used to schedule consumption. if ommitted, + * will borrow thread calling .notify_secondary_event() + */ + Reactor * parent_reactor_ = nullptr; + + /* invoke callbacks in this set to send an outgoing event */ + RpCallbackSet cb_set_; + }; /*SecondarySource*/ + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end SecondarySource.hpp */ diff --git a/include/xo/reactor/Sink.hpp b/include/xo/reactor/Sink.hpp new file mode 100644 index 00000000..2ba561c7 --- /dev/null +++ b/include/xo/reactor/Sink.hpp @@ -0,0 +1,222 @@ +/* @file Sink.hpp */ + +#pragma once + +#include "AbstractSink.hpp" +#include "AbstractSource.hpp" +#include "PolyAdapterSink.hpp" +#include "xo/reflect/Reflect.hpp" +#include "xo/indentlog/print/time.hpp" +#include "xo/indentlog/print/tag.hpp" +#include "xo/cxxutil/demangle.hpp" +#include + +namespace xo { + namespace reactor { + /* Sink for events of type T + * + * inheritance: + * ref::Refcount + * ^ + * isa + * | + * reactor::AbstractEventProcessor + * ^ + * isa + * | + * reactor::AbstractSink + * ^ + * isa + * | + * reactor::Sink1 + */ + template + class Sink1 : public AbstractSink { + public: + using Reflect = reflect::Reflect; + using TypeDescr = reflect::TypeDescr; + + public: + /* convenience: convert abstract sink to Sink1*, + * or throw + */ + static ref::rp> require_native(std::string_view caller, + ref::rp const & sink) + { + using xo::scope; + using xo::xtag; + + /* 1. if sink expects events of type T, + * make direct connection + */ + Sink1 * native_sink = nullptr; + + native_sink = dynamic_cast *>(sink.get()); + + if (native_sink) + return native_sink; + + /* 2. if sink is polymorphic, + * make type-erasing adapter + */ + + if (sink->allow_polymorphic_source()) { +#ifdef DEBUG_NOT_USING + scope lscope("Sink1::require_native: create PolyAdapterSink"); + lscope.log(xtag("caller", caller)); +#endif + + return PolyAdapterSink::make(sink); + } + + if (!native_sink) { +#ifdef DEBUG_EVENT_TYPEINFO + std::type_info const * sink_parent_typeinfo + = sink->parent_typeinfo(); +#endif + + std::size_t src_hashcode = typeid(T).hash_code(); + + throw std::runtime_error(tostr("Sink1::require_native" + ": wanted to sink S, but sink expects T", + xtag("caller", caller), + xtag("T", sink->sink_ev_type()->canonical_name()), + xtag("S", reflect::type_name()), + xtag("required_hashcode", typeid(Sink1).hash_code()), + xtag("required_name", typeid(Sink1).name()), + xtag("src_hashcode", src_hashcode), + xtag("sink_hashcode", sink->sink_ev_type()->typeinfo()->hash_code()) +#ifdef DEBUG_EVENT_TYPEINFO + , xtag("sink_hashcode", sink->item_typeinfo()->hash_code()) + , xtag("sink_parent_hashcode", sink_parent_typeinfo->hash_code()) + , xtag("sink_parent_name", sink_parent_typeinfo->name()) + , xtag("sink.type", sink->self_typename()) + , xtag("sink.parent_type", sink->parent_typename()) +#endif + )); + } + + return native_sink; + } /*require_native*/ + + virtual TypeDescr sink_ev_type() const override { return reflect::Reflect::require(); } + /* accept incoming event */ + virtual void notify_ev(T const & ev) = 0; + + /* invoke these when this sink added to, or removed from, a source */ + virtual void notify_add_callback() {} + virtual void notify_remove_callback() {} + + // ----- inherited from AbstractSink ----- + + /* Sink1 only allows source providing T */ + virtual bool allow_polymorphic_source() const override { return false; } + + virtual void attach_source(ref::rp const & src) override { + src->attach_sink(this); + } /*attach_source*/ + + virtual void notify_ev_tp(TaggedPtr const & ev_tp) override { + using xo::xtag; + + T * p_ev = ev_tp.recover_native(); + + if (p_ev) { + this->notify_ev(*p_ev); + } else { + throw std::runtime_error(tostr("Sink1::notify_ev_tp" + ": unable to convert ev_tp to T", + xtag("ev_tp.type", ev_tp.td()->canonical_name()), + xtag("T", reflect::type_name()))); + } + } /*notify_ev_tp*/ + }; /*Sink1*/ + + /* a sink with no further downstream processors */ + template + class SinkEndpoint : public Sink1 { + public: + // ----- Inherited from AbstractEventProcessor ----- + + virtual std::string const & name() const override { return name_; } + virtual void set_name(std::string const & x) override { name_ = x; } + + virtual void visit_direct_consumers(std::function)> const &) override { + /* *this is not an event source */ + } /*visit_direct_consumers*/ + + private: + /* reporting name for this sink */ + std::string name_; + }; /*SinkEndpoint*/ + + template + class SinkToFunction : public SinkEndpoint { + public: + SinkToFunction(Fn fn) : fn_{std::move(fn)} {} + + /* NOTE: conservative choice here, could templatize on this */ + virtual bool allow_volatile_source() const override { return false; } + virtual uint32_t n_in_ev() const override { return n_in_ev_; } + virtual void notify_ev(T const & ev) override { + ++(this->n_in_ev_); + fn_(ev); + } /*notify_ev*/ + + virtual void display(std::ostream & os) const override { + using xo::xtag; + + os << "name()) + << xtag("n_in_ev", this->n_in_ev()) + << ">"; + } /*display*/ + + private: + Fn fn_; + /* counts lifetime #of incoming events (see .notify_ev()) */ + uint32_t n_in_ev_ = 0; + }; /*SinkToFunction*/ + + /* sink that prints to console */ + template + class SinkToConsole : public SinkEndpoint { + public: + SinkToConsole() {} + + virtual bool allow_volatile_source() const override { return true; } + virtual uint32_t n_in_ev() const override { return n_in_ev_; } + virtual void notify_ev(T const & ev) override { + //using logutil::operator<<; + + ++(this->n_in_ev_); + + std::cout << ev << std::endl; + } /*notify_ev*/ + + virtual void display(std::ostream & os) const override { + using xo::xtag; + + os << "name()) + << xtag("n_in_ev", this->n_in_ev()) + << ">"; + } /*display*/ + + private: + /* reporting name for this sink */ + std::string name_; + /* counts lifetime #of incoming events (see .notify_ev()) */ + uint32_t n_in_ev_ = 0; + }; /*SinkToConsole*/ + +#ifdef NOT_USING + class TemporaryTest { + public: + static ref::rp>> realization_printer(); + }; /*TemporaryTest*/ +#endif + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end Sink.hpp */ diff --git a/include/xo/reactor/init_reactor.hpp b/include/xo/reactor/init_reactor.hpp new file mode 100644 index 00000000..5977d7ad --- /dev/null +++ b/include/xo/reactor/init_reactor.hpp @@ -0,0 +1,20 @@ +/* file init_reactor.hpp + * + * author: Roland Conybeare, Aug 2022 + */ + +#pragma once + +#include "xo/subsys/Subsystem.hpp" + +namespace xo { + enum S_reactor_tag {}; + + template<> + struct InitSubsys { + static void init(); + static InitEvidence require(); + }; +} /*namespace xo*/ + +/* end init_reactor.hpp */ diff --git a/src/reactor/AbstractEventProcessor.cpp b/src/reactor/AbstractEventProcessor.cpp new file mode 100644 index 00000000..e49cc374 --- /dev/null +++ b/src/reactor/AbstractEventProcessor.cpp @@ -0,0 +1,93 @@ +/* @file AbstractEventProcessor.cp */ + +#include "AbstractEventProcessor.hpp" +#include "xo/indentlog/print/tostr.hpp" +#include +#include + +namespace xo { + using ref::rp; + using ref::brw; + using xo::tostr; + using std::uint32_t; + + namespace reactor { + namespace { + /* search all event processors ep reachable (dowstream) from x, + * add to *m; + */ + void + map_network_helper(brw x, + uint32_t * tsort_ix, + std::unordered_map * m) + { + if (m->contains(x.get())) + return; + + auto fn = [tsort_ix, m] + (brw ep) + { + map_network_helper(ep, tsort_ix, m); + }; + + x->visit_direct_consumers(fn); + + /* postorder! */ + (*m)[x.get()] = ++(*tsort_ix); + + } /*map_network_helper*/ + } /*namespace*/ + + std::vector> + AbstractEventProcessor::map_network(rp const & x) + { + std::unordered_map network_map; + + /* index event processors in reverse topological order: + * if B is (directly or indirectly) downstream from A, + * then tsort_ix(B) < tsort_ix(A) + */ + uint32_t tsort_ix = 0; + + /* depth-first traversal, detect and short-circuit on dup paths */ + map_network_helper(x.borrow(), &tsort_ix, &network_map); + + /* invariant: tsort_ix = #of event processors in network */ + uint32_t n = tsort_ix; + + /* network_map, now in a topologically sorted order */ + std::map tsorted_map; + { + for(auto const & x : network_map) { + uint32_t tsort_ix = x.second; + AbstractEventProcessor * ep = x.first; + + tsorted_map[n - tsort_ix] = ep; + } + } + + std::vector> retval; + { + for(auto const & x : tsorted_map) + retval.push_back(x.second); + } + + return retval; + } /*map_network*/ + + void + AbstractEventProcessor::display(std::ostream & os) const + { + os << ""; + } /*display*/ + + std::string + AbstractEventProcessor::display_string() const + { + return tostr(*this); + } /*display_string*/ + + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end AbstractEventProcessor.cpp */ diff --git a/src/reactor/AbstractSource.cpp b/src/reactor/AbstractSource.cpp new file mode 100644 index 00000000..973ec4c6 --- /dev/null +++ b/src/reactor/AbstractSource.cpp @@ -0,0 +1,84 @@ +/* @file AbstractSource.cpp */ + +#include "AbstractSource.hpp" +#include "xo/indentlog/scope.hpp" +#include "xo/webutil/StreamEndpointDescr.hpp" +//#include "indentlog/scope.hpp" + +namespace xo { + using xo::web::StreamEndpointDescr; + using xo::reactor::AbstractSink; + using xo::ref::rp; + //using xo::scope; + //using xo::tostr; + + namespace reactor { + StreamEndpointDescr + AbstractSource::stream_endpoint_descr(std::string const & url_prefix) + { + auto subscribe_fn + = ([this] + (rp const & ws_sink) + { + //scope lscope("AbstractSource::stream_endpoint_descr.subscribe_fn"); + + /* ws_sink created by websocket, sends events to websocket as json + * see [websock/WebsocketSink] + */ + return this->attach_sink(ws_sink); + }); + + auto unsubscribe_fn + = ([this] + (CallbackId id) + { + this->detach_sink(id); + }); + + return StreamEndpointDescr(url_prefix, + subscribe_fn, + unsubscribe_fn); + } /*stream_endpoint_descr*/ + + uint64_t + AbstractSource::deliver_n(uint64_t n) + { + uint64_t retval = 0; + + for (uint64_t i=0; ideliver_one(); + + if (n1 == 0) { + /* short-circuit if source has less than n + * events available + */ + break; + } + + retval += n1; + } + + return retval; + } /*deliver_n*/ + + uint64_t + AbstractSource::deliver_all() + { + uint64_t retval = 0; + + for (;;) { + uint64_t n1 = this->deliver_one(); + + if (n1 == 0) + break; + + retval += n1; + } + + return retval; + } /*deliver_all*/ + + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end AbstractSource.cpp */ diff --git a/src/reactor/CMakeLists.txt b/src/reactor/CMakeLists.txt new file mode 100644 index 00000000..6e26ee50 --- /dev/null +++ b/src/reactor/CMakeLists.txt @@ -0,0 +1,21 @@ +# xo-reactor/src/reactor/CMakeLists.txt + +set(SELF_LIB reactor) +set(SELF_SRCS + AbstractEventProcessor.cpp AbstractSource.cpp ReactorSource.cpp + Sink.cpp + Reactor.cpp PollingReactor.cpp + init_reactor.cpp) + +xo_add_shared_library(${SELF_LIB} ${PROJECT_VERSION} 1 ${SELF_SRCS}) + +# ---------------------------------------------------------------- +# external dependencies + +# note: changes to xo_dependency() calls here +# must coordinate with find_dependency() calls +# in xo-reactor/cmake/reactorConfig.cmake.in +# +xo_dependency(${SELF_LIB} reflect) +xo_dependency(${SELF_LIB} webutil) +xo_dependency(${SELF_LIB} callback) diff --git a/src/reactor/PollingReactor.cpp b/src/reactor/PollingReactor.cpp new file mode 100644 index 00000000..348ca739 --- /dev/null +++ b/src/reactor/PollingReactor.cpp @@ -0,0 +1,88 @@ +/* @file PollingReactor.cpp */ + +#include "PollingReactor.hpp" + +namespace xo { + using ref::brw; + using std::size_t; + using std::uint64_t; + using std::int64_t; + + namespace reactor { + bool + PollingReactor::add_source(brw src) + { + /* make sure src does not already appear in .source_v[] */ + for(ReactorSourcePtr const & x : this->source_v_) { + if(x.get() == src.get()) { + throw std::runtime_error("PollingReactor::add_source; source already present"); + return false; + } + } + + src->notify_reactor_add(this); + + this->source_v_.push_back(src.get()); + + return true; + } /*add_source*/ + + bool + PollingReactor::remove_source(brw src) + { + auto ix = std::find(this->source_v_.begin(), + this->source_v_.end(), + src); + + if(ix != this->source_v_.end()) { + src->notify_reactor_remove(this); + + this->source_v_.erase(ix); + + return true; + } + + return false; + } /*remove_source*/ + + int64_t + PollingReactor::find_nonempty_source(size_t start_ix) + { + size_t z = this->source_v_.size(); + + /* search sources [ix .. z) */ + for(size_t ix = start_ix; ix < z; ++ix) { + brw src = this->source_v_[ix]; + + if(src->is_nonempty()) + return ix; + } + + /* search source [0 .. ix) */ + for(size_t ix = 0, n = std::min(start_ix, z); ix < n; ++ix) { + brw src = this->source_v_[ix]; + + if(src->is_nonempty()) + return ix; + } + + return -1; + } /*find_nonempty_source*/ + + uint64_t + PollingReactor::run_one() + { + int64_t ix = this->find_nonempty_source(this->next_ix_); + + if(ix >= 0) { + brw src = this->source_v_[ix]; + + return src->deliver_one(); + } else { + return 0; + } + } /*run_one*/ + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end PollingReactor.cpp */ diff --git a/src/reactor/Reactor.cpp b/src/reactor/Reactor.cpp new file mode 100644 index 00000000..2ef80e46 --- /dev/null +++ b/src/reactor/Reactor.cpp @@ -0,0 +1,26 @@ +/* file Reactor.cpp + * + * author: Roland Conybeare, Sep 2022 + */ + +#include "Reactor.hpp" + +namespace xo { + namespace reactor { + void + Reactor::run_n(int32_t n) + { + if (n == -1) { + for (;;) { + this->run_one(); + } + } else { + for (int32_t i=0; irun_one(); + } + } + } /*run_n*/ + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end Reactor.cpp */ diff --git a/src/reactor/ReactorSource.cpp b/src/reactor/ReactorSource.cpp new file mode 100644 index 00000000..0bb29397 --- /dev/null +++ b/src/reactor/ReactorSource.cpp @@ -0,0 +1,34 @@ +/* @file Source.cpp */ + +#include "ReactorSource.hpp" +#include "xo/indentlog/print/time.hpp" +#include + +namespace xo { + using xo::time::utc_nanos; + + namespace reactor { + utc_nanos + ReactorSource::online_current_tm() const + { + /* for an online source: + * .is_exhausted() must always be false; + * this implies that .sim_current_tm() should + * not be called in the first place + */ + + assert(false); + + return time::timeutil::epoch(); + } /*online_current_tm*/ + + std::uint64_t + ReactorSource::online_advance_until(utc_nanos /*tm*/, + bool /*replay_flag*/) + { + return 0; + } /*online_advance_until*/ + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end Source.cpp */ diff --git a/src/reactor/Sink.cpp b/src/reactor/Sink.cpp new file mode 100644 index 00000000..1dbcdb62 --- /dev/null +++ b/src/reactor/Sink.cpp @@ -0,0 +1,18 @@ +/* @file Sink.cpp */ + +#include "Sink.hpp" +#include "xo/refcnt/Refcounted.hpp" + +namespace xo { + namespace reactor { +#ifdef NOT_USING + ref::rp>> + TemporaryTest::realization_printer() + { + return new SinkToConsole>(); + } /*realization_printer*/ +#endif + } /*namespace reactor*/ +} /*namespace xo*/ + +/* end Sink.cpp */ diff --git a/src/reactor/init_reactor.cpp b/src/reactor/init_reactor.cpp new file mode 100644 index 00000000..b33e4775 --- /dev/null +++ b/src/reactor/init_reactor.cpp @@ -0,0 +1,31 @@ +/* file init_reactor.cpp + * + * author: Roland Conybeare, Aug 2022 + */ + +#include "init_reactor.hpp" +#include "xo/reflect/init_reflect.hpp" + +namespace xo { + void + InitSubsys::init() + { + /* TODO: reflect reactor types */ + } /*init*/ + + InitEvidence + InitSubsys::require() + { + InitEvidence retval; + + /* subsystem dependencies for reactor/ */ + retval ^= InitSubsys::require(); + + /* reactor/'s own initialization code */ + retval ^= Subsystem::provide("reactor", &init); + + return retval; + } /*require*/ +} /*namespace xo*/ + +/* end init_reactor.cpp */