diff --git a/.xo-reactor/.github/workflows/main.yml b/.xo-reactor/.github/workflows/main.yml deleted file mode 100644 index 7efc00fe..00000000 --- a/.xo-reactor/.github/workflows/main.yml +++ /dev/null @@ -1,231 +0,0 @@ -name: build xo-reactor + xo dependencies - -on: - push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] - -env: - # Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.) - BUILD_TYPE: Release - -jobs: - build: - # The CMake configure and build commands are platform agnostic and should work equally well on Windows or Mac. - # You can convert this to a matrix build if you need cross-platform coverage. - # See: https://docs.github.com/en/free-pro-team@latest/actions/learn-github-actions/managing-complex-workflows#using-a-build-matrix - runs-on: ubuntu-latest - - steps: - - name: checkout source - uses: actions/checkout@v3 - - - name: Install catch2 - # install catch2. see [[https://stackoverflow.com/questions/57982945/how-to-apt-get-install-in-a-github-actions-workflow]] - run: sudo apt-get install -y catch2 - - # ---------------------------------------------------------------- - - - name: Clone xo-cmake - uses: actions/checkout@v3 - with: - repository: Rconybea/xo-cmake - path: repo/xo-cmake - - - name: Configure xo-cmake - run: cmake -B ${{github.workspace}}/build_xo-cmake -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/xo-cmake - - - name: Build xo-cmake (trivial) - run: cmake --build ${{github.workspace}}/build_xo-cmake --config ${{env.BUILD_TYPE}} - - - name: Install xo-cmake - run: cmake --install ${{github.workspace}}/build_xo-cmake - - # ---------------------------------------------------------------- - - - name: Clone indentlog - uses: actions/checkout@v3 - with: - repository: Rconybea/indentlog - path: repo/indentlog - - - name: Configure indentlog - # configure cmake for indentlog in dedicated build directory. - run: cmake -B ${{github.workspace}}/build_indentlog -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/indentlog - - - name: Build indentlog - run: cmake --build ${{github.workspace}}/build_indentlog --config ${{env.BUILD_TYPE}} - - - name: Install indentlog - # install into ${{github.workspace}}/local - run: cmake --install ${{github.workspace}}/build_indentlog - - # ---------------------------------------------------------------- - - - name: Clone refcnt - uses: actions/checkout@v3 - with: - repository: Rconybea/refcnt - path: repo/refcnt - - - name: Configure refcnt - # configure cmake for refcnt in dedicated build directory. - run: cmake -B ${{github.workspace}}/build_refcnt -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/refcnt - - - name: Build refcnt - run: cmake --build ${{github.workspace}}/build_refcnt --config ${{env.BUILD_TYPE}} - - - name: Install refcnt - # install into ${{github.workspace}}/local - run: cmake --install ${{github.workspace}}/build_refcnt - - # ---------------------------------------------------------------- - - - name: Clone subsys - uses: actions/checkout@v3 - with: - repository: Rconybea/subsys - path: repo/subsys - - - name: Configure subsys - # configure cmake for subsys in dedicated build directory. - run: cmake -B ${{github.workspace}}/build_subsys -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/subsys - - - name: Build subsys - run: cmake --build ${{github.workspace}}/build_subsys --config ${{env.BUILD_TYPE}} - - - name: Install subsys - # install into ${{github.workspace}}/local - run: cmake --install ${{github.workspace}}/build_subsys - - # ---------------------------------------------------------------- - - - name: Clone reflect - uses: actions/checkout@v3 - with: - repository: Rconybea/reflect - path: repo/reflect - - - name: Configure reflect - # configure cmake for reflect in dedicated build directory. - run: cmake -B ${{github.workspace}}/build_reflect -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/reflect - - - name: Build reflect - run: cmake --build ${{github.workspace}}/build_reflect --config ${{env.BUILD_TYPE}} - - - name: Install reflect - # install into ${{github.workspace}}/local - run: cmake --install ${{github.workspace}}/build_reflect - - # ---------------------------------------------------------------- - - - name: Clone callback - uses: actions/checkout@v3 - with: - repository: Rconybea/xo-callback - path: repo/callback - - - name: Configure callback - # configure cmake for callback in dedicated build directory. - run: cmake -B ${{github.workspace}}/build_callback -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/callback - - - name: Build callback - run: cmake --build ${{github.workspace}}/build_callback --config ${{env.BUILD_TYPE}} - - - name: Install callback - # install into ${{github.workspace}}/local - run: cmake --install ${{github.workspace}}/build_callback - - # ---------------------------------------------------------------- - - - name: Clone webutil - uses: actions/checkout@v3 - with: - repository: Rconybea/xo-webutil - path: repo/webutil - - - name: Configure webutil - # configure cmake for webutil in dedicated build directory. - run: cmake -B ${{github.workspace}}/build_webutil -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/webutil - - - name: Build webutil - run: cmake --build ${{github.workspace}}/build_webutil --config ${{env.BUILD_TYPE}} - - - name: Install webutil - # install into ${{github.workspace}}/local - run: cmake --install ${{github.workspace}}/build_webutil - - # ---------------------------------------------------------------- - - - name: Clone printjson - uses: actions/checkout@v3 - with: - repository: Rconybea/xo-printjson - path: repo/printjson - - - name: Configure printjson - # configure cmake for printjson in dedicated build directory. - run: cmake -B ${{github.workspace}}/build_printjson -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/printjson - - - name: Build printjson - run: cmake --build ${{github.workspace}}/build_printjson --config ${{env.BUILD_TYPE}} - - - name: Install printjson - # install into ${{github.workspace}}/local - run: cmake --install ${{github.workspace}}/build_printjson - - # ---------------------------------------------------------------- - - - name: Clone randomgen - uses: actions/checkout@v3 - with: - repository: Rconybea/randomgen - path: repo/randomgen - - - name: Configure randomgen - # configure cmake for randomgen in dedicated build directory. - run: cmake -B ${{github.workspace}}/build_randomgen -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/randomgen - - - name: Build randomgen - run: cmake --build ${{github.workspace}}/build_randomgen --config ${{env.BUILD_TYPE}} - - - name: Install randomgen - # install into ${{github.workspace}}/local - run: cmake --install ${{github.workspace}}/build_randomgen - - # ---------------------------------------------------------------- - - - name: Clone ordinaltree - uses: actions/checkout@v3 - with: - repository: Rconybea/xo-ordinaltree - path: repo/ordinaltree - - - name: Configure ordinaltree - # configure cmake for ordinaltree in dedicated build directory. - run: cmake -B ${{github.workspace}}/build_ordinaltree -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/ordinaltree - - - name: Build ordinaltree - run: cmake --build ${{github.workspace}}/build_ordinaltree --config ${{env.BUILD_TYPE}} - - - name: Install ordinaltree - # install into ${{github.workspace}}/local - run: cmake --install ${{github.workspace}}/build_ordinaltree - - # ---------------------------------------------------------------- - - - name: Configure self (reactor) - # Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make. - # See https://cmake.org/cmake/help/latest/variable/CMAKE_BUILD_TYPE.html?highlight=cmake_build_type - run: cmake -B ${{github.workspace}}/build_reactor -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} - - - name: Build self (reactor) - # Build your program with the given configuration - run: cmake --build ${{github.workspace}}/build_reactor --config ${{env.BUILD_TYPE}} - - - name: Test self (reactor) - working-directory: ${{github.workspace}}/build_reactor - # Execute tests defined by the CMake configuration. - # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail - run: ctest -C ${{env.BUILD_TYPE}} diff --git a/.xo-reactor/.gitignore b/.xo-reactor/.gitignore deleted file mode 100644 index f57a21b4..00000000 --- a/.xo-reactor/.gitignore +++ /dev/null @@ -1,6 +0,0 @@ -# lsp keep state here -.cache -# typical build directories -.build* -# for lsp: manual symlink to chosen build directory -compile_commands.json diff --git a/.xo-reactor/CMakeLists.txt b/.xo-reactor/CMakeLists.txt deleted file mode 100644 index 0f8848ab..00000000 --- a/.xo-reactor/CMakeLists.txt +++ /dev/null @@ -1,30 +0,0 @@ -# xo-reactor/CMakeLists.txt - -cmake_minimum_required(VERSION 3.10) - -project(reactor VERSION 0.1) - -include(GNUInstallDirs) -include(cmake/xo-bootstrap-macros.cmake) - -xo_cxx_toplevel_options3() - -# ---------------------------------------------------------------- -# c++ settings - -# one-time project-specific c++ flags. usually empty -set(PROJECT_CXX_FLAGS "") -#set(PROJECT_CXX_FLAGS "-fconcepts-diagnostics-depth=2") -add_definitions(${PROJECT_CXX_FLAGS}) - -# ---------------------------------------------------------------- - -add_subdirectory(src/reactor) -add_subdirectory(utest) - -# ---------------------------------------------------------------- -# provide find_package() support for reactor customers - -xo_export_cmake_config(${PROJECT_NAME} ${PROJECT_VERSION} ${PROJECT_NAME}Targets) - -# end CMakeLists.txt diff --git a/.xo-reactor/README.md b/.xo-reactor/README.md deleted file mode 100644 index 8136c1da..00000000 --- a/.xo-reactor/README.md +++ /dev/null @@ -1,56 +0,0 @@ -# reactor library - -in-memory queuing system - -## Getting Started - -### build + install dependencies - -build+install these first - -- xo-reflect [github.com/Rconybea/xo-reflect](https://github.com/Rconybea/reflect) -- xo-callback [github.com/Rconybea/xo-callback](https://github.com/Rconybea/xo-callback) -- xo-webutil [github.com/Rconybea/xo-webutil](https://github.com/Rconybea/xo-webutil) - -### build + install xo-reactor -``` -$ cd xo-reactor -$ mkdir build -$ cd build -$ INSTALL_PREFIX=/usr/local # or wherever you prefer -$ cmake -DCMAKE_MODULE_PATH=${INSTALL_PREFIX}/share/cmake -DCMAKE_PREFIX_PATH=${INSTALL_PREFIX} -DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX} .. -$ make -$ make install -``` -(also see .github/workflows/main.yml) - -## Development - -### build for unit test coverage -``` -$ cd xo-reactor -$ mkdir ccov -$ cd ccov -$ cmake -DCMAKE_MODULE_PATH=${INSTALL_PREFIX}/share/cmake -DCMAKE_PREFIX_PATH=${INSTALL_PREFIX} -DCODE_COVERAGE=ON -DCMAKE_BUILD_TYPE=Debug .. -``` - -### LSP support - -LSP looks for compile commands in the root of the source tree; -cmake creates them in the root of its build directory. - -``` -$ cd xo-reactor -$ ln -s build/compile_commands.json -``` - -### display cmake variables - -- `-L` list variables -- `-A` include 'advanced' variables -- `-H` include help text - -``` -$ cd xo-reactor/build -$ cmake -LAH -``` diff --git a/.xo-reactor/cmake/reactorConfig.cmake.in b/.xo-reactor/cmake/reactorConfig.cmake.in deleted file mode 100644 index 6079b76a..00000000 --- a/.xo-reactor/cmake/reactorConfig.cmake.in +++ /dev/null @@ -1,17 +0,0 @@ -@PACKAGE_INIT@ - -include(CMakeFindDependencyMacro) - -# note: changes to find_dependency() calls here -# must coordinate with xo_dependency() calls -# in xo-reactor/src/reactor/CMakeLists.txt -# -find_dependency(xo_ordinaltree) -find_dependency(reflect) -find_dependency(webutil) -find_dependency(printjson) -find_dependency(callback) - -include("${CMAKE_CURRENT_LIST_DIR}/@PROJECT_NAME@Targets.cmake") -include("${CMAKE_CURRENT_LIST_DIR}/@PROJECT_NAME@Share.cmake") -check_required_components("@PROJECT_NAME@") diff --git a/.xo-reactor/cmake/xo-bootstrap-macros.cmake b/.xo-reactor/cmake/xo-bootstrap-macros.cmake deleted file mode 100644 index 592272c0..00000000 --- a/.xo-reactor/cmake/xo-bootstrap-macros.cmake +++ /dev/null @@ -1,41 +0,0 @@ -# ---------------------------------------------------------------- -# for example: -# $ PREFIX=/usr/local # for example -# $ cmake -DCMAKE_MODULE_PATH=prefix -DCMAKE_INSTALL_PREFIX=$PREFIX -B .build -# -# will get -# CMAKE_MODULE_PATH -# from xo-cmake-config --cmake-module-path -# -# and expect .cmake macros in -# CMAKE_MODULE_PATH/xo_macros/xo_cxx.cmake -# ---------------------------------------------------------------- - -find_program(XO_CMAKE_CONFIG_EXECUTABLE NAMES xo-cmake-config REQUIRED) - -if ("${XO_CMAKE_CONFIG_EXECUTABLE}" STREQUAL "XO_CMAKE_CONFIG_EXECUTABLE-NOT_FOUND") - message(FATAL "could not find xo-cmake-config executable") -endif() - -message(STATUS "XO_CMAKE_CONFIG_EXECUTABLE=${XO_CMAKE_CONFIG_EXECUTABLE}") - -if (XO_SUBMODULE_BUILD) - if (("${CMAKE_MODULE_PATH}" STREQUAL "") OR ("${CMAKE_MODULE_PATH}" STREQUAL prefix)) - # local version of xo-cmake macros - set(CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/xo-cmake/cmake") - message(STATUS "CMAKE_MODULE_PATH=${CMAKE_MODULE_PATH}") - endif() -else() - if (("${CMAKE_MODULE_PATH}" STREQUAL "") OR ("${CMAKE_MODULE_PATH}" STREQUAL prefix)) - # default to typical install location for xo-project-macros - execute_process(COMMAND ${XO_CMAKE_CONFIG_EXECUTABLE} --cmake-module-path OUTPUT_VARIABLE CMAKE_MODULE_PATH) - message(STATUS "CMAKE_MODULE_PATH=${CMAKE_MODULE_PATH}") - endif() -endif() - -# needs to have been installed somewhere on CMAKE_MODULE_PATH, -# (e.g. from xo-cmake with the same value for CMAKE_INSTALL_PREFIX) -# -include(xo_macros/xo_cxx) - -xo_cxx_bootstrap_message() diff --git a/.xo-reactor/include/xo/reactor/AbstractEventProcessor.hpp b/.xo-reactor/include/xo/reactor/AbstractEventProcessor.hpp deleted file mode 100644 index e955a40c..00000000 --- a/.xo-reactor/include/xo/reactor/AbstractEventProcessor.hpp +++ /dev/null @@ -1,50 +0,0 @@ -/* @file AbstractEventProcessor.hpp */ - -#pragma once - -#include "xo/refcnt/Refcounted.hpp" -#include -#include -#include - -namespace xo { - namespace reactor { - /* common base class for {AbstractSource, AbstractSink}. - * An event processor can be: - * 1. an event source (inherits AbstractSource) - * 2. an event sink (inherits AbstractSink) - * 3. both source+sink (inherits both) - */ - class AbstractEventProcessor : virtual public ref::Refcount { - public: - /* reporting name for this source. ideally unique, but not required */ - virtual std::string const & name() const = 0; - /* set .name */ - virtual void set_name(std::string const & x) = 0; - - /* find all event processors ep reachable from x (i.e. downstream from x). - * report each such ep exactly once - */ - static std::vector> map_network(rp const & x); - - /* visit direct downstream consumers c[i] of this event processor. - * call ep(c[i]) for each such consumer. - */ - virtual void visit_direct_consumers(std::function ep)> const & fn) = 0; - - /* write representation to stream */ - virtual void display(std::ostream & os) const = 0; - /* human-readable string identifying this source */ - virtual std::string display_string() const; - }; /*AbstractEventProcessor*/ - - inline std::ostream & - operator<<(std::ostream & os, AbstractEventProcessor const & src) { - src.display(os); - return os; - } /*operator<<*/ - - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end AbstractEventProcessor.hpp */ diff --git a/.xo-reactor/include/xo/reactor/AbstractSink.hpp b/.xo-reactor/include/xo/reactor/AbstractSink.hpp deleted file mode 100644 index 46e2bf0b..00000000 --- a/.xo-reactor/include/xo/reactor/AbstractSink.hpp +++ /dev/null @@ -1,71 +0,0 @@ -/* @file AbstractSink.hpp */ - -#pragma once - -#include "AbstractSource.hpp" -#include "xo/reflect/TaggedPtr.hpp" -#include "xo/reflect/TypeDescr.hpp" -//#include "time/Time.hpp" -#include "xo/indentlog/print/tag.hpp" -#include "xo/cxxutil/demangle.hpp" -#include - -namespace xo { - namespace reactor { - /* an event consumer. - * note that event representation is not specified, - * this helps avoid mandating a type hierarchy for events - */ - class AbstractSink : public virtual AbstractEventProcessor { - public: - using TypeDescr = reflect::TypeDescr; - using TaggedPtr = reflect::TaggedPtr; - - public: - virtual ~AbstractSink() = default; - - /* if true: sources may produce events of any reflected type. - * sink will accept such events using .notify_ev_tp() - * for example see web_util/WebsocketSink - * - * if false (common): souce is expected to to produce events of - * a single type, specified by .sink_ev_type() - * .notify_ev_tp() will downcast to that type. - * for example see reactor/Sink1 - * - * polymorphic sinks pay for runtime polymorphism - * (since WebsocketSink sends events in json format this is - * expected to be negligible compared to message formatting) - */ - virtual bool allow_polymorphic_source() const = 0; - - /* identify datatype for items expected by this sink */ - virtual TypeDescr sink_ev_type() const = 0; - - /* true iff this sink accepts volatile events. - * volatile events are events that may be modified - * or destroyed after being delivered to this sink. - * - * For example KalmanFilterSvc accepts volatile events, - * but EventStore requires non-volatile events. - */ - virtual bool allow_volatile_source() const = 0; - - /* counts lifetime #of incoming events for this sink */ - virtual uint32_t n_in_ev() const = 0; - - /* attach an input source. - * typically this means calling src.add_callback() - * with a function thats calls a .notify_xxx() method - * on this Sink - */ - virtual void attach_source(rp const & src) = 0; - - /* accept incoming event, given by tagged pointer */ - virtual void notify_ev_tp(TaggedPtr const & ev_tp) = 0; - }; /*AbstractSink*/ - - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end AbstractSink.hpp */ diff --git a/.xo-reactor/include/xo/reactor/AbstractSource.hpp b/.xo-reactor/include/xo/reactor/AbstractSource.hpp deleted file mode 100644 index d98938ed..00000000 --- a/.xo-reactor/include/xo/reactor/AbstractSource.hpp +++ /dev/null @@ -1,100 +0,0 @@ -/* @file AbstractSource.hpp */ - -#pragma once - -#include "AbstractEventProcessor.hpp" -#include "xo/reflect/TypeDescr.hpp" -#include "xo/callback/CallbackSet.hpp" -#include "xo/refcnt/Refcounted.hpp" -#include - -namespace xo { - namespace web { class StreamEndpointDescr; } - - namespace reactor { - class AbstractSink; - - template - class Sink1; - - /* abstract api for a source of events. - * Event representation is left open: Sources and Sinks - * need to have compatible event representations, - * and coordination is left to such (Source, Sink) pairs. - * - * See ReactorSource, for example - * - * Typically a Source will have one or more .add_callback() - * methods, for listening to source events - */ - class AbstractSource : public virtual AbstractEventProcessor { - public: - using StreamEndpointDescr = web::StreamEndpointDescr; - using TypeDescr = reflect::TypeDescr; - using CallbackId = fn::CallbackId; - - public: - /* identify datatype for items delivered by this source */ - virtual TypeDescr source_ev_type() const = 0; - - /* if true: event objects (see .source_ev_type()) - * may be overwritten between callbacks. - * A sink that wants to capture events - * (e.g. EventStore<>) will need to deep-copy them - * if false: event objects are preserved between callbacks. - * - * A source that stores events received from elsewhere (e.g. FifoQueue) - * is probably volatile. - * - * A source that remembers (in explicit memory) every event it produces - * is not volatile - */ - virtual bool is_volatile() const = 0; - - /* counts #of outbound events ready for delivery, - * but not yet sent */ - virtual uint32_t n_queued_out_ev() const = 0; - /* counts lifetime #of events delivered. - * see also AbstractSink.n_in_ev - */ - virtual uint32_t n_out_ev() const = 0; - - /* if true, simulator will report interaction with this source */ - virtual bool debug_sim_flag() const = 0; - /* set .trace_sim_flag */ - virtual void set_debug_sim_flag(bool x) = 0; - - virtual CallbackId attach_sink(rp const & sink) = 0; - virtual void detach_sink(CallbackId id) = 0; - - /* endpoint for a websocket subscriber; - * subscriber delivers events produced by this source - */ - StreamEndpointDescr stream_endpoint_descr(std::string const & url_prefix); - - /* typically expect events to be delivered using a reactor or simulator. - * (for example see reactor/Reactor, simulator/Simulator); - * reactor allocates cpu, and controls event ordering across sources - * when there are multiple sources. - * - * However, also possible for user code to invoke .deliver_one() directly. - * Beware, may get unpredictable results if attempt to do this on a source - * that's also attached to a reactor. - */ - virtual std::uint64_t deliver_one() = 0; - - /* convenience: call .deliver_one() n times, return sum of results */ - std::uint64_t deliver_n(uint64_t n); - - /* convenience: call .deliver_one() until it returns 0 - * (beware of inexhaustible sources!) - */ - std::uint64_t deliver_all(); - }; /*AbstractSource*/ - - using AbstractSourcePtr = rp; - - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end AbstractSource.hpp */ diff --git a/.xo-reactor/include/xo/reactor/DirectSource.hpp b/.xo-reactor/include/xo/reactor/DirectSource.hpp deleted file mode 100644 index 7187e56d..00000000 --- a/.xo-reactor/include/xo/reactor/DirectSource.hpp +++ /dev/null @@ -1,18 +0,0 @@ -/* @file DirectSource.hpp */ - -#pragma once - -//#include "time/Time.hpp" -#include "reactor/Sink.hpp" -#include "reactor/EventSource.hpp" -#include "reactor/HeapReducer.hpp" -//#include "reactor/LastReducer.hpp" -#include "reactor/Reactor.hpp" -#include "callback/CallbackSet.hpp" - -namespace xo { - namespace reactor { - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end DirectSource.hpp */ diff --git a/.xo-reactor/include/xo/reactor/DirectSourcePtr.hpp b/.xo-reactor/include/xo/reactor/DirectSourcePtr.hpp deleted file mode 100644 index f68eb27c..00000000 --- a/.xo-reactor/include/xo/reactor/DirectSourcePtr.hpp +++ /dev/null @@ -1,25 +0,0 @@ -/* @file DirectSourcePtr.hpp */ - -#pragma once - -#include "SecondarySource.hpp" -#include "LastReducer.hpp" -#include "EventTimeFn.hpp" - -namespace xo { - namespace reactor { - template - using DirectSource = SecondarySource>>; - - /* use when Event is rp for some T */ - template - using DirectSourcePtr = SecondarySource>>; - - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end DirectSourcePtr.hpp */ diff --git a/.xo-reactor/include/xo/reactor/EventSource.hpp b/.xo-reactor/include/xo/reactor/EventSource.hpp deleted file mode 100644 index bb08eca9..00000000 --- a/.xo-reactor/include/xo/reactor/EventSource.hpp +++ /dev/null @@ -1,25 +0,0 @@ -/* @file EventSource.hpp */ - -#pragma once - -#include "ReactorSource.hpp" -#include "xo/callback/CallbackSet.hpp" - -namespace xo { - namespace reactor { - template - class EventSource : public ReactorSource { - public: - using CallbackId = fn::CallbackId; - - public: - virtual CallbackId add_callback(rp const & cb) = 0; - virtual void remove_callback(CallbackId id) = 0; - }; /*EventSource*/ - - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end EventSource.hpp */ diff --git a/.xo-reactor/include/xo/reactor/EventStore.hpp b/.xo-reactor/include/xo/reactor/EventStore.hpp deleted file mode 100644 index 5c0d1630..00000000 --- a/.xo-reactor/include/xo/reactor/EventStore.hpp +++ /dev/null @@ -1,318 +0,0 @@ -/* @file EventStore.hpp */ - -#pragma once - -#include "Reducer.hpp" -#include "EventTimeFn.hpp" -#include "Sink.hpp" -#include "xo/webutil/HttpEndpointDescr.hpp" -#include "xo/printjson/PrintJson.hpp" -#include "xo/reflect/Reflect.hpp" -#include "xo/ordinaltree/RedBlackTree.hpp" -#include "xo/ordinaltree/rbtree/OrdinalReduce.hpp" - -namespace xo { - namespace reactor { - - /* abstract event store api */ - class AbstractEventStore : virtual public ref::Refcount { - public: - using PrintJson = xo::json::PrintJson; - using TaggedPtr = xo::reflect::TaggedPtr; - using HttpEndpointDescr = xo::web::HttpEndpointDescr; - using Alist = xo::web::Alist; - - public: - /* true iff .size() == 0 */ - virtual bool empty() const = 0; - - /* #of events currently held in this store */ - virtual std::uint32_t size() const = 0; - - /* TODO: - * 1. TaggedGptr = discriminated union of - * a. TaggedRcptr (i.e. refcounted semantics) - * b. Unique (i.e. unique_ptr semantics) - * c. Exptr (i.e. unowned_ptr semantics) - * d. compact (special-case -- value fits in pointer) - * will need mpl copy/assign stuff for TaggedUnique - * 2. provide .last_n(), .last_dt() - */ - - virtual void http_snapshot(rp const & pjson, - std::ostream * p_os) const = 0; - - /* http endpoint; generates http output for this eventstore */ - virtual HttpEndpointDescr http_endpoint_descr(rp const & pjson, - std::string const & url_prefix) const { - - /* important that lambda contains its own rp; - * reference to stack will not do - */ - rp pjson_rp = pjson; - - auto http_fn = ([this, pjson_rp] - (std::string const & /*uri*/, - Alist const & /*alist*/, - std::ostream * p_os) - { - /* WARNING: race condition here, - * given webserver runs from a separate thread - */ - - this->http_snapshot(pjson_rp, p_os); - }); - - return HttpEndpointDescr(url_prefix + "/snap", http_fn); - } /*http_endpoint_descr*/ - - virtual void clear() = 0; - - virtual void insert_tp(TaggedPtr const & ev_tp) = 0; - }; /*AbstractEventStore*/ - - /* in-memory storage for a set of events. - * - * Require: - * - Event is null-constructible - * - Event is copyable - * - EventTimeFn :: Event -> utc_nanos - * - * inheritance - * ref::Refcount - * ^ - * isa - * | - * reactor::AbstractEventProcessor + req .visit_direct_consumers() - * ^ - * isa - * | - * reactor::AbstractSink + req .sink_ev_type(), .notify_ev() etc. - * ^ - * isa - * | - * reactor::Sink1 + .attach_source(), .sink_ev_type(), - * ^ req .notify_ev() etc - * | - * isa - * | - * reactor::SinkEndpoint + impl .visit_direct_consumers() - * ^ - * isa - * | - * reactor::StructEventStore + .last_n() .last_dt() etc. - */ - template - class EventStoreImpl : public SinkEndpoint, - public AbstractEventStore, - ReducerBase - { - static_assert(EventTimeConcept); - - public: - using utc_nanos = xo::time::utc_nanos; - using nanos = xo::time::nanos; - using EventTree = xo::tree::RedBlackTree>; - using PrintJson = xo::json::PrintJson; - using Alist = xo::web::Alist; - using HttpEndpointDescr = xo::web::HttpEndpointDescr; - - static rp make() { return new EventStoreImpl(); } - - /* visit most recent n events in this store. - * returns #of events actually visited - * - * if events visited are e1 .. en, then: - * (1) en is the most recent recorded event - * (.event_tm(en) is .tree.max_key()) - * (2) there are no events between e(i) and e(i+1) - * (i.e. visit does not skip over any events) - * (3) if v < n, then v = .size(), - * where v is the #of events visited - * - * require: - * - Fn :: (Event -> ) - */ - template - std::uint32_t visit_last_n(std::uint32_t n, Fn && fn) const { - std::uint32_t z = this->size(); - std::uint32_t lo = ((n >= z) ? 0 : z - n); - - typename EventTree::const_iterator lo_ix = this->tree_.find_ith(lo); - typename EventTree::const_iterator hi_ix = this->tree_.cend(); - - return this->visit_range(lo_ix, hi_ix, fn); - } /*visit_last_n*/ - - /* visit suffix of events sufficient to cover interval of length dt. - * visit events in increasing timestamp order. - * - * if events visited are e1 .. en, then: - * (1) en is the most recent recorded event - * (.event_tm(en) is .tree.max_key()) - * (2) there are no events between e(i) and e(i+1) - * (i.e. visit does not skip over any events) - * (3) if .event_tm(en) - .event_tm(e1) < dt, - * then e1 is the earliest recorded event - * (.event_tm(e1) is .tree.min_key()) - * (4) if .event_tm(en) - .event_tm(e1) > dt, - * then (.event_tm(en) - .event_tm(e2)) < dt - * - * |<---------- dt ----------->| - * ^ ^ ^ - * e1 e2 en - */ - template - std::uint32_t visit_last_dt(nanos dt, Fn && fn) const { - if (tree_.empty()) - return 0; - - /* tree not empty -> has max key */ - utc_nanos tn = this->tree_.max_key(); - utc_nanos tk = tn - dt; - - typename EventTree::const_iterator lo_ix = this->tree_.find_glb(tk, true /*closed*/); - typename EventTree::const_iterator hi_ix = this->tree_.end(); - - return this->visit_range(lo_ix, hi_ix, fn); - } /*visit_last_dt*/ - - std::vector last_n(std::uint32_t n) const { - std::vector retval; - - auto fn = [&retval](Event const &ev) { retval.push_back(ev); }; - - this->visit_last_n(n, fn); - - return retval; - } /*last_n*/ - - std::vector last_dt(nanos dt) const { - std::vector retval; - - auto fn = [&retval](Event const &ev) { retval.push_back(ev); }; - - this->visit_last_dt(dt, fn); - - return retval; - } /*last_dt*/ - - void insert(Event const & ev) { this->tree_.insert(typename EventTree::value_type(this->event_tm(ev), ev)); } - - // ----- Inherited from AbstractEventStore ----- - - virtual bool empty() const override { return tree_.empty(); } - virtual std::uint32_t size() const override { return tree_.size(); } - - /* write http snapshot of current state to *p_os */ - virtual void http_snapshot(rp const & pjson, std::ostream * p_os) const override { - using xo::reflect::Reflect; - - /* visit last 100 events; - * write them to *p_os in increasing time order - */ - auto ev_v = this->last_n(100); - - pjson->print_tp(Reflect::make_tp(&ev_v), p_os); - } /*http_snapshot*/ - - virtual void clear() override { this->tree_.clear(); } - - virtual void insert_tp(TaggedPtr const & ev_tp) override { - using xo::xtag; - - Event * p_ev = ev_tp.recover_native(); - - if (p_ev) { - this->insert(*p_ev); - } else { - throw std::runtime_error(tostr("StructEventStore::insert_tp" - ": unable to convert ev_tp to Event", - xtag("ev_tp.type", ev_tp.td()->canonical_name()), - xtag("Event", reflect::type_name()))); - } - } /*insert_tp*/ - - // ----- Inherited from AbstractSink ----- - - virtual uint32_t n_in_ev() const override { return n_in_ev_; } - virtual bool allow_volatile_source() const override { return false; } - virtual void notify_ev(Event const & ev) override { - ++(this->n_in_ev_); - this->insert(ev); - } - - // ----- Inherited from AbstractSource ----- - - virtual void display(std::ostream & os) const override { - using xo::xtag; - - os << "name()) - << xtag("n_in_ev", this->n_in_ev()) - << ">"; - } /*display*/ - - // ----- Inherited from AbstractEventProcessor ----- - - virtual std::string const & name() const override { return name_; } - virtual void set_name(std::string const & x) override { name_ = x; } - - private: - EventStoreImpl() = default; - - template - std::uint32_t visit_range(typename EventTree::const_iterator lo_ix, - typename EventTree::const_iterator hi_ix, - Fn && fn) const { - std::uint32_t n = 0; - for (; lo_ix != hi_ix; ++lo_ix, ++n) { - fn(lo_ix->second); - } - - return n; - } /*visit_range*/ - - private: - /* reporting name for this store */ - std::string name_; - /* fetches per-event timestamp */ - EventTimeFn event_tm_fn_; - /* counts lifetime #of incoming events (see .notify_ev()) */ - uint32_t n_in_ev_ = 0; - /* events stored here */ - EventTree tree_; - }; /*EventStoreImpl*/ - - template - using StructEventStore = EventStoreImpl>; - - template - using PtrEventStore = EventStoreImpl>; - - /* Require: - * EventTimeConcept> - */ - template - class SinkToEventStore : public SinkEndpoint { - public: - using EventStore = StructEventStore; - - public: - SinkToEventStore() = default; - - virtual void notify_ev(T const & ev) override { - store_.insert(ev); - } /*notify_ev*/ - - private: - /* stash remembered events (all of them!) here */ - EventStore store_; - }; /*SinkToEventStore*/ - - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end EventStore.hpp */ diff --git a/.xo-reactor/include/xo/reactor/EventTimeFn.hpp b/.xo-reactor/include/xo/reactor/EventTimeFn.hpp deleted file mode 100644 index ee206d11..00000000 --- a/.xo-reactor/include/xo/reactor/EventTimeFn.hpp +++ /dev/null @@ -1,48 +0,0 @@ -/* @file EventTimeFn.hpp */ - -#pragma once - -//#include "time/Time.hpp" -#include "xo/indentlog/timeutil/timeutil.hpp" -#include - -namespace xo { - namespace reactor { - template - concept EventTimeConcept = requires(EventTimeFn etfn, Event ev) { - { etfn(ev) } -> std::same_as; - }; - - template - class StructEventTimeFn { - public: - using event_t = Event; - using utc_nanos = xo::time::utc_nanos; - - public: - utc_nanos operator()(Event const & ev) const { return ev.tm(); } - }; /*StructEventTimeFn*/ - - template - class PtrEventTimeFn { - public: - using event_t = Event; - using utc_nanos = xo::time::utc_nanos; - - public: - utc_nanos operator()(Event const & ev) const { return ev->tm(); } - }; /*PtrEventTimeFn*/ - - template - class PairEventTimeFn { - public: - using utc_nanos = xo::time::utc_nanos; - using event_t = std::pair; - - public: - utc_nanos operator()(event_t const & ev) const { return ev.first; } - }; /*PairEventTimeFn*/ - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end EventTimeFn.hpp */ diff --git a/.xo-reactor/include/xo/reactor/EventTimeFn2.hpp b/.xo-reactor/include/xo/reactor/EventTimeFn2.hpp deleted file mode 100644 index 9f68eb00..00000000 --- a/.xo-reactor/include/xo/reactor/EventTimeFn2.hpp +++ /dev/null @@ -1,60 +0,0 @@ -/* @file EventTimeFn2.hpp */ - -#pragma once - -#include "xo/refcnt/Refcounted.hpp" -#include - -namespace xo { - namespace reactor { - template - class EventTimeFn { - public: - using utc_nanos = xo::time::utc_nanos; - using event_t = Event; - - public: - static utc_nanos event_tm(event_t const & ev) { return ev.tm(); } - - utc_nanos operator()(event_t const & ev) const { return EventTimeFn::event_tm(ev); } - }; - - template - class EventTimeFn> { - public: - using utc_nanos = xo::time::utc_nanos; - using event_t = xo::rp; - - public: - static utc_nanos event_tm(event_t const & ev) { return ev->tm(); } - - utc_nanos operator()(event_t const & ev) const { return EventTimeFn::event_tm(ev); } - }; - - template - class EventTimeFn { - public: - using utc_nanos = xo::time::utc_nanos; - using event_t = T*; - - public: - static utc_nanos event_tm(event_t ev) { return ev->tm(); } - - utc_nanos operator()(event_t const & ev) const { return EventTimeFn::event_tm(ev); } - }; - - template - class EventTimeFn> { - public: - using utc_nanos = xo::time::utc_nanos; - using event_t = std::pair; - - public: - static utc_nanos event_tm(event_t const & ev) { return ev.first; } - - utc_nanos operator()(event_t const & ev) const { return EventTimeFn::event_tm(ev); } - }; - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end EventTimeFn2.hpp */ diff --git a/.xo-reactor/include/xo/reactor/FifoQueue.hpp b/.xo-reactor/include/xo/reactor/FifoQueue.hpp deleted file mode 100644 index 8f0b48a4..00000000 --- a/.xo-reactor/include/xo/reactor/FifoQueue.hpp +++ /dev/null @@ -1,267 +0,0 @@ -/* @file FifoQueue.hpp */ - -#pragma once - -#include "Reactor.hpp" -#include "EventSource.hpp" -#include "Sink.hpp" -#include "EventTimeFn2.hpp" -#include "xo/callback/CallbackSet.hpp" -#include - -namespace xo { - namespace reactor { - /* require: - * T null constructible - * T movable - * - * T satisfies EventTimeConcept - */ - template > - class FifoQueue : public virtual Sink1, public virtual EventSource> { - public: - using EventSink = Sink1; - template - using RpCallbackSet = xo::fn::RpCallbackSet; - using CallbackId = xo::fn::CallbackId; - using Reflect = xo::reflect::Reflect; - using TypeDescr = xo::reflect::TypeDescr; - using utc_nanos = xo::time::utc_nanos; - - public: - static rp make(EvTimeFn evtm_fn = EvTimeFn()) { return new FifoQueue(evtm_fn); } - - // ----- inherited from Sink1 ----- - - virtual void notify_ev(T const & ev) override { - bool is_priming = this->elt_q_.empty(); - - this->elt_q_.push_back(ev); - - ++(this->n_in_ev_); - - if (this->upstream_exhausted_) { - throw std::runtime_error("FifoQueue::notify_ev" - ": not allowed after upstream exhausted"); - } - - utc_nanos tm = evtm_fn_(ev); - - if (this->current_tm_ < tm) - this->current_tm_ = tm; - - Reactor * reactor = this->parent_reactor_; - - scope log(XO_DEBUG(this->debug_sim_flag_), - xtag("name", name_), - xtag("reactor", (void*)reactor), - xtag("is_priming", is_priming)); - - if (reactor) { - if (is_priming) { - /* reactor/simulator takes delivery/sequencing responsibility from here */ - reactor->notify_source_primed(bp::from_native(this)); - } - } else { - /* if no reactor, deliver immediately */ - this->deliver_one(); - } - } /*notify_ev*/ - - // ----- inherited from AbstractSink ----- - - /* we don't care about volatile sources -- fifo queue copies incoming events */ - virtual bool allow_volatile_source() const override { return true; } - - virtual uint32_t n_in_ev() const override { return n_in_ev_; } - - // ----- inherited from ReactorSource ----- - - virtual bool is_empty() const override { return elt_q_.empty(); } - virtual bool is_exhausted() const override { return this->upstream_exhausted_ && this->is_empty(); } - - virtual utc_nanos sim_current_tm() const override { - if (this->elt_q_.empty()) { - /* (in practice control never comes here) - * - * queue doesn't know time of next event yet; - * new events may appear at any time by way of .notify_event() - * - * if queue doesn't know next event, can't use .sim_current_tm - * to establish priority relative to other sources. - * In that case rely instead on priming mechanism; - * priming mechanism implies control should never come here - */ - return this->current_tm_; - } else { - return evtm_fn_(this->elt_q_.front()); - } - } /*sim_current_tm*/ - - virtual uint64_t deliver_one() override { - return this->deliver_one_aux(true /*replay_flag*/); - } /*deliver_one*/ - - virtual uint64_t sim_advance_until(utc_nanos target_tm, - bool replay_flag) override { - uint64_t retval = 0; - - while (!this->elt_q_.empty()) { - utc_nanos tm = evtm_fn_(this->elt_q_.front()); - - if (tm < target_tm) { - retval += this->deliver_one_aux(replay_flag); - } else { - break; - } - } - - return retval; - } /*sim_advance_until*/ - - virtual void notify_reactor_add(Reactor * reactor) override { - assert(!this->parent_reactor_); - - this->parent_reactor_ = reactor; - } /*notify_reactor_add*/ - - virtual void notify_reactor_remove(Reactor *) override { - this->parent_reactor_ = nullptr; - } - - // ----- inherited from AbstractSource ----- - - virtual TypeDescr source_ev_type() const override { return Reflect::require(); } - /* events must be copied objects owned by FifoQueue. - * not expected to be pointers to shared storage or something - */ - virtual bool is_volatile() const override { return false; } - virtual uint32_t n_queued_out_ev() const override { return elt_q_.size(); } - virtual uint32_t n_out_ev() const override { return n_out_ev_; } - virtual bool debug_sim_flag() const override { return debug_sim_flag_; } - virtual void set_debug_sim_flag(bool x) override { this->debug_sim_flag_ = x; } - - virtual CallbackId attach_sink(rp const & sink) override { - rp native_sink - = EventSink::require_native("FifoQueue::attach_sink", sink); - - if (native_sink) { - if (!this->is_volatile() - || native_sink->allow_volatile_source()) - { - return this->add_callback(native_sink); - } else { - throw std::runtime_error("FifoQueue::attach_sink" - ": sink requires non-volatile source " - + std::string(reflect::type_name())); - } - } else { - throw std::runtime_error("FifoQueue::attach_sink" - ": expected sink accepting " - + std::string(reflect::type_name())); - } - } /*attach_sink*/ - - virtual void detach_sink(CallbackId id) override { - this->remove_callback(id); - } - - // ----- inherited from EventSource ----- - - virtual CallbackId add_callback(rp const & cb) override { - return this->cb_set_.add_callback(cb); - } - - virtual void remove_callback(CallbackId id) override { - this->cb_set_.remove_callback(id); - } - - // ----- inherited from AbstractEventProcessor ----- - - virtual std::string const & name() const override { return name_; } - virtual void set_name(std::string const & x) override { this->name_ = x; } - - virtual void visit_direct_consumers(std::function ep)> const & fn) override { - for (auto x : this->cb_set_) - fn(x.fn_.borrow()); - } /*visit_direct_consumers*/ - - /* write human-readable representation to stream */ - virtual void display(std::ostream & os) const override { - os << "()) - << ">"; - } /*display*/ - - private: - FifoQueue(EvTimeFn evtm_fn) : evtm_fn_{std::move(evtm_fn)} {} - - uint64_t deliver_one_aux(bool replay_flag) { - scope log(XO_DEBUG(this->debug_sim_flag_), - xtag("name", this->name_), - xtag("elt_q.size", this->elt_q_.size()), - xtag("replay_flag", replay_flag)); - - if (this->elt_q_.empty()) - return 0; - - /* avoiding copy for efficiently-swappable T */ - T ev; - std::swap(ev, this->elt_q_.front()); - - this->elt_q_.pop_front(); - - if (replay_flag) { - log && log(xtag("deliver-ev", ev), - xtag("elt_q.size", this->elt_q_.size())); - - ++(this->n_out_ev_); - this->cb_set_.invoke(&EventSink::notify_ev, ev); - } - - return 1; - } /*deliver_one_aux*/ - - private: - /* name (ideally unique) for this queue */ - std::string name_; - - /* extract timestamp from an event */ - EvTimeFn evtm_fn_; - - /* if true, simulator/reactor will report interaction with this source */ - bool debug_sim_flag_ = false; - - /* largest event timestamp delivered - * (monotonically increases, event if events received out-of-timestamp-order) - */ - utc_nanos current_tm_; - - /* events waiting for delivery */ - std::deque elt_q_; - - /* lifetime #of events received */ - uint32_t n_in_ev_ = 0; - /* lifetime #of events delivered */ - uint32_t n_out_ev_ = 0; - - /* set to true, once, to announce that upstream will send no more events. - * see .notify_upstream_exhausted() ? - */ - bool upstream_exhausted_ = false; - - /* reactor/simulator being used to schedule event consumption. - * if omitted, borrow calling thread - */ - Reactor * parent_reactor_ = nullptr; - - /* invoke callbacks in this set to deliver queued events */ - RpCallbackSet cb_set_; - - }; /*FifoQueue*/ - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end FifoQueue.hpp */ diff --git a/.xo-reactor/include/xo/reactor/HeapReducer.hpp b/.xo-reactor/include/xo/reactor/HeapReducer.hpp deleted file mode 100644 index 7a7456ae..00000000 --- a/.xo-reactor/include/xo/reactor/HeapReducer.hpp +++ /dev/null @@ -1,72 +0,0 @@ -/* @file HeapReducer.hpp */ - -#pragma once - -#include "Reducer.hpp" - -namespace xo { - namespace reactor { - /* collect incoming events in a heap, - * ordered by timestamp. - * output events in increasing timestamp order. - * Information preserving in all other respects - * - * Require: - * - Event is null-constructible - * - Event is copyable - * - EventTimeFn :: Event -> utc_nanos - */ - template> - class HeapReducer : public ReducerBase { - public: - using utc_nanos = xo::time::utc_nanos; - public: - HeapReducer() = default; - HeapReducer(EventTimeFn const & evtfn) : ReducerBase(evtfn) {} - - bool is_empty() const { return this->event_heap_.empty(); } - /* require: .is_empty() = false */ - utc_nanos next_tm() const { return this->event_tm(this->event_heap_.front()); } - /* #of events stored in this reducer */ - uint32_t n_event() const { return this->event_heap_.size(); } - - Event const & last_annexed_ev() const { return this->annexed_ev_; } - - void include_event(Event const & ev) { - this->event_heap_.push_back(ev); - std::push_heap(this->event_heap_.begin(), - this->event_heap_.end(), - std::greater()); - } /*include_event*/ - - void include_event(Event && ev) { - this->event_heap_.push_back(std::move(ev)); - std::push_heap(this->event_heap_.begin(), - this->event_heap_.end(), - std::greater()); - } /*include_event*/ - - Event & annex_one() { - this->annexed_ev_ = this->event_heap_.front(); - std::pop_heap(this->event_heap_.begin(), - this->event_heap_.end(), - std::greater()); - this->event_heap_.pop_back(); - - return this->annexed_ev_; - } /*annex_one*/ - - // ----- Inherited from ReducerBase ----- - - // utc_nanos event_tm(Event const & x); - - private: - /* queued Events, in increasing timestamp order */ - std::vector event_heap_; - /* annexed event, removed from .event_heap */ - Event annexed_ev_; - }; /*HeapReducer*/ - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end HeapReducer.hpp */ diff --git a/.xo-reactor/include/xo/reactor/LastReducer.hpp b/.xo-reactor/include/xo/reactor/LastReducer.hpp deleted file mode 100644 index b18f1a09..00000000 --- a/.xo-reactor/include/xo/reactor/LastReducer.hpp +++ /dev/null @@ -1,154 +0,0 @@ -/* @file LastReducer.hpp */ - -#pragma once - -#include "Reducer.hpp" -#include - -namespace xo { - namespace reactor { - /* implementation record used in LastReducer. - * LastReducer (see below) remembers a single event, - * + will be updated on successive calls to - * LastReducer.include_event() - * - * need to remember the _first_ (& therefore earliest) - * event timestamp in such a wave, since that establishes when simulator - * should deliver the event -- even if event is subsequently - * overwritten. - * - * once event is delivered, timestamp can reset - * - * otherwise if upstream producer sends events with - * future timestamps, can get indefinite postponement - * with simulation clock failing to catch up to event time. - * - */ - - template - class EventRecd { - public: - using utc_nanos = xo::time::utc_nanos; - - public: - EventRecd() = default; - EventRecd(utc_nanos tm, Event ev) : trigger_tm_{tm}, ev_{ev} {} - EventRecd(utc_nanos tm, Event && ev) : trigger_tm_{tm}, ev_{std::move(ev)} {} - - public: - /* if sim, deliver event when simulation clock reaches - * .trigger_tm; .trigger_tm can be earlier than .ev time - */ - utc_nanos trigger_tm_; - /* event to deliver */ - Event ev_; - }; - - /* reducer that just remembers the last event - * - * Require: - * - Event is null-contructible - * - Event is copyable - * - * LastReducer provides reentrancy support. This support doesn't operate - * if Event copy is not deep, e.g. for Event = rpn - * - * .include_event() - * /-------\ -----------------> /------\ - * | empty | | full | - * \-------/ <----------------- \------/ - * . .annex_one() . - * . . - * .is_empty()=true .is_empty()=false - */ - template> - class LastReducer : public ReducerBase { - public: - using utc_nanos = xo::time::utc_nanos; - - public: - LastReducer() = default; - LastReducer(EventTimeFn const & evtfn) : ReducerBase(evtfn) {} - - bool is_empty() const { return empty_flag_; } - /* require: .is_empty() = false */ - utc_nanos next_tm() const { - return this->last_ev_[this->last_ix_].trigger_tm_; - //return this->event_tm(this->last_ev_[this->last_ix_]); - } - /* #of events stored in this reducer (0 or 1) */ - uint32_t n_event() const { return this->empty_flag_ ? 0 : 1; } - - Event const & last_annexed_ev() const { - return this->last_ev_[1 - this->last_ix_].ev_; - } - - EventRecd & include_event_aux(Event const & ev) { - EventRecd & evr - = this->last_ev_[this->last_ix_]; - - if (this->empty_flag_) { - /* evr.trigger_tm will be preserved across - * successive calls to .include_event(); - * until .annex_one() - */ - evr.trigger_tm_ = this->event_tm(ev); - - this->empty_flag_ = false; - } - - return evr; - } /*include_event_aux*/ - - void include_event(Event const & ev) { - EventRecd & evr - = this->include_event_aux(ev); - - evr.ev_ = ev; - } /*include_event*/ - - void include_event(Event && ev) { - EventRecd & evr - = this->include_event_aux(ev); - - evr.ev_ = std::move(ev); - } /*include_event*/ - - Event & annex_one() { - std::uint32_t annexed_ix = this->last_ix_; - - /* since .empty_flag is true, - * next call to .include_event_aux() will - * capture new timestamp - */ - this->empty_flag_ = true; - this->last_ix_ = (1 - this->last_ix_); - - return this->last_ev_[annexed_ix].ev_; - } /*annex_one*/ - - // ----- Inherited from ReducerBase ----- - - //utc_nanos event_tm(Event const & ev) const { return this->event_tm_fn_(ev); } - - private: - /* true when reducer contains 0 queued events, - * not counting any annexed event - */ - bool empty_flag_ = true; - - /* .last_ev[.last_ix] updated by .include_event() - */ - std::uint32_t last_ix_ = 0; - /* remember two events - * (a) a single queued event (updated by .include_event()) - * (b) a single removed event (reported by .annex_one()) - * - * roles of .last_ev[0], .last_ev[1] reverse each time .annex_one() runs - */ - std::array, 2> last_ev_; - }; /*LastReducer*/ - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end LastReducer.hpp */ diff --git a/.xo-reactor/include/xo/reactor/PollingReactor.hpp b/.xo-reactor/include/xo/reactor/PollingReactor.hpp deleted file mode 100644 index f5e02a12..00000000 --- a/.xo-reactor/include/xo/reactor/PollingReactor.hpp +++ /dev/null @@ -1,60 +0,0 @@ -/* @file PollingReactor.hpp */ - -#pragma once - -#include "Reactor.hpp" -#include "ReactorSource.hpp" -#include -#include - -namespace xo { - namespace reactor { - /* reactor that runs by polling an ordered set of sources */ - class PollingReactor : public Reactor { - public: - /* named ctor idiom */ - static rp make() { return new PollingReactor(); } - - // ----- inherited from Reactor ----- - - virtual bool add_source(bp src) override; - virtual bool remove_source(bp src) override; - virtual void notify_source_primed(bp src) override; - virtual std::uint64_t run_one() override; - - virtual void display(std::ostream & os) const override; - - private: - PollingReactor() = default; - - /* find non-empty source, starting from .source_v_[start_ix], - * wrapping around to .source_v_[start_ix - 1]. - * - * return index of first available non-empty source, - * or -1 if all sources are empty - */ - std::int64_t find_nonempty_source(std::size_t start_ix); - - private: - /* next source to poll will be .source_v_[.next_ix_] */ - std::size_t next_ix_ = 0; - - /* ordered set of sources (see reactor::Source) - * reactor will poll sources in round-robin order - */ - std::vector source_v_; - }; /*PollingReactor*/ - - } /*namespace reactor*/ - -#ifndef ppdetail_atomic - namespace print { - using PollingReactorPointer = xo::reactor::PollingReactor*; - // placeholder, until we implement pretty-printing - PPDETAIL_ATOMIC(xo::reactor::PollingReactor); - PPDETAIL_ATOMIC(PollingReactorPointer); - } -#endif -} /*namespace xo*/ - -/* end PollingReactor.hpp */ diff --git a/.xo-reactor/include/xo/reactor/PolyAdapterSink.hpp b/.xo-reactor/include/xo/reactor/PolyAdapterSink.hpp deleted file mode 100644 index 9104ddc2..00000000 --- a/.xo-reactor/include/xo/reactor/PolyAdapterSink.hpp +++ /dev/null @@ -1,92 +0,0 @@ -/* file PolyAdapterSink.hpp - * - * author: Roland Conybeare, Sep 2022 - */ - -#pragma once - -#include "Sink.hpp" -#include "xo/reflect/Reflect.hpp" - -namespace xo { - namespace reactor { - /* adapter between a source that delivers a particular event type T, - * and a sink that accepts arbitrarily-typed events via .notify_ev_tp() - * Use this to connect to a polymorphic sink. - * - * Require: - * - .poly_sink.allow_polymorphic_source() - * (ofc. otherwise no point in using PolyAdapterSink) - * - .poly_sink.allow_volatile_source() - * need this bc will be wrapping event with TaggedPtr, - * which doesn't manage event lifetime - */ - template - class PolyAdapterSink : public reactor::Sink1 { - public: - using Reflect = reflect::Reflect; - using TaggedPtr = reflect::TaggedPtr; - - public: - /* named ctor idiom */ - static rp make(rp poly_sink) { - //xo::scope lscope("PolyAdapterSink::make"); - - rp retval(new PolyAdapterSink(poly_sink)); - - //lscope.log("adapter", (void*)retval.get()); - - return retval; - } /*make*/ - - // ----- Inherited from Sink1 ----- - - virtual void notify_ev(T const & ev) override { - //xo::scope lscope("PolyAdapterSink::notify_ev"); - //lscope.log(xo::xtag("ev", ev)); - - TaggedPtr ev_tp = Reflect::make_tp(const_cast(&ev)); - - this->notify_ev_tp(ev_tp); - } /*notify_ev*/ - - // ----- Inherited from AbstractSink ----- - - virtual bool allow_volatile_source() const override { return true; } - virtual uint32_t n_in_ev() const override { return this->poly_sink_->n_in_ev(); } - /* note: ok to do this, however if expecting to use this entry point, - * maybe don't need to interpose PolyAdapterSink ahead of .poly_sink - */ - virtual void notify_ev_tp(TaggedPtr const & ev_tp) override { - //xo::scope lscope("PolyAdapterSink::notify_ev_tp"); - - return this->poly_sink_->notify_ev_tp(ev_tp); - } - - // ----- Inherited from AbstractEventProcessor ----- - - virtual std::string const & name() const override { return this->poly_sink_->name(); } - virtual void set_name(std::string const & x) override { this->poly_sink_->set_name(x); } - virtual void visit_direct_consumers(std::function ep)> const & fn) override { - this->poly_sink_->visit_direct_consumers(fn); - } - virtual void display(std::ostream & os) const override { - using xo::xtag; - os << "()) - << xtag("poly", this->poly_sink_) - << ">"; - } /*display*/ - - private: - PolyAdapterSink(rp poly_sink) : poly_sink_{std::move(poly_sink)} {} - - private: - /* mandate: .poly_sink.allow_polymorphic_source() is true */ - rp poly_sink_; - }; /*PolyAdapterSink*/ - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end PolyAdapterSink.hpp */ diff --git a/.xo-reactor/include/xo/reactor/Reactor.hpp b/.xo-reactor/include/xo/reactor/Reactor.hpp deleted file mode 100644 index a2048c0b..00000000 --- a/.xo-reactor/include/xo/reactor/Reactor.hpp +++ /dev/null @@ -1,84 +0,0 @@ -/* @file Reactor.hpp */ - -#pragma once - -#include "xo/refcnt/Refcounted.hpp" -#include "xo/indentlog/log_level.hpp" -#include - -namespace xo { - namespace reactor { - class ReactorSource; - - /* abtract api for a reactor: - * something that arranges to have work done on a set of Sources. - */ - class Reactor : public ref::Refcount { - public: - virtual ~Reactor() = default; - - log_level loglevel() const { return loglevel_; } - void set_loglevel(log_level loglevel) { loglevel_ = loglevel; } - - /* add source src to this reactor. - * on success, invoke src.notify_reactor_add(this) - * - * returns true if source added; false if already present - */ - virtual bool add_source(bp src) = 0; - - /* remove source src from this reactor. - * source must previously have been added by - * .add_source(src). - * - * on success, invoke src.notify_reactor_remove(this) - * - * returns true if source removed; false if not present - */ - virtual bool remove_source(bp src) = 0; - - /* notification when non-primed source (source with no known events) - * becomes primed (source with at least one event) - */ - virtual void notify_source_primed(bp src) = 0; - - /* dispatch one reactor event, borrowing the calling thread - * amount of work this represents is Source/Sink specific. - * - * returns #of events dispatched (0 or 1) - */ - virtual std::uint64_t run_one() = 0; - - /* borrow calling thread to dispatch reactor events. - * if n is -1, run indefinitely - * otherwise dispatch up to n events. - * n = 0 is a noop - */ - std::uint64_t run_n(int32_t n); - - /* borrow calling thread to run indefinitely. - * suitable implementation for dedicated reactor threads - */ - void run() { this->run_n(-1); } - - /** print self human-readably on stream @p os - **/ - virtual void display(std::ostream & os) const = 0; - - protected: - Reactor(); - - private: - /* control logging verbosity */ - log_level loglevel_; - }; /*Reactor*/ - - inline std::ostream & - operator<<(std::ostream & os, const Reactor & x) { - x.display(os); - return os; - } - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end Reactor.hpp */ diff --git a/.xo-reactor/include/xo/reactor/ReactorSource.hpp b/.xo-reactor/include/xo/reactor/ReactorSource.hpp deleted file mode 100644 index bdcc9db4..00000000 --- a/.xo-reactor/include/xo/reactor/ReactorSource.hpp +++ /dev/null @@ -1,130 +0,0 @@ -/* @file ReactorSource.hpp */ - -#pragma once - -#include "AbstractSource.hpp" -//#include "time/Time.hpp" -#include - -namespace xo { - namespace reactor { - class Reactor; - - /* abstract api for a source of events. - * Event representation is left open: Sources and Sinks - * need to have compatible event representations, - * and coordination is left to such (Source, Sink) pairs. - * - * Source->Sink activity may be expected to be mediated by a reactor, - * that implements the Reactor api. - * - * At any time, A Source can be associated with at most one reactor. - * Sources are informed of Reactor<->Source association being - * formed/broken by the - * .notify_reactor_add(), .notify_reactor_remove() - * methods - * - * The source api intends also to provide for simulation. - * There introduces two simulation-specific methods: - * .sim_current_tm() - * .sim_advance_until() - * - * A non-simulation source can implement these as calls to - * .online_current_tm(), .online_advance_until() respectively - * .online_current_tm() aborts since an online source is never exhausted - * .online_advance_until() is a no-op that returns 0 - * - * Loop for consuming from a primary simulation source: - * - * brw s = ...; - * while(!s->is_exhausted()) - * s->deliver_one(); - * - * Secondary sources (sources that depend on other sources) can be - * in a state where they don't know their next event, in which case: - * - * s->is_notprimed() == true - */ - class ReactorSource : public AbstractSource { - public: - using utc_nanos = xo::time::utc_nanos; - - public: - virtual ~ReactorSource() = default; - - /* true if source is currently empty (has 0 events to deliver) */ - virtual bool is_empty() const = 0; - bool is_nonempty() const { return !this->is_empty(); } - - /* true when source knows its next event - * A source that isn't primed is also excluded from simulation - * heap until it becomes primed. - * This make feasible simulation sources that - * depend on other simulation sources - */ - virtual bool is_primed() const { return !this->is_empty(); } - virtual bool is_notprimed() const { return this->is_empty(); } - - /* if true, this source has no events, and will never publish more events - * - for sim, return true for a standalone source that has replayed all events - * - for rt, set during orderly - */ - virtual bool is_exhausted() const = 0; - - /* if this is a simulation source and .is_exhausted is false: - * returns next event time; more precisely, no events exist prior to - * this time. - * - * if sim, and .is_primed = true, - * returns timestamp of next event - */ - virtual utc_nanos sim_current_tm() const = 0; - - /* promise: - * - .current_tm() > tm || .is_notprimed() || .is_exhausted() = true - * - if replay_flag is true, then any events between previous .current_tm() - * and new .current_tm() will have been published - * - * returns #of events delivered. - * does not count events that were skipped, so always returns 0 if - * replay_flag is false - */ - virtual std::uint64_t sim_advance_until(utc_nanos tm, bool replay_flag) = 0; - - /* informs source when it's added to a reactor - - * (see Reactor.add_source()) - */ - virtual void notify_reactor_add(Reactor * /*reactor*/) {} - - /* informs source when it's removed from a reactor - * (see Reactor.remove_source()) - */ - virtual void notify_reactor_remove(Reactor * /*reactor*/) {} - - // ----- Inherited from AbstractSource ----- - - /* deliver one event to attached sink - * interpretation of 'one event' is source-specific; - * could be a collapsed or batched event in practice. - * - * no-op if source is empty. - * - * if sim, promise: - * - new .current_tm >= old .current_tm() || .is_notprimed() || .is_exhausted() - * - * returns #of events delivered. Must be 0 or 1 in this context - */ - virtual std::uint64_t deliver_one() override = 0; - - protected: - /* default implementations for online sources */ - utc_nanos online_current_tm() const; - uint64_t online_advance_until(utc_nanos tm, bool replay_flag); - }; /*ReactorSource*/ - - using ReactorSourcePtr = rp; - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end ReactorSource.hpp */ diff --git a/.xo-reactor/include/xo/reactor/Reducer.hpp b/.xo-reactor/include/xo/reactor/Reducer.hpp deleted file mode 100644 index a457dfe0..00000000 --- a/.xo-reactor/include/xo/reactor/Reducer.hpp +++ /dev/null @@ -1,33 +0,0 @@ -/* @file Reducer.hpp */ - -#pragma once - -#include "xo/reactor/EventTimeFn.hpp" - -namespace xo { - namespace reactor { - /* LastReducer, HeapReducer inherit ReducerBase */ - template - class ReducerBase { - static_assert(EventTimeConcept); - - public: - using utc_nanos = xo::time::utc_nanos; - - public: - ReducerBase() = default; - ReducerBase(EventTimeFn const & evtfn) : event_tm_fn_{evtfn} {} - - utc_nanos event_tm(Event const & ev) const { return this->event_tm_fn_(ev); } - - private: - /* Event ev = ...; - * .event_tm_fn(ev) -> utc_nanos - * reports event time associated with ev - */ - EventTimeFn event_tm_fn_; - }; /*ReducerBase*/ - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end Reducer.hpp */ diff --git a/.xo-reactor/include/xo/reactor/SecondarySource.hpp b/.xo-reactor/include/xo/reactor/SecondarySource.hpp deleted file mode 100644 index 00459e98..00000000 --- a/.xo-reactor/include/xo/reactor/SecondarySource.hpp +++ /dev/null @@ -1,359 +0,0 @@ -/* @file SecondarySource.hpp */ - -#pragma once - -#include "EventSource.hpp" -#include "Sink.hpp" -#include "Reactor.hpp" -#include "HeapReducer.hpp" -#include "xo/callback/CallbackSet.hpp" -#include "xo/cxxutil/demangle.hpp" -#include - -namespace xo { - namespace reactor { - /* A passive event source. - * Can use as backend publisher when implementating another - * event processor. - */ - template> - class SecondarySource : public EventSource> { - public: - using EventSink = Sink1; - template - using RpCallbackSet = fn::RpCallbackSet; - using CallbackId = fn::CallbackId; - using TypeDescr = xo::reflect::TypeDescr; - using utc_nanos = xo::time::utc_nanos; - - public: - ~SecondarySource() = default; - - static rp make() { return new SecondarySource(); } - - /* last event delivered from this source -- - * i.e. event in most recent call to .deliver_one_aux() - */ - Event const & last_annexed_ev() const { return this->reducer_.last_annexed_ev(); } - - void notify_upstream_exhausted() { this->upstream_exhausted_ = true; } - - /* make event available to reactor, by adding to internal reducer */ - void notify_secondary_event(Event const & ev) { - /* test if ev is priming, update .current_tm */ - bool is_priming = this->preprocess_secondary_event(ev); - - this->reducer_.include_event(ev); - - this->postprocess_secondary_event(is_priming); - } /*notify_secondary_event*/ - - void notify_secondary_event(Event && ev) { - bool is_priming = this->preprocess_secondary_event(ev); - - this->reducer_.include_event(ev); - - this->postprocess_secondary_event(is_priming); - } /*notify_secondary_event*/ - - template - void notify_secondary_event_v(T const & v) { - using xo::scope; - using xo::xtag; - - if (v.empty()) - return; - - scope log(XO_DEBUG(this->debug_sim_flag_)); - - log && log(xtag("name", this->name())); - - if (this->upstream_exhausted_) { - throw std::runtime_error("SecondarySource::notify_secondary_event_v" - ": not allowed after upstream exhausted"); - } - - uint32_t n_ev = 0; - - for (Event const & ev : v) { - utc_nanos evtm = this->reducer_.event_tm(ev); - - if (this->current_tm_ < evtm) - this->current_tm_ = evtm; - - ++n_ev; - } - - log && log(xtag("T", reflect::type_name()), - xtag("n_ev", n_ev)); - - if (n_ev > 0) { - /* if reducer is empty when .notify_secondary_event_v() begins, - * then reactor/simulator needs to be notified that source is no longer empty - */ - bool is_priming = this->reducer_.is_empty(); - - for (Event const & ev : v) - this->reducer_.include_event(ev); - - Reactor * reactor = this->parent_reactor_; - - if (reactor) { - if (is_priming) { - /* reactor/simulator takes responsibility for delivering events */ - reactor->notify_source_primed(bp::from_native(this)); - } - } else { - /* special case if no reactor: deliver immediately */ - - //this->deliver_one(); - this->deliver_all(); - } - } - } /*notify_secondary_event_v*/ - - // ----- inherited from EventSource ----- - - virtual CallbackId add_callback(rp const & cb) override { - return this->cb_set_.add_callback(cb); - } /*add_callback*/ - - virtual void remove_callback(CallbackId id) override { - this->cb_set_.remove_callback(id); - } /*remove_callback*/ - - // ----- inherited from ReactorSource ----- - - virtual bool is_empty() const override { return this->reducer_.is_empty(); } - virtual bool is_exhausted() const override { return this->upstream_exhausted_ && this->is_empty(); } - - virtual utc_nanos sim_current_tm() const override { - using xo::scope; - using xo::xtag; - - if (this->reducer_.is_empty()) { - /* this is a tricky case. - * it means this source doesn't - * _know_ specific next event yet; however new events - * may appear at any time by way of .notify_event() - * - * If event doesn't know next event, then .current_tm isn't useful - * for establishing priority relative to other sources. - * rely on priming mechanism instead, - * which means that control should never come here. - */ - return this->current_tm_; - } else { - scope log(XO_DEBUG(false /*this->debug_sim_flag_*/), - xtag("name", this->name_), - xtag("next_tm", this->reducer_.next_tm())); - - return this->reducer_.next_tm(); - } - } /*sim_current_tm*/ - - virtual std::uint64_t deliver_one() override { - return this->deliver_one_aux(true /*replay_flag*/); - } - - virtual std::uint64_t sim_advance_until(utc_nanos target_tm, - bool replay_flag) override - { - uint64_t retval = 0; - - while (!this->reducer_.is_empty()) { - utc_nanos tm = this->sim_current_tm(); - - if (tm < target_tm) { - retval += this->deliver_one_aux(replay_flag); - } else { - break; - } - } - - return retval; - } /*sim_advance_until*/ - - virtual void notify_reactor_add(Reactor * reactor) override { - assert(!this->parent_reactor_); - - this->parent_reactor_ = reactor; - } /*notify_reactor_add*/ - - virtual void notify_reactor_remove(Reactor * /*reactor*/) override {} - - // ----- inherited from AbstractSource ----- - - virtual TypeDescr source_ev_type() const override { - return reflect::Reflect::require(); - } /*source_ev_type*/ - - virtual uint32_t n_out_ev() const override { return n_out_ev_; } - /* #of events queued for delivery */ - virtual uint32_t n_queued_out_ev() const override { return this->reducer_.n_event(); } - - virtual bool debug_sim_flag() const override { return debug_sim_flag_; } - virtual void set_debug_sim_flag(bool x) override { this->debug_sim_flag_ = x; } - - virtual CallbackId attach_sink(rp const & sink) override { - rp native_sink - = EventSink::require_native("SecondarySource::attach_sink", sink); - - if (native_sink) { - if (!this->is_volatile() - || native_sink->allow_volatile_source()) - { - return this->add_callback(native_sink); - } else { - throw std::runtime_error("SecondarySource::attach_sink" - ": sink requires non-volatile source " - + std::string(reflect::type_name())); - } - } else { - throw std::runtime_error("SecondarySource::attach_sink" - ": expected sink accepting " - + std::string(reflect::type_name())); - } - } /*attach_sink*/ - - virtual void detach_sink(CallbackId id) override { - this->remove_callback(id); - } /*detach_sink*/ - - // ----- Inherited from AbstractEventProcessor ----- - - virtual std::string const & name() const override { return name_; } - virtual void set_name(std::string const & x) override { this->name_ = x; } - - virtual void visit_direct_consumers(std::function ep)> const & fn) override { - - for(auto x : this->cb_set_) - fn(x.fn_.borrow()); - } /*visit_direct_consumers*/ - - private: - /* event book-keeping on receiving an event. - */ - bool preprocess_secondary_event(Event const & ev) - { - if (this->upstream_exhausted_) { - throw std::runtime_error("SecondarySource::notify_secondary_event" - ": not allowed after upstream exhausted"); - } - - utc_nanos evtm = this->reducer_.event_tm(ev); - - if (this->current_tm_ < evtm) - this->current_tm_ = evtm; - - /* if reducer is empty when .notify_event() begins, - * then reactor/simulator needs to be notified that source is no longer empty - */ - bool is_priming = this->reducer_.is_empty(); - - return is_priming; - } /*preprocess_secondary_event*/ - - /* event bookkeeping after receiving an event. - * - * Require: event has been propagated to .reducer - * - * is_priming. true if event causes source to - * become non-empty --> must notify reactor - */ - void postprocess_secondary_event(bool is_priming) { - using xo::scope; - using xo::xtag; - - Reactor * reactor = this->parent_reactor_; - - scope log(XO_DEBUG(this->debug_sim_flag_), - xtag("name", name_), - xtag("reactor", (void*)reactor), - xtag("is_priming", is_priming)); - - if (reactor) { - if (is_priming) { - /* reactor/simulator takes responsibility for delivering events */ - reactor->notify_source_primed(bp::from_native(this)); - } - } else { - /* if no reactor, deliver immediately */ - this->deliver_one(); - } - } /*postprocess_secondary_event*/ - - /* deliver one event from reducer; - * invoke callback whenever replay_flag is true - */ - std::uint64_t deliver_one_aux(bool replay_flag) { - scope log(XO_DEBUG(this->debug_sim_flag_), - xtag("name", this->name_), - xtag("reducer.empty", this->reducer_.is_empty()), - xtag("replay_flag", replay_flag)); - - if (this->reducer_.is_empty()) - return 0; - - /* need to remove event _before_ invoking callbacks; - * callbacks may indirectly call this->notify_secondary_event(), - * modifiying .reducer - * - * reducer may use double-buffering scheme or similar to - * mitigate copying, esp when Event objects are heavy - */ - Event & ev = this->reducer_.annex_one(); - - /* if SecondarySource: - * Event ev = this->event_heap_.front(); - * std::pop_heap(this->event_heap_.begin(), - * this->event_heap_.end(), - * std::greater()); - * this->event_heap_.pop_back(); - */ - - if (replay_flag) { - ++(this->n_out_ev_); - this->cb_set_.invoke(&EventSink::notify_ev, ev); - } - - return 1; - } /*deliver_one_aux*/ - - private: - /* current time for this source */ - utc_nanos current_tm_; - - /* reporting name for this source (use when .debug_sim_flag set) - */ - std::string name_; - - /* if true, reactor/simulator to log interaction with this source - */ - bool debug_sim_flag_ = false; - - /* count lifetime #of outgoing events */ - uint32_t n_out_ev_ = 0; - - /* set this to true, once, to announce that upstream will send - * no more events. see .notify_upstream_exhausted() - */ - bool upstream_exhausted_ = false; - - /* events to be delivered to callbacks. - * multiple events may be collapsed depending on Reducer implementation - */ - Reducer reducer_; - - /* reactor/simulator being used to schedule consumption. if ommitted, - * will borrow thread calling .notify_secondary_event() - */ - Reactor * parent_reactor_ = nullptr; - - /* invoke callbacks in this set to send an outgoing event */ - RpCallbackSet cb_set_; - }; /*SecondarySource*/ - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end SecondarySource.hpp */ diff --git a/.xo-reactor/include/xo/reactor/Sink.hpp b/.xo-reactor/include/xo/reactor/Sink.hpp deleted file mode 100644 index 8a37ffd2..00000000 --- a/.xo-reactor/include/xo/reactor/Sink.hpp +++ /dev/null @@ -1,222 +0,0 @@ -/* @file Sink.hpp */ - -#pragma once - -#include "AbstractSink.hpp" -#include "AbstractSource.hpp" -#include "PolyAdapterSink.hpp" -#include "xo/reflect/Reflect.hpp" -#include "xo/indentlog/print/time.hpp" -#include "xo/indentlog/print/tag.hpp" -#include "xo/cxxutil/demangle.hpp" -#include - -namespace xo { - namespace reactor { - /* Sink for events of type T - * - * inheritance: - * ref::Refcount - * ^ - * isa - * | - * reactor::AbstractEventProcessor - * ^ - * isa - * | - * reactor::AbstractSink - * ^ - * isa - * | - * reactor::Sink1 - */ - template - class Sink1 : public AbstractSink { - public: - using Reflect = reflect::Reflect; - using TypeDescr = reflect::TypeDescr; - - public: - /* convenience: convert abstract sink to Sink1*, - * or throw - */ - static rp> require_native(std::string_view caller, - rp const & sink) - { - using xo::scope; - using xo::xtag; - - /* 1. if sink expects events of type T, - * make direct connection - */ - Sink1 * native_sink = nullptr; - - native_sink = dynamic_cast *>(sink.get()); - - if (native_sink) - return native_sink; - - /* 2. if sink is polymorphic, - * make type-erasing adapter - */ - - if (sink->allow_polymorphic_source()) { -#ifdef DEBUG_NOT_USING - scope lscope("Sink1::require_native: create PolyAdapterSink"); - lscope.log(xtag("caller", caller)); -#endif - - return PolyAdapterSink::make(sink); - } - - if (!native_sink) { -#ifdef DEBUG_EVENT_TYPEINFO - std::type_info const * sink_parent_typeinfo - = sink->parent_typeinfo(); -#endif - - std::size_t src_hashcode = typeid(T).hash_code(); - - throw std::runtime_error(tostr("Sink1::require_native" - ": wanted to sink S, but sink expects T", - xtag("caller", caller), - xtag("T", sink->sink_ev_type()->canonical_name()), - xtag("S", reflect::type_name()), - xtag("required_hashcode", typeid(Sink1).hash_code()), - xtag("required_name", typeid(Sink1).name()), - xtag("src_hashcode", src_hashcode), - xtag("sink_hashcode", sink->sink_ev_type()->native_typeinfo()->hash_code()) -#ifdef DEBUG_EVENT_TYPEINFO - , xtag("sink_hashcode", sink->item_typeinfo()->hash_code()) - , xtag("sink_parent_hashcode", sink_parent_typeinfo->hash_code()) - , xtag("sink_parent_name", sink_parent_typeinfo->name()) - , xtag("sink.type", sink->self_typename()) - , xtag("sink.parent_type", sink->parent_typename()) -#endif - )); - } - - return native_sink; - } /*require_native*/ - - virtual TypeDescr sink_ev_type() const override { return reflect::Reflect::require(); } - /* accept incoming event */ - virtual void notify_ev(T const & ev) = 0; - - /* invoke these when this sink added to, or removed from, a source */ - virtual void notify_add_callback() {} - virtual void notify_remove_callback() {} - - // ----- inherited from AbstractSink ----- - - /* Sink1 only allows source providing T */ - virtual bool allow_polymorphic_source() const override { return false; } - - virtual void attach_source(rp const & src) override { - src->attach_sink(this); - } /*attach_source*/ - - virtual void notify_ev_tp(TaggedPtr const & ev_tp) override { - using xo::xtag; - - T * p_ev = ev_tp.recover_native(); - - if (p_ev) { - this->notify_ev(*p_ev); - } else { - throw std::runtime_error(tostr("Sink1::notify_ev_tp" - ": unable to convert ev_tp to T", - xtag("ev_tp.type", ev_tp.td()->canonical_name()), - xtag("T", reflect::type_name()))); - } - } /*notify_ev_tp*/ - }; /*Sink1*/ - - /* a sink with no further downstream processors */ - template - class SinkEndpoint : public Sink1 { - public: - // ----- Inherited from AbstractEventProcessor ----- - - virtual std::string const & name() const override { return name_; } - virtual void set_name(std::string const & x) override { name_ = x; } - - virtual void visit_direct_consumers(std::function)> const &) override { - /* *this is not an event source */ - } /*visit_direct_consumers*/ - - private: - /* reporting name for this sink */ - std::string name_; - }; /*SinkEndpoint*/ - - template - class SinkToFunction : public SinkEndpoint { - public: - SinkToFunction(Fn fn) : fn_{std::move(fn)} {} - - /* NOTE: conservative choice here, could templatize on this */ - virtual bool allow_volatile_source() const override { return false; } - virtual uint32_t n_in_ev() const override { return n_in_ev_; } - virtual void notify_ev(T const & ev) override { - ++(this->n_in_ev_); - fn_(ev); - } /*notify_ev*/ - - virtual void display(std::ostream & os) const override { - using xo::xtag; - - os << "name()) - << xtag("n_in_ev", this->n_in_ev()) - << ">"; - } /*display*/ - - private: - Fn fn_; - /* counts lifetime #of incoming events (see .notify_ev()) */ - uint32_t n_in_ev_ = 0; - }; /*SinkToFunction*/ - - /* sink that prints to console */ - template - class SinkToConsole : public SinkEndpoint { - public: - SinkToConsole() {} - - virtual bool allow_volatile_source() const override { return true; } - virtual uint32_t n_in_ev() const override { return n_in_ev_; } - virtual void notify_ev(T const & ev) override { - //using logutil::operator<<; - - ++(this->n_in_ev_); - - std::cout << ev << std::endl; - } /*notify_ev*/ - - virtual void display(std::ostream & os) const override { - using xo::xtag; - - os << "name()) - << xtag("n_in_ev", this->n_in_ev()) - << ">"; - } /*display*/ - - private: - /* reporting name for this sink */ - std::string name_; - /* counts lifetime #of incoming events (see .notify_ev()) */ - uint32_t n_in_ev_ = 0; - }; /*SinkToConsole*/ - -#ifdef NOT_USING - class TemporaryTest { - public: - static rp>> realization_printer(); - }; /*TemporaryTest*/ -#endif - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end Sink.hpp */ diff --git a/.xo-reactor/include/xo/reactor/init_reactor.hpp b/.xo-reactor/include/xo/reactor/init_reactor.hpp deleted file mode 100644 index 3e5e51bb..00000000 --- a/.xo-reactor/include/xo/reactor/init_reactor.hpp +++ /dev/null @@ -1,20 +0,0 @@ -/* file init_reactor.hpp - * - * author: Roland Conybeare, Aug 2022 - */ - -#pragma once - -#include "xo/subsys/Subsystem.hpp" - -namespace xo { - enum S_reactor_tag {}; - - template<> - struct InitSubsys { - static void init(); - static InitEvidence require(); - }; -} /*namespace xo*/ - -/* end init_reactor.hpp */ diff --git a/.xo-reactor/src/reactor/AbstractEventProcessor.cpp b/.xo-reactor/src/reactor/AbstractEventProcessor.cpp deleted file mode 100644 index 824024cf..00000000 --- a/.xo-reactor/src/reactor/AbstractEventProcessor.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/* @file AbstractEventProcessor.cp */ - -#include "AbstractEventProcessor.hpp" -#include "xo/indentlog/print/tostr.hpp" -#include -#include - -namespace xo { - using xo::tostr; - using std::uint32_t; - - namespace reactor { - namespace { - /* search all event processors ep reachable (dowstream) from x, - * add to *m; - */ - void - map_network_helper(bp x, - uint32_t * tsort_ix, - std::unordered_map * m) - { - if (m->contains(x.get())) - return; - - auto fn = [tsort_ix, m] - (bp ep) - { - map_network_helper(ep, tsort_ix, m); - }; - - x->visit_direct_consumers(fn); - - /* postorder! */ - (*m)[x.get()] = ++(*tsort_ix); - - } /*map_network_helper*/ - } /*namespace*/ - - std::vector> - AbstractEventProcessor::map_network(rp const & x) - { - std::unordered_map network_map; - - /* index event processors in reverse topological order: - * if B is (directly or indirectly) downstream from A, - * then tsort_ix(B) < tsort_ix(A) - */ - uint32_t tsort_ix = 0; - - /* depth-first traversal, detect and short-circuit on dup paths */ - map_network_helper(x.borrow(), &tsort_ix, &network_map); - - /* invariant: tsort_ix = #of event processors in network */ - uint32_t n = tsort_ix; - - /* network_map, now in a topologically sorted order */ - std::map tsorted_map; - { - for(auto const & x : network_map) { - uint32_t tsort_ix = x.second; - AbstractEventProcessor * ep = x.first; - - tsorted_map[n - tsort_ix] = ep; - } - } - - std::vector> retval; - { - for(auto const & x : tsorted_map) - retval.push_back(x.second); - } - - return retval; - } /*map_network*/ - - void - AbstractEventProcessor::display(std::ostream & os) const - { - os << ""; - } /*display*/ - - std::string - AbstractEventProcessor::display_string() const - { - return tostr(*this); - } /*display_string*/ - - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end AbstractEventProcessor.cpp */ diff --git a/.xo-reactor/src/reactor/AbstractSource.cpp b/.xo-reactor/src/reactor/AbstractSource.cpp deleted file mode 100644 index bbad859d..00000000 --- a/.xo-reactor/src/reactor/AbstractSource.cpp +++ /dev/null @@ -1,81 +0,0 @@ -/* @file AbstractSource.cpp */ - -#include "AbstractSource.hpp" -#include "xo/indentlog/scope.hpp" -#include "xo/webutil/StreamEndpointDescr.hpp" -//#include "indentlog/scope.hpp" - -namespace xo { - using xo::web::StreamEndpointDescr; - using xo::reactor::AbstractSink; - - namespace reactor { - StreamEndpointDescr - AbstractSource::stream_endpoint_descr(std::string const & url_prefix) - { - auto subscribe_fn - = ([this] - (rp const & ws_sink) - { - //scope lscope("AbstractSource::stream_endpoint_descr.subscribe_fn"); - - /* ws_sink created by websocket, sends events to websocket as json - * see [websock/WebsocketSink] - */ - return this->attach_sink(ws_sink); - }); - - auto unsubscribe_fn - = ([this] - (CallbackId id) - { - this->detach_sink(id); - }); - - return StreamEndpointDescr(url_prefix, - subscribe_fn, - unsubscribe_fn); - } /*stream_endpoint_descr*/ - - uint64_t - AbstractSource::deliver_n(uint64_t n) - { - uint64_t retval = 0; - - for (uint64_t i=0; ideliver_one(); - - if (n1 == 0) { - /* short-circuit if source has less than n - * events available - */ - break; - } - - retval += n1; - } - - return retval; - } /*deliver_n*/ - - uint64_t - AbstractSource::deliver_all() - { - uint64_t retval = 0; - - for (;;) { - uint64_t n1 = this->deliver_one(); - - if (n1 == 0) - break; - - retval += n1; - } - - return retval; - } /*deliver_all*/ - - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end AbstractSource.cpp */ diff --git a/.xo-reactor/src/reactor/CMakeLists.txt b/.xo-reactor/src/reactor/CMakeLists.txt deleted file mode 100644 index f84c1ca3..00000000 --- a/.xo-reactor/src/reactor/CMakeLists.txt +++ /dev/null @@ -1,24 +0,0 @@ -# xo-reactor/src/reactor/CMakeLists.txt - -set(SELF_LIB reactor) -set(SELF_SRCS - AbstractEventProcessor.cpp AbstractSource.cpp ReactorSource.cpp - Sink.cpp - Reactor.cpp PollingReactor.cpp - init_reactor.cpp) - -xo_add_shared_library4(${SELF_LIB} ${PROJECT_NAME}Targets ${PROJECT_VERSION} 1 ${SELF_SRCS}) - -# ---------------------------------------------------------------- -# external dependencies - -# note: changes to xo_dependency() calls here -# must coordinate with find_dependency() calls -# in xo-reactor/cmake/reactorConfig.cmake.in -# -#xo_dependency(${SELF_LIB} xo_ordinaltree) # for some reason wants to link -lxo_ordinaltree ?? -xo_dependency(${SELF_LIB} reflect) -xo_dependency(${SELF_LIB} webutil) -xo_dependency(${SELF_LIB} printjson) -xo_dependency(${SELF_LIB} callback) -xo_dependency(${SELF_LIB} xo_ordinaltree) diff --git a/.xo-reactor/src/reactor/PollingReactor.cpp b/.xo-reactor/src/reactor/PollingReactor.cpp deleted file mode 100644 index b01e3f9f..00000000 --- a/.xo-reactor/src/reactor/PollingReactor.cpp +++ /dev/null @@ -1,113 +0,0 @@ -/* @file PollingReactor.cpp */ - -#include "PollingReactor.hpp" - -namespace xo { - using std::size_t; - using std::uint64_t; - using std::int64_t; - - namespace reactor { - bool - PollingReactor::add_source(bp src) - { - /* make sure src does not already appear in .source_v[] */ - for(ReactorSourcePtr const & x : this->source_v_) { - if(x.get() == src.get()) { - throw std::runtime_error("PollingReactor::add_source; source already present"); - return false; - } - } - - src->notify_reactor_add(this); - - this->source_v_.push_back(src.get()); - - return true; - } /*add_source*/ - - bool - PollingReactor::remove_source(bp src) - { - auto ix = std::find(this->source_v_.begin(), - this->source_v_.end(), - src); - - if(ix != this->source_v_.end()) { - src->notify_reactor_remove(this); - - this->source_v_.erase(ix); - - return true; - } - - return false; - } /*remove_source*/ - - void - PollingReactor::notify_source_primed(bp) { - /* nothing to do here -- all sources always checked by polling loop */ - } /*notify_source_primed*/ - - int64_t - PollingReactor::find_nonempty_source(size_t start_ix) - { - size_t z = this->source_v_.size(); - - /* search sources [ix .. z) */ - for(size_t ix = start_ix; ix < z; ++ix) { - bp src = this->source_v_[ix]; - - if(src->is_nonempty()) - return ix; - } - - /* search source [0 .. ix) */ - for(size_t ix = 0, n = std::min(start_ix, z); ix < n; ++ix) { - bp src = this->source_v_[ix]; - - if(src->is_nonempty()) - return ix; - } - - return -1; - } /*find_nonempty_source*/ - - uint64_t - PollingReactor::run_one() - { - int64_t ix = this->find_nonempty_source(this->next_ix_); - - scope log(XO_DEBUG(this->loglevel() <= log_level::chatty)); - - log && log(xtag("self", this), xtag("src_ix", ix)); - - uint64_t retval = 0; - - if(ix >= 0) { - bp src = this->source_v_[ix]; - - log && log(xtag("src.name", src->name())); - - retval = src->deliver_one(); - } else { - retval = 0; - } - - log.end_scope(xtag("retval", retval)); - - return retval; - } /*run_one*/ - - void - PollingReactor::display(std::ostream & os) const { - os << ""; - } - - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end PollingReactor.cpp */ diff --git a/.xo-reactor/src/reactor/Reactor.cpp b/.xo-reactor/src/reactor/Reactor.cpp deleted file mode 100644 index d0803d14..00000000 --- a/.xo-reactor/src/reactor/Reactor.cpp +++ /dev/null @@ -1,40 +0,0 @@ -/* file Reactor.cpp - * - * author: Roland Conybeare, Sep 2022 - */ - -#include "Reactor.hpp" -#include "init_reactor.hpp" -#include "xo/subsys/Subsystem.hpp" - -namespace xo { - namespace reactor { - Reactor::Reactor() { - /* ensure reactor subsystem + deps initialized */ - - InitSubsys::require(); - - Subsystem::initialize_all(); - } - - std::uint64_t - Reactor::run_n(int32_t n) - { - std::uint64_t retval = 0; - - if (n == -1) { - for (;;) { - retval += this->run_one(); - } - } else { - for (int32_t i=0; irun_one(); - } - } - - return retval; - } /*run_n*/ - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end Reactor.cpp */ diff --git a/.xo-reactor/src/reactor/ReactorSource.cpp b/.xo-reactor/src/reactor/ReactorSource.cpp deleted file mode 100644 index 5f3735ea..00000000 --- a/.xo-reactor/src/reactor/ReactorSource.cpp +++ /dev/null @@ -1,34 +0,0 @@ -/* @file ReactorSource.cpp */ - -#include "ReactorSource.hpp" -#include "xo/indentlog/print/time.hpp" -#include - -namespace xo { - using xo::time::utc_nanos; - - namespace reactor { - utc_nanos - ReactorSource::online_current_tm() const - { - /* for an online source: - * .is_exhausted() must always be false; - * this implies that .sim_current_tm() should - * not be called in the first place - */ - - assert(false); - - return time::timeutil::epoch(); - } /*online_current_tm*/ - - std::uint64_t - ReactorSource::online_advance_until(utc_nanos /*tm*/, - bool /*replay_flag*/) - { - return 0; - } /*online_advance_until*/ - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end Source.cpp */ diff --git a/.xo-reactor/src/reactor/Sink.cpp b/.xo-reactor/src/reactor/Sink.cpp deleted file mode 100644 index 78f281e8..00000000 --- a/.xo-reactor/src/reactor/Sink.cpp +++ /dev/null @@ -1,18 +0,0 @@ -/* @file Sink.cpp */ - -#include "Sink.hpp" -#include "xo/refcnt/Refcounted.hpp" - -namespace xo { - namespace reactor { -#ifdef NOT_USING - rp>> - TemporaryTest::realization_printer() - { - return new SinkToConsole>(); - } /*realization_printer*/ -#endif - } /*namespace reactor*/ -} /*namespace xo*/ - -/* end Sink.cpp */ diff --git a/.xo-reactor/src/reactor/init_reactor.cpp b/.xo-reactor/src/reactor/init_reactor.cpp deleted file mode 100644 index b33e4775..00000000 --- a/.xo-reactor/src/reactor/init_reactor.cpp +++ /dev/null @@ -1,31 +0,0 @@ -/* file init_reactor.cpp - * - * author: Roland Conybeare, Aug 2022 - */ - -#include "init_reactor.hpp" -#include "xo/reflect/init_reflect.hpp" - -namespace xo { - void - InitSubsys::init() - { - /* TODO: reflect reactor types */ - } /*init*/ - - InitEvidence - InitSubsys::require() - { - InitEvidence retval; - - /* subsystem dependencies for reactor/ */ - retval ^= InitSubsys::require(); - - /* reactor/'s own initialization code */ - retval ^= Subsystem::provide("reactor", &init); - - return retval; - } /*require*/ -} /*namespace xo*/ - -/* end init_reactor.cpp */ diff --git a/.xo-reactor/utest/CMakeLists.txt b/.xo-reactor/utest/CMakeLists.txt deleted file mode 100644 index e621b6ca..00000000 --- a/.xo-reactor/utest/CMakeLists.txt +++ /dev/null @@ -1,11 +0,0 @@ -# build unittest reactor/unittest' - -set(SELF_EXE utest.reactor) -set(SELF_SRCS Sink.test.cpp PollingReactor.test.cpp reactor_utest_main.cpp) - -xo_add_utest_executable(${SELF_EXE} ${SELF_SRCS}) -xo_self_dependency(${SELF_EXE} reactor) -xo_dependency(${SELF_EXE} randomgen) -xo_external_target_dependency(${SELF_EXE} Catch2 Catch2::Catch2) - -# end CMakeLists.txt diff --git a/.xo-reactor/utest/PollingReactor.test.cpp b/.xo-reactor/utest/PollingReactor.test.cpp deleted file mode 100644 index bcdf861f..00000000 --- a/.xo-reactor/utest/PollingReactor.test.cpp +++ /dev/null @@ -1,236 +0,0 @@ -/* @file PollingReactor.test.cpp */ - -#include "xo/reactor/init_reactor.hpp" -#include "xo/reactor/PollingReactor.hpp" -#include "xo/reactor/FifoQueue.hpp" -#include "xo/reactor/Sink.hpp" -#include "xo/randomgen/xoshiro256.hpp" -#include "xo/indentlog/print/pair.hpp" -#include "catch2/catch.hpp" - -namespace xo { - //using xo::reactor::Reactor; - using xo::reactor::PollingReactor; - using xo::reactor::FifoQueue; - using xo::reactor::SinkToFunction; - using xo::time::timeutil; - using xo::time::seconds; - using xo::time::utc_nanos; - -/* note: trivial REQUIRE() call in else branch bc we still want - * catch2 to count assertions when verification succeeds - */ -# define REQUIRE_ORCAPTURE(ok_flag, catch_flag, expr) \ - if (catch_flag) { \ - REQUIRE((expr)); \ - } else { \ - REQUIRE(true); \ - ok_flag &= (expr); \ - } - -# define REQUIRE_ORFAIL(ok_flag, catch_flag, expr) \ - REQUIRE_ORCAPTURE(ok_flag, catch_flag, expr); \ - if (!ok_flag) \ - return ok_flag - - namespace { - using TestEvent = std::pair; - using TestQueue = FifoQueue; - - struct RandomTestData { - RandomTestData(std::size_t n, - xo::rng::xoshiro256ss * p_rgen); - - std::uint32_t size() const { return u1v_.size(); } - std::vector const & u1v() const { return u1v_; } - - private: - /* a set of n randomly chosen elements drawn from [0 .. 2n-1] */ - std::vector u1v_; - }; - - RandomTestData::RandomTestData(std::size_t n, - xo::rng::xoshiro256ss * p_rgen) - : u1v_(n) - { - std::shuffle(u1v_.begin(), u1v_.end(), *p_rgen); - } - } /*namespace*/ - - static InitEvidence s_evidence = InitSubsys::require(); - - namespace ut { - TEST_CASE("polling0", "[reactor]") { - Subsystem::initialize_all(); - - rp reactor = PollingReactor::make(); - - REQUIRE(reactor.get()); - - for (std::uint32_t i=0; i<3; ++i) { - INFO(xtag("i", i)); - REQUIRE(reactor->run_one() == 0); - } - } /*TEST_CASE(polling0)*/ - - /* return true=success, false=fail */ - bool - run_polling1_test(std::size_t n, - bool catch_flag, - xo::rng::xoshiro256ss * p_rgen) - { - scope log(XO_DEBUG(catch_flag)); - log && log(xtag("n", n)); - - bool ok_flag = true; - - rp reactor = PollingReactor::make(); - REQUIRE_ORFAIL(ok_flag, catch_flag, reactor.get() != nullptr); - - if (ok_flag) - reactor->set_loglevel(catch_flag - ? log_level::always - : log_level::error); - - rp q = TestQueue::make(); - REQUIRE_ORFAIL(ok_flag, catch_flag, q.get() != nullptr); - - if (ok_flag) - q->set_name("fifo"); - - /* capture delivered events */ - std::vector out_ev_v; - - auto sink_fn - = ([&out_ev_v](TestEvent const & x) { out_ev_v.push_back(x); }); - - q->add_callback(new SinkToFunction - >(sink_fn)); - - - reactor->add_source(q); - - /* max #of consecutive inserts */ - std::size_t max_enq = std::max(1UL, n/3); - /* max #of consecutive removes */ - std::size_t max_deq = std::max(1UL, n/3); - - RandomTestData seq(n, p_rgen); - - q->set_debug_sim_flag(catch_flag); - - /* verify: - * 1. queue conservation -- everything inserted gets delivered - * 2. events consumed in the same order they where inserted - * 3. no problem with queue being sometimes empty - */ - - utc_nanos t0 = timeutil::ymd_hms(20231011 /*ymd*/, 131300 /*hms*/); - - /* count #of events delivered by reactor */ - std::size_t n_delivered = 0; - - std::size_t i = 0; - while ((i < seq.u1v().size()) || (n_delivered < n)) { - /* sum of (#of enq, #of deq) attempted for this iteration */ - std::size_t n_work_attempted = 0; - /* sum of (#of enq, #of deq) accomplished for this iteration */ - std::size_t n_work_done = 0; - std::size_t n_enq = p_rgen->generate() % (max_enq + 1); - std::size_t n_deq_attempted = 1 + (p_rgen->generate() % (max_deq + 1)); - std::size_t n_deq_done = 0; - - /* pick random #of elements to insert (to back of queue) */ - { - for (std::size_t j = 0; (j < n_enq) && (i < seq.u1v().size()); ++j) { - utc_nanos ti = t0 + seconds(i); - - q->notify_ev(std::make_pair(ti, seq.u1v()[i++])); - } - - n_work_attempted += n_enq; - n_work_done += n_enq; - } - - /* pick random #of elements to remove (from front of queue) */ - { - n_deq_done += reactor->run_n(n_deq_attempted); - - n_work_attempted += n_deq_attempted; - n_work_done += n_deq_done; - n_delivered += n_deq_done; - } - - log && log(xtag("i", i), - xtag("n", n), - xtag("n_work_attempted", n_work_attempted), - xtag("n_work_done", n_work_done), - xtag("n_enq", n_enq), - xtag("n_deq_attempted", n_deq_attempted), - xtag("n_deq_done", n_deq_done)); - - if ((i == seq.u1v().size()) /*no more enqueues planned*/ - && (n_work_attempted > 0) - && (n_work_done == 0)) - { - /* expect incremental progress every iteration; - * want unit test to always terminate - */ - break; - } - } - - REQUIRE_ORFAIL(ok_flag, catch_flag, i == n); - REQUIRE_ORFAIL(ok_flag, catch_flag, n_delivered == n); - - /* check events delivered 1:1 and in order */ - for (std::size_t i=0; i seed; - auto rgen = xo::rng::xoshiro256ss(seed); - - for (std::size_t n = 4; n <= 1024; n *= 2) { - bool ok_flag = false; - - for (std::uint32_t attention = 0; !ok_flag && (attention < 2); ++attention) { - ok_flag = true; - - /* attention=0: - * - no logging - * - detect assertion failures, but don't report them to catch - * attention=1: - * - only runs if failure detected with attention=0 - * - full logging - * - report to catch - */ - - bool debug_flag = (attention == 1); - - ok_flag &= run_polling1_test(n, debug_flag, &rgen); - } - } - - } /*TEST_CASE(polling1)*/ - } /*namespace ut*/ - -} /*namespace xo*/ - - -/* end PollingReactor.test.cpp */ diff --git a/.xo-reactor/utest/Sink.test.cpp b/.xo-reactor/utest/Sink.test.cpp deleted file mode 100644 index 524b821e..00000000 --- a/.xo-reactor/utest/Sink.test.cpp +++ /dev/null @@ -1,99 +0,0 @@ -/* @file Sink.test.cpp */ - -#include "xo/reactor/PollingReactor.hpp" -#include "xo/reactor/Sink.hpp" -#include "xo/indentlog/print/pair.hpp" -#include "catch2/catch.hpp" - -namespace xo { - using xo::reactor::Reactor; - using xo::reactor::PollingReactor; - using xo::reactor::AbstractSink; - using xo::reactor::Sink1; - using xo::reactor::SinkEndpoint; - using xo::reactor::SinkToConsole; - using xo::time::utc_nanos; - - namespace { - class TestSink : public SinkEndpoint { - public: - TestSink() = default; - - virtual uint32_t n_in_ev() const override { return 0; } - virtual bool allow_volatile_source() const override { return true; } - virtual void notify_ev(int const & ev) override {} - virtual void display(std::ostream & os) const override { os << ""; } - }; /*TestSink*/ - - class TestSink2 : public SinkEndpoint { - public: - TestSink2() = default; - - virtual uint32_t n_in_ev() const override { return 0; } - virtual bool allow_volatile_source() const override { return true; } - virtual void notify_ev(utc_nanos const & ev) override {} - virtual void display(std::ostream & os) const override { os << ""; } - }; /*TestSink2*/ - - using TestSink3 = SinkToConsole>; - } /*namespace*/ - - namespace ut { - TEST_CASE("sink-cast", "[reactor][sink]") { - rp test_sink = new TestSink(); - rp sink = test_sink; - - TestSink * cast_sink = dynamic_cast(sink.get()); - - REQUIRE(test_sink.get() == cast_sink); - - Sink1 * int_sink = dynamic_cast *>(sink.get()); - - REQUIRE(test_sink.get() == int_sink); - - rp> int_sink2 - = Sink1::require_native("TEST_CASE(sink-cast)", sink.get()); - - REQUIRE(test_sink.get() == int_sink2.get()); - } /*TEST_CASE(sink-cast)*/ - - TEST_CASE("sink-cast2", "[reactor]") { - rp test_sink = new TestSink2(); - rp sink = test_sink; - - TestSink2 * cast_sink = dynamic_cast(sink.get()); - - REQUIRE(test_sink.get() == cast_sink); - - Sink1 * dt_sink = dynamic_cast *>(sink.get()); - - REQUIRE(test_sink.get() == dt_sink); - - rp> dt_sink2 - = Sink1::require_native("TEST_CASE(sink-cast2)", sink.get()); - - REQUIRE(test_sink.get() == dt_sink2.get()); - } /*TEST_CASE(sink-cast2)*/ - - TEST_CASE("sink-cast3", "[reactor]") { - rp test_sink = new TestSink3(); - rp sink = test_sink; - - TestSink3 * cast_sink = dynamic_cast(sink.get()); - - REQUIRE(test_sink.get() == cast_sink); - - Sink1> * ev_sink - = dynamic_cast> *>(sink.get()); - - REQUIRE(test_sink.get() == ev_sink); - - rp>> ev_sink2 - = Sink1>::require_native("TEST_CASE(sink-cast3)", sink.get()); - - REQUIRE(test_sink.get() == ev_sink2.get()); - } /*TEST_CASE(sink-cast3)*/ - } /*namespace ut*/ -} /*namespace xo*/ - -/* end Sink.test.cpp */ diff --git a/.xo-reactor/utest/reactor_utest_main.cpp b/.xo-reactor/utest/reactor_utest_main.cpp deleted file mode 100644 index c3b80295..00000000 --- a/.xo-reactor/utest/reactor_utest_main.cpp +++ /dev/null @@ -1,6 +0,0 @@ -/* @file reactor_utest_main.cpp */ - -#define CATCH_CONFIG_MAIN -#include "catch2/catch.hpp" - -/* end reactor_utest_main.cpp */