Add 'xo-reactor/' from commit 'e3a53d10e6'
git-subtree-dir: xo-reactor git-subtree-mainline:dd1a6b1afcgit-subtree-split:e3a53d10e6
This commit is contained in:
commit
28b6d457c0
38 changed files with 3336 additions and 0 deletions
231
xo-reactor/.github/workflows/main.yml
vendored
Normal file
231
xo-reactor/.github/workflows/main.yml
vendored
Normal file
|
|
@ -0,0 +1,231 @@
|
|||
name: build xo-reactor + xo dependencies
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ "main" ]
|
||||
pull_request:
|
||||
branches: [ "main" ]
|
||||
|
||||
env:
|
||||
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
|
||||
BUILD_TYPE: Release
|
||||
|
||||
jobs:
|
||||
build:
|
||||
# The CMake configure and build commands are platform agnostic and should work equally well on Windows or Mac.
|
||||
# You can convert this to a matrix build if you need cross-platform coverage.
|
||||
# See: https://docs.github.com/en/free-pro-team@latest/actions/learn-github-actions/managing-complex-workflows#using-a-build-matrix
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: checkout source
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Install catch2
|
||||
# install catch2. see [[https://stackoverflow.com/questions/57982945/how-to-apt-get-install-in-a-github-actions-workflow]]
|
||||
run: sudo apt-get install -y catch2
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
- name: Clone xo-cmake
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
repository: Rconybea/xo-cmake
|
||||
path: repo/xo-cmake
|
||||
|
||||
- name: Configure xo-cmake
|
||||
run: cmake -B ${{github.workspace}}/build_xo-cmake -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/xo-cmake
|
||||
|
||||
- name: Build xo-cmake (trivial)
|
||||
run: cmake --build ${{github.workspace}}/build_xo-cmake --config ${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Install xo-cmake
|
||||
run: cmake --install ${{github.workspace}}/build_xo-cmake
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
- name: Clone indentlog
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
repository: Rconybea/indentlog
|
||||
path: repo/indentlog
|
||||
|
||||
- name: Configure indentlog
|
||||
# configure cmake for indentlog in dedicated build directory.
|
||||
run: cmake -B ${{github.workspace}}/build_indentlog -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/indentlog
|
||||
|
||||
- name: Build indentlog
|
||||
run: cmake --build ${{github.workspace}}/build_indentlog --config ${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Install indentlog
|
||||
# install into ${{github.workspace}}/local
|
||||
run: cmake --install ${{github.workspace}}/build_indentlog
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
- name: Clone refcnt
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
repository: Rconybea/refcnt
|
||||
path: repo/refcnt
|
||||
|
||||
- name: Configure refcnt
|
||||
# configure cmake for refcnt in dedicated build directory.
|
||||
run: cmake -B ${{github.workspace}}/build_refcnt -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/refcnt
|
||||
|
||||
- name: Build refcnt
|
||||
run: cmake --build ${{github.workspace}}/build_refcnt --config ${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Install refcnt
|
||||
# install into ${{github.workspace}}/local
|
||||
run: cmake --install ${{github.workspace}}/build_refcnt
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
- name: Clone subsys
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
repository: Rconybea/subsys
|
||||
path: repo/subsys
|
||||
|
||||
- name: Configure subsys
|
||||
# configure cmake for subsys in dedicated build directory.
|
||||
run: cmake -B ${{github.workspace}}/build_subsys -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/subsys
|
||||
|
||||
- name: Build subsys
|
||||
run: cmake --build ${{github.workspace}}/build_subsys --config ${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Install subsys
|
||||
# install into ${{github.workspace}}/local
|
||||
run: cmake --install ${{github.workspace}}/build_subsys
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
- name: Clone reflect
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
repository: Rconybea/reflect
|
||||
path: repo/reflect
|
||||
|
||||
- name: Configure reflect
|
||||
# configure cmake for reflect in dedicated build directory.
|
||||
run: cmake -B ${{github.workspace}}/build_reflect -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/reflect
|
||||
|
||||
- name: Build reflect
|
||||
run: cmake --build ${{github.workspace}}/build_reflect --config ${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Install reflect
|
||||
# install into ${{github.workspace}}/local
|
||||
run: cmake --install ${{github.workspace}}/build_reflect
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
- name: Clone callback
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
repository: Rconybea/xo-callback
|
||||
path: repo/callback
|
||||
|
||||
- name: Configure callback
|
||||
# configure cmake for callback in dedicated build directory.
|
||||
run: cmake -B ${{github.workspace}}/build_callback -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/callback
|
||||
|
||||
- name: Build callback
|
||||
run: cmake --build ${{github.workspace}}/build_callback --config ${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Install callback
|
||||
# install into ${{github.workspace}}/local
|
||||
run: cmake --install ${{github.workspace}}/build_callback
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
- name: Clone webutil
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
repository: Rconybea/xo-webutil
|
||||
path: repo/webutil
|
||||
|
||||
- name: Configure webutil
|
||||
# configure cmake for webutil in dedicated build directory.
|
||||
run: cmake -B ${{github.workspace}}/build_webutil -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/webutil
|
||||
|
||||
- name: Build webutil
|
||||
run: cmake --build ${{github.workspace}}/build_webutil --config ${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Install webutil
|
||||
# install into ${{github.workspace}}/local
|
||||
run: cmake --install ${{github.workspace}}/build_webutil
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
- name: Clone printjson
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
repository: Rconybea/xo-printjson
|
||||
path: repo/printjson
|
||||
|
||||
- name: Configure printjson
|
||||
# configure cmake for printjson in dedicated build directory.
|
||||
run: cmake -B ${{github.workspace}}/build_printjson -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/printjson
|
||||
|
||||
- name: Build printjson
|
||||
run: cmake --build ${{github.workspace}}/build_printjson --config ${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Install printjson
|
||||
# install into ${{github.workspace}}/local
|
||||
run: cmake --install ${{github.workspace}}/build_printjson
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
- name: Clone randomgen
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
repository: Rconybea/randomgen
|
||||
path: repo/randomgen
|
||||
|
||||
- name: Configure randomgen
|
||||
# configure cmake for randomgen in dedicated build directory.
|
||||
run: cmake -B ${{github.workspace}}/build_randomgen -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/randomgen
|
||||
|
||||
- name: Build randomgen
|
||||
run: cmake --build ${{github.workspace}}/build_randomgen --config ${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Install randomgen
|
||||
# install into ${{github.workspace}}/local
|
||||
run: cmake --install ${{github.workspace}}/build_randomgen
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
- name: Clone ordinaltree
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
repository: Rconybea/xo-ordinaltree
|
||||
path: repo/ordinaltree
|
||||
|
||||
- name: Configure ordinaltree
|
||||
# configure cmake for ordinaltree in dedicated build directory.
|
||||
run: cmake -B ${{github.workspace}}/build_ordinaltree -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local repo/ordinaltree
|
||||
|
||||
- name: Build ordinaltree
|
||||
run: cmake --build ${{github.workspace}}/build_ordinaltree --config ${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Install ordinaltree
|
||||
# install into ${{github.workspace}}/local
|
||||
run: cmake --install ${{github.workspace}}/build_ordinaltree
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
- name: Configure self (reactor)
|
||||
# Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make.
|
||||
# See https://cmake.org/cmake/help/latest/variable/CMAKE_BUILD_TYPE.html?highlight=cmake_build_type
|
||||
run: cmake -B ${{github.workspace}}/build_reactor -DCMAKE_MODULE_PATH=${{github.workspace}}/local/share/cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Build self (reactor)
|
||||
# Build your program with the given configuration
|
||||
run: cmake --build ${{github.workspace}}/build_reactor --config ${{env.BUILD_TYPE}}
|
||||
|
||||
- name: Test self (reactor)
|
||||
working-directory: ${{github.workspace}}/build_reactor
|
||||
# Execute tests defined by the CMake configuration.
|
||||
# See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail
|
||||
run: ctest -C ${{env.BUILD_TYPE}}
|
||||
6
xo-reactor/.gitignore
vendored
Normal file
6
xo-reactor/.gitignore
vendored
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
# lsp keep state here
|
||||
.cache
|
||||
# typical build directories
|
||||
.build*
|
||||
# for lsp: manual symlink to chosen build directory
|
||||
compile_commands.json
|
||||
30
xo-reactor/CMakeLists.txt
Normal file
30
xo-reactor/CMakeLists.txt
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
# xo-reactor/CMakeLists.txt
|
||||
|
||||
cmake_minimum_required(VERSION 3.10)
|
||||
|
||||
project(reactor VERSION 0.1)
|
||||
|
||||
include(GNUInstallDirs)
|
||||
include(cmake/xo-bootstrap-macros.cmake)
|
||||
|
||||
xo_cxx_toplevel_options3()
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# c++ settings
|
||||
|
||||
# one-time project-specific c++ flags. usually empty
|
||||
set(PROJECT_CXX_FLAGS "")
|
||||
#set(PROJECT_CXX_FLAGS "-fconcepts-diagnostics-depth=2")
|
||||
add_definitions(${PROJECT_CXX_FLAGS})
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
add_subdirectory(src/reactor)
|
||||
add_subdirectory(utest)
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# provide find_package() support for reactor customers
|
||||
|
||||
xo_export_cmake_config(${PROJECT_NAME} ${PROJECT_VERSION} ${PROJECT_NAME}Targets)
|
||||
|
||||
# end CMakeLists.txt
|
||||
56
xo-reactor/README.md
Normal file
56
xo-reactor/README.md
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
# reactor library
|
||||
|
||||
in-memory queuing system
|
||||
|
||||
## Getting Started
|
||||
|
||||
### build + install dependencies
|
||||
|
||||
build+install these first
|
||||
|
||||
- xo-reflect [github.com/Rconybea/xo-reflect](https://github.com/Rconybea/reflect)
|
||||
- xo-callback [github.com/Rconybea/xo-callback](https://github.com/Rconybea/xo-callback)
|
||||
- xo-webutil [github.com/Rconybea/xo-webutil](https://github.com/Rconybea/xo-webutil)
|
||||
|
||||
### build + install xo-reactor
|
||||
```
|
||||
$ cd xo-reactor
|
||||
$ mkdir build
|
||||
$ cd build
|
||||
$ INSTALL_PREFIX=/usr/local # or wherever you prefer
|
||||
$ cmake -DCMAKE_MODULE_PATH=${INSTALL_PREFIX}/share/cmake -DCMAKE_PREFIX_PATH=${INSTALL_PREFIX} -DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX} ..
|
||||
$ make
|
||||
$ make install
|
||||
```
|
||||
(also see .github/workflows/main.yml)
|
||||
|
||||
## Development
|
||||
|
||||
### build for unit test coverage
|
||||
```
|
||||
$ cd xo-reactor
|
||||
$ mkdir ccov
|
||||
$ cd ccov
|
||||
$ cmake -DCMAKE_MODULE_PATH=${INSTALL_PREFIX}/share/cmake -DCMAKE_PREFIX_PATH=${INSTALL_PREFIX} -DCODE_COVERAGE=ON -DCMAKE_BUILD_TYPE=Debug ..
|
||||
```
|
||||
|
||||
### LSP support
|
||||
|
||||
LSP looks for compile commands in the root of the source tree;
|
||||
cmake creates them in the root of its build directory.
|
||||
|
||||
```
|
||||
$ cd xo-reactor
|
||||
$ ln -s build/compile_commands.json
|
||||
```
|
||||
|
||||
### display cmake variables
|
||||
|
||||
- `-L` list variables
|
||||
- `-A` include 'advanced' variables
|
||||
- `-H` include help text
|
||||
|
||||
```
|
||||
$ cd xo-reactor/build
|
||||
$ cmake -LAH
|
||||
```
|
||||
16
xo-reactor/cmake/reactorConfig.cmake.in
Normal file
16
xo-reactor/cmake/reactorConfig.cmake.in
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
@PACKAGE_INIT@
|
||||
|
||||
include(CMakeFindDependencyMacro)
|
||||
|
||||
# note: changes to find_dependency() calls here
|
||||
# must coordinate with xo_dependency() calls
|
||||
# in xo-reactor/src/reactor/CMakeLists.txt
|
||||
#
|
||||
find_dependency(xo_ordinaltree)
|
||||
find_dependency(reflect)
|
||||
find_dependency(webutil)
|
||||
find_dependency(printjson)
|
||||
find_dependency(callback)
|
||||
|
||||
include("${CMAKE_CURRENT_LIST_DIR}/@PROJECT_NAME@Targets.cmake")
|
||||
check_required_components("@PROJECT_NAME@")
|
||||
35
xo-reactor/cmake/xo-bootstrap-macros.cmake
Normal file
35
xo-reactor/cmake/xo-bootstrap-macros.cmake
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
# ----------------------------------------------------------------
|
||||
# for example:
|
||||
# $ PREFIX=/usr/local # for example
|
||||
# $ cmake -DCMAKE_MODULE_PATH=prefix -DCMAKE_INSTALL_PREFIX=$PREFIX -B .build
|
||||
#
|
||||
# will get
|
||||
# CMAKE_MODULE_PATH
|
||||
# from xo-cmake-config --cmake-module-path
|
||||
#
|
||||
# and expect .cmake macros in
|
||||
# CMAKE_MODULE_PATH/xo_macros/xo_cxx.cmake
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
find_program(XO_CMAKE_CONFIG_EXECUTABLE NAMES xo-cmake-config REQUIRED)
|
||||
|
||||
if ("${XO_CMAKE_CONFIG_EXECUTABLE}" STREQUAL "XO_CMAKE_CONFIG_EXECUTABLE-NOT_FOUND")
|
||||
message(FATAL "could not find xo-cmake-config executable")
|
||||
endif()
|
||||
|
||||
message(STATUS "XO_CMAKE_CONFIG_EXECUTABLE=${XO_CMAKE_CONFIG_EXECUTABLE}")
|
||||
|
||||
if (NOT XO_SUBMODULE_BUILD)
|
||||
if (("${CMAKE_MODULE_PATH}" STREQUAL "") OR ("${CMAKE_MODULE_PATH}" STREQUAL prefix))
|
||||
# default to typical install location for xo-project-macros
|
||||
execute_process(COMMAND ${XO_CMAKE_CONFIG_EXECUTABLE} --cmake-module-path OUTPUT_VARIABLE CMAKE_MODULE_PATH)
|
||||
message(STATUS "CMAKE_MODULE_PATH=${CMAKE_MODULE_PATH}")
|
||||
endif()
|
||||
endif()
|
||||
|
||||
# needs to have been installed somewhere on CMAKE_MODULE_PATH,
|
||||
# (e.g. from xo-cmake with the same value for CMAKE_INSTALL_PREFIX)
|
||||
#
|
||||
include(xo_macros/xo_cxx)
|
||||
|
||||
xo_cxx_bootstrap_message()
|
||||
50
xo-reactor/include/xo/reactor/AbstractEventProcessor.hpp
Normal file
50
xo-reactor/include/xo/reactor/AbstractEventProcessor.hpp
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
/* @file AbstractEventProcessor.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "xo/refcnt/Refcounted.hpp"
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
/* common base class for {AbstractSource, AbstractSink}.
|
||||
* An event processor can be:
|
||||
* 1. an event source (inherits AbstractSource)
|
||||
* 2. an event sink (inherits AbstractSink)
|
||||
* 3. both source+sink (inherits both)
|
||||
*/
|
||||
class AbstractEventProcessor : virtual public ref::Refcount {
|
||||
public:
|
||||
/* reporting name for this source. ideally unique, but not required */
|
||||
virtual std::string const & name() const = 0;
|
||||
/* set .name */
|
||||
virtual void set_name(std::string const & x) = 0;
|
||||
|
||||
/* find all event processors ep reachable from x (i.e. downstream from x).
|
||||
* report each such ep exactly once
|
||||
*/
|
||||
static std::vector<rp<AbstractEventProcessor>> map_network(rp<AbstractEventProcessor> const & x);
|
||||
|
||||
/* visit direct downstream consumers c[i] of this event processor.
|
||||
* call ep(c[i]) for each such consumer.
|
||||
*/
|
||||
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor> ep)> const & fn) = 0;
|
||||
|
||||
/* write representation to stream */
|
||||
virtual void display(std::ostream & os) const = 0;
|
||||
/* human-readable string identifying this source */
|
||||
virtual std::string display_string() const;
|
||||
}; /*AbstractEventProcessor*/
|
||||
|
||||
inline std::ostream &
|
||||
operator<<(std::ostream & os, AbstractEventProcessor const & src) {
|
||||
src.display(os);
|
||||
return os;
|
||||
} /*operator<<*/
|
||||
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end AbstractEventProcessor.hpp */
|
||||
71
xo-reactor/include/xo/reactor/AbstractSink.hpp
Normal file
71
xo-reactor/include/xo/reactor/AbstractSink.hpp
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
/* @file AbstractSink.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "AbstractSource.hpp"
|
||||
#include "xo/reflect/TaggedPtr.hpp"
|
||||
#include "xo/reflect/TypeDescr.hpp"
|
||||
//#include "time/Time.hpp"
|
||||
#include "xo/indentlog/print/tag.hpp"
|
||||
#include "xo/cxxutil/demangle.hpp"
|
||||
#include <typeinfo>
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
/* an event consumer.
|
||||
* note that event representation is not specified,
|
||||
* this helps avoid mandating a type hierarchy for events
|
||||
*/
|
||||
class AbstractSink : public virtual AbstractEventProcessor {
|
||||
public:
|
||||
using TypeDescr = reflect::TypeDescr;
|
||||
using TaggedPtr = reflect::TaggedPtr;
|
||||
|
||||
public:
|
||||
virtual ~AbstractSink() = default;
|
||||
|
||||
/* if true: sources may produce events of any reflected type.
|
||||
* sink will accept such events using .notify_ev_tp()
|
||||
* for example see web_util/WebsocketSink
|
||||
*
|
||||
* if false (common): souce is expected to to produce events of
|
||||
* a single type, specified by .sink_ev_type()
|
||||
* .notify_ev_tp() will downcast to that type.
|
||||
* for example see reactor/Sink1
|
||||
*
|
||||
* polymorphic sinks pay for runtime polymorphism
|
||||
* (since WebsocketSink sends events in json format this is
|
||||
* expected to be negligible compared to message formatting)
|
||||
*/
|
||||
virtual bool allow_polymorphic_source() const = 0;
|
||||
|
||||
/* identify datatype for items expected by this sink */
|
||||
virtual TypeDescr sink_ev_type() const = 0;
|
||||
|
||||
/* true iff this sink accepts volatile events.
|
||||
* volatile events are events that may be modified
|
||||
* or destroyed after being delivered to this sink.
|
||||
*
|
||||
* For example KalmanFilterSvc accepts volatile events,
|
||||
* but EventStore requires non-volatile events.
|
||||
*/
|
||||
virtual bool allow_volatile_source() const = 0;
|
||||
|
||||
/* counts lifetime #of incoming events for this sink */
|
||||
virtual uint32_t n_in_ev() const = 0;
|
||||
|
||||
/* attach an input source.
|
||||
* typically this means calling src.add_callback()
|
||||
* with a function thats calls a .notify_xxx() method
|
||||
* on this Sink
|
||||
*/
|
||||
virtual void attach_source(rp<AbstractSource> const & src) = 0;
|
||||
|
||||
/* accept incoming event, given by tagged pointer */
|
||||
virtual void notify_ev_tp(TaggedPtr const & ev_tp) = 0;
|
||||
}; /*AbstractSink*/
|
||||
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end AbstractSink.hpp */
|
||||
100
xo-reactor/include/xo/reactor/AbstractSource.hpp
Normal file
100
xo-reactor/include/xo/reactor/AbstractSource.hpp
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
/* @file AbstractSource.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "AbstractEventProcessor.hpp"
|
||||
#include "xo/reflect/TypeDescr.hpp"
|
||||
#include "xo/callback/CallbackSet.hpp"
|
||||
#include "xo/refcnt/Refcounted.hpp"
|
||||
#include <string>
|
||||
|
||||
namespace xo {
|
||||
namespace web { class StreamEndpointDescr; }
|
||||
|
||||
namespace reactor {
|
||||
class AbstractSink;
|
||||
|
||||
template<typename T>
|
||||
class Sink1;
|
||||
|
||||
/* abstract api for a source of events.
|
||||
* Event representation is left open: Sources and Sinks
|
||||
* need to have compatible event representations,
|
||||
* and coordination is left to such (Source, Sink) pairs.
|
||||
*
|
||||
* See ReactorSource, for example
|
||||
*
|
||||
* Typically a Source will have one or more .add_callback()
|
||||
* methods, for listening to source events
|
||||
*/
|
||||
class AbstractSource : public virtual AbstractEventProcessor {
|
||||
public:
|
||||
using StreamEndpointDescr = web::StreamEndpointDescr;
|
||||
using TypeDescr = reflect::TypeDescr;
|
||||
using CallbackId = fn::CallbackId;
|
||||
|
||||
public:
|
||||
/* identify datatype for items delivered by this source */
|
||||
virtual TypeDescr source_ev_type() const = 0;
|
||||
|
||||
/* if true: event objects (see .source_ev_type())
|
||||
* may be overwritten between callbacks.
|
||||
* A sink that wants to capture events
|
||||
* (e.g. EventStore<>) will need to deep-copy them
|
||||
* if false: event objects are preserved between callbacks.
|
||||
*
|
||||
* A source that stores events received from elsewhere (e.g. FifoQueue)
|
||||
* is probably volatile.
|
||||
*
|
||||
* A source that remembers (in explicit memory) every event it produces
|
||||
* is not volatile
|
||||
*/
|
||||
virtual bool is_volatile() const = 0;
|
||||
|
||||
/* counts #of outbound events ready for delivery,
|
||||
* but not yet sent */
|
||||
virtual uint32_t n_queued_out_ev() const = 0;
|
||||
/* counts lifetime #of events delivered.
|
||||
* see also AbstractSink.n_in_ev
|
||||
*/
|
||||
virtual uint32_t n_out_ev() const = 0;
|
||||
|
||||
/* if true, simulator will report interaction with this source */
|
||||
virtual bool debug_sim_flag() const = 0;
|
||||
/* set .trace_sim_flag */
|
||||
virtual void set_debug_sim_flag(bool x) = 0;
|
||||
|
||||
virtual CallbackId attach_sink(rp<AbstractSink> const & sink) = 0;
|
||||
virtual void detach_sink(CallbackId id) = 0;
|
||||
|
||||
/* endpoint for a websocket subscriber;
|
||||
* subscriber delivers events produced by this source
|
||||
*/
|
||||
StreamEndpointDescr stream_endpoint_descr(std::string const & url_prefix);
|
||||
|
||||
/* typically expect events to be delivered using a reactor or simulator.
|
||||
* (for example see reactor/Reactor, simulator/Simulator);
|
||||
* reactor allocates cpu, and controls event ordering across sources
|
||||
* when there are multiple sources.
|
||||
*
|
||||
* However, also possible for user code to invoke .deliver_one() directly.
|
||||
* Beware, may get unpredictable results if attempt to do this on a source
|
||||
* that's also attached to a reactor.
|
||||
*/
|
||||
virtual std::uint64_t deliver_one() = 0;
|
||||
|
||||
/* convenience: call .deliver_one() n times, return sum of results */
|
||||
std::uint64_t deliver_n(uint64_t n);
|
||||
|
||||
/* convenience: call .deliver_one() until it returns 0
|
||||
* (beware of inexhaustible sources!)
|
||||
*/
|
||||
std::uint64_t deliver_all();
|
||||
}; /*AbstractSource*/
|
||||
|
||||
using AbstractSourcePtr = rp<AbstractSource>;
|
||||
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end AbstractSource.hpp */
|
||||
18
xo-reactor/include/xo/reactor/DirectSource.hpp
Normal file
18
xo-reactor/include/xo/reactor/DirectSource.hpp
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
/* @file DirectSource.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
//#include "time/Time.hpp"
|
||||
#include "reactor/Sink.hpp"
|
||||
#include "reactor/EventSource.hpp"
|
||||
#include "reactor/HeapReducer.hpp"
|
||||
//#include "reactor/LastReducer.hpp"
|
||||
#include "reactor/Reactor.hpp"
|
||||
#include "callback/CallbackSet.hpp"
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end DirectSource.hpp */
|
||||
25
xo-reactor/include/xo/reactor/DirectSourcePtr.hpp
Normal file
25
xo-reactor/include/xo/reactor/DirectSourcePtr.hpp
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
/* @file DirectSourcePtr.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "SecondarySource.hpp"
|
||||
#include "LastReducer.hpp"
|
||||
#include "EventTimeFn.hpp"
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
template<typename Event>
|
||||
using DirectSource = SecondarySource<Event,
|
||||
LastReducer<Event,
|
||||
StructEventTimeFn<Event>>>;
|
||||
|
||||
/* use when Event is rp<T> for some T */
|
||||
template<typename Event>
|
||||
using DirectSourcePtr = SecondarySource<Event,
|
||||
LastReducer<Event,
|
||||
PtrEventTimeFn<Event>>>;
|
||||
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end DirectSourcePtr.hpp */
|
||||
25
xo-reactor/include/xo/reactor/EventSource.hpp
Normal file
25
xo-reactor/include/xo/reactor/EventSource.hpp
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
/* @file EventSource.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "ReactorSource.hpp"
|
||||
#include "xo/callback/CallbackSet.hpp"
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
template</*typename Event,*/
|
||||
typename Callback
|
||||
/*void (Callback::*member_fn)(Event const &)*/>
|
||||
class EventSource : public ReactorSource {
|
||||
public:
|
||||
using CallbackId = fn::CallbackId;
|
||||
|
||||
public:
|
||||
virtual CallbackId add_callback(rp<Callback> const & cb) = 0;
|
||||
virtual void remove_callback(CallbackId id) = 0;
|
||||
}; /*EventSource*/
|
||||
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end EventSource.hpp */
|
||||
317
xo-reactor/include/xo/reactor/EventStore.hpp
Normal file
317
xo-reactor/include/xo/reactor/EventStore.hpp
Normal file
|
|
@ -0,0 +1,317 @@
|
|||
/* @file EventStore.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "Reducer.hpp"
|
||||
#include "EventTimeFn.hpp"
|
||||
#include "Sink.hpp"
|
||||
#include "xo/webutil/HttpEndpointDescr.hpp"
|
||||
#include "xo/printjson/PrintJson.hpp"
|
||||
#include "xo/reflect/Reflect.hpp"
|
||||
#include "xo/ordinaltree/RedBlackTree.hpp"
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
|
||||
/* abstract event store api */
|
||||
class AbstractEventStore : virtual public ref::Refcount {
|
||||
public:
|
||||
using PrintJson = xo::json::PrintJson;
|
||||
using TaggedPtr = xo::reflect::TaggedPtr;
|
||||
using HttpEndpointDescr = xo::web::HttpEndpointDescr;
|
||||
using Alist = xo::web::Alist;
|
||||
|
||||
public:
|
||||
/* true iff .size() == 0 */
|
||||
virtual bool empty() const = 0;
|
||||
|
||||
/* #of events currently held in this store */
|
||||
virtual std::uint32_t size() const = 0;
|
||||
|
||||
/* TODO:
|
||||
* 1. TaggedGptr = discriminated union of
|
||||
* a. TaggedRcptr (i.e. refcounted semantics)
|
||||
* b. Unique (i.e. unique_ptr semantics)
|
||||
* c. Exptr (i.e. unowned_ptr semantics)
|
||||
* d. compact (special-case -- value fits in pointer)
|
||||
* will need mpl copy/assign stuff for TaggedUnique
|
||||
* 2. provide .last_n(), .last_dt()
|
||||
*/
|
||||
|
||||
virtual void http_snapshot(rp<PrintJson> const & pjson,
|
||||
std::ostream * p_os) const = 0;
|
||||
|
||||
/* http endpoint; generates http output for this eventstore */
|
||||
virtual HttpEndpointDescr http_endpoint_descr(rp<PrintJson> const & pjson,
|
||||
std::string const & url_prefix) const {
|
||||
|
||||
/* important that lambda contains its own rp<PrintJson>;
|
||||
* reference to stack will not do
|
||||
*/
|
||||
rp<PrintJson> pjson_rp = pjson;
|
||||
|
||||
auto http_fn = ([this, pjson_rp]
|
||||
(std::string const & /*uri*/,
|
||||
Alist const & /*alist*/,
|
||||
std::ostream * p_os)
|
||||
{
|
||||
/* WARNING: race condition here,
|
||||
* given webserver runs from a separate thread
|
||||
*/
|
||||
|
||||
this->http_snapshot(pjson_rp, p_os);
|
||||
});
|
||||
|
||||
return HttpEndpointDescr(url_prefix + "/snap", http_fn);
|
||||
} /*http_endpoint_descr*/
|
||||
|
||||
virtual void clear() = 0;
|
||||
|
||||
virtual void insert_tp(TaggedPtr const & ev_tp) = 0;
|
||||
}; /*AbstractEventStore*/
|
||||
|
||||
/* in-memory storage for a set of events.
|
||||
*
|
||||
* Require:
|
||||
* - Event is null-constructible
|
||||
* - Event is copyable
|
||||
* - EventTimeFn :: Event -> utc_nanos
|
||||
*
|
||||
* inheritance
|
||||
* ref::Refcount
|
||||
* ^
|
||||
* isa
|
||||
* |
|
||||
* reactor::AbstractEventProcessor + req .visit_direct_consumers()
|
||||
* ^
|
||||
* isa
|
||||
* |
|
||||
* reactor::AbstractSink + req .sink_ev_type(), .notify_ev() etc.
|
||||
* ^
|
||||
* isa
|
||||
* |
|
||||
* reactor::Sink1<Event> + .attach_source(), .sink_ev_type(),
|
||||
* ^ req .notify_ev() etc
|
||||
* |
|
||||
* isa
|
||||
* |
|
||||
* reactor::SinkEndpoint<Event> + impl .visit_direct_consumers()
|
||||
* ^
|
||||
* isa
|
||||
* |
|
||||
* reactor::StructEventStore<Event, ..> + .last_n() .last_dt() etc.
|
||||
*/
|
||||
template<typename Event,
|
||||
typename EventTimeFn>
|
||||
class EventStoreImpl : public SinkEndpoint<Event>,
|
||||
public AbstractEventStore,
|
||||
ReducerBase<Event, EventTimeFn>
|
||||
{
|
||||
static_assert(EventTimeConcept<Event, EventTimeFn>);
|
||||
|
||||
public:
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
using nanos = xo::time::nanos;
|
||||
using EventTree = xo::tree::RedBlackTree<utc_nanos, Event,
|
||||
xo::tree::OrdinalReduce<Event>>;
|
||||
using PrintJson = xo::json::PrintJson;
|
||||
using Alist = xo::web::Alist;
|
||||
using HttpEndpointDescr = xo::web::HttpEndpointDescr;
|
||||
|
||||
static rp<EventStoreImpl> make() { return new EventStoreImpl(); }
|
||||
|
||||
/* visit most recent n events in this store.
|
||||
* returns #of events actually visited
|
||||
*
|
||||
* if events visited are e1 .. en, then:
|
||||
* (1) en is the most recent recorded event
|
||||
* (.event_tm(en) is .tree.max_key())
|
||||
* (2) there are no events between e(i) and e(i+1)
|
||||
* (i.e. visit does not skip over any events)
|
||||
* (3) if v < n, then v = .size(),
|
||||
* where v is the #of events visited
|
||||
*
|
||||
* require:
|
||||
* - Fn :: (Event -> )
|
||||
*/
|
||||
template <typename Fn>
|
||||
std::uint32_t visit_last_n(std::uint32_t n, Fn && fn) const {
|
||||
std::uint32_t z = this->size();
|
||||
std::uint32_t lo = ((n >= z) ? 0 : z - n);
|
||||
|
||||
typename EventTree::const_iterator lo_ix = this->tree_.find_ith(lo);
|
||||
typename EventTree::const_iterator hi_ix = this->tree_.cend();
|
||||
|
||||
return this->visit_range(lo_ix, hi_ix, fn);
|
||||
} /*visit_last_n*/
|
||||
|
||||
/* visit suffix of events sufficient to cover interval of length dt.
|
||||
* visit events in increasing timestamp order.
|
||||
*
|
||||
* if events visited are e1 .. en, then:
|
||||
* (1) en is the most recent recorded event
|
||||
* (.event_tm(en) is .tree.max_key())
|
||||
* (2) there are no events between e(i) and e(i+1)
|
||||
* (i.e. visit does not skip over any events)
|
||||
* (3) if .event_tm(en) - .event_tm(e1) < dt,
|
||||
* then e1 is the earliest recorded event
|
||||
* (.event_tm(e1) is .tree.min_key())
|
||||
* (4) if .event_tm(en) - .event_tm(e1) > dt,
|
||||
* then (.event_tm(en) - .event_tm(e2)) < dt
|
||||
*
|
||||
* |<---------- dt ----------->|
|
||||
* ^ ^ ^
|
||||
* e1 e2 en
|
||||
*/
|
||||
template <typename Fn>
|
||||
std::uint32_t visit_last_dt(nanos dt, Fn && fn) const {
|
||||
if (tree_.empty())
|
||||
return 0;
|
||||
|
||||
/* tree not empty -> has max key */
|
||||
utc_nanos tn = this->tree_.max_key();
|
||||
utc_nanos tk = tn - dt;
|
||||
|
||||
typename EventTree::const_iterator lo_ix = this->tree_.find_glb(tk, true /*closed*/);
|
||||
typename EventTree::const_iterator hi_ix = this->tree_.end();
|
||||
|
||||
return this->visit_range(lo_ix, hi_ix, fn);
|
||||
} /*visit_last_dt*/
|
||||
|
||||
std::vector<Event> last_n(std::uint32_t n) const {
|
||||
std::vector<Event> retval;
|
||||
|
||||
auto fn = [&retval](Event const &ev) { retval.push_back(ev); };
|
||||
|
||||
this->visit_last_n(n, fn);
|
||||
|
||||
return retval;
|
||||
} /*last_n*/
|
||||
|
||||
std::vector<Event> last_dt(nanos dt) const {
|
||||
std::vector<Event> retval;
|
||||
|
||||
auto fn = [&retval](Event const &ev) { retval.push_back(ev); };
|
||||
|
||||
this->visit_last_dt(dt, fn);
|
||||
|
||||
return retval;
|
||||
} /*last_dt*/
|
||||
|
||||
void insert(Event const & ev) { this->tree_.insert(typename EventTree::value_type(this->event_tm(ev), ev)); }
|
||||
|
||||
// ----- Inherited from AbstractEventStore -----
|
||||
|
||||
virtual bool empty() const override { return tree_.empty(); }
|
||||
virtual std::uint32_t size() const override { return tree_.size(); }
|
||||
|
||||
/* write http snapshot of current state to *p_os */
|
||||
virtual void http_snapshot(rp<PrintJson> const & pjson, std::ostream * p_os) const override {
|
||||
using xo::reflect::Reflect;
|
||||
|
||||
/* visit last 100 events;
|
||||
* write them to *p_os in increasing time order
|
||||
*/
|
||||
auto ev_v = this->last_n(100);
|
||||
|
||||
pjson->print_tp(Reflect::make_tp(&ev_v), p_os);
|
||||
} /*http_snapshot*/
|
||||
|
||||
virtual void clear() override { this->tree_.clear(); }
|
||||
|
||||
virtual void insert_tp(TaggedPtr const & ev_tp) override {
|
||||
using xo::xtag;
|
||||
|
||||
Event * p_ev = ev_tp.recover_native<Event>();
|
||||
|
||||
if (p_ev) {
|
||||
this->insert(*p_ev);
|
||||
} else {
|
||||
throw std::runtime_error(tostr("StructEventStore<Event>::insert_tp"
|
||||
": unable to convert ev_tp to Event",
|
||||
xtag("ev_tp.type", ev_tp.td()->canonical_name()),
|
||||
xtag("Event", reflect::type_name<Event>())));
|
||||
}
|
||||
} /*insert_tp*/
|
||||
|
||||
// ----- Inherited from AbstractSink -----
|
||||
|
||||
virtual uint32_t n_in_ev() const override { return n_in_ev_; }
|
||||
virtual bool allow_volatile_source() const override { return false; }
|
||||
virtual void notify_ev(Event const & ev) override {
|
||||
++(this->n_in_ev_);
|
||||
this->insert(ev);
|
||||
}
|
||||
|
||||
// ----- Inherited from AbstractSource -----
|
||||
|
||||
virtual void display(std::ostream & os) const override {
|
||||
using xo::xtag;
|
||||
|
||||
os << "<EventStoreImpl"
|
||||
<< xtag("name", this->name())
|
||||
<< xtag("n_in_ev", this->n_in_ev())
|
||||
<< ">";
|
||||
} /*display*/
|
||||
|
||||
// ----- Inherited from AbstractEventProcessor -----
|
||||
|
||||
virtual std::string const & name() const override { return name_; }
|
||||
virtual void set_name(std::string const & x) override { name_ = x; }
|
||||
|
||||
private:
|
||||
EventStoreImpl() = default;
|
||||
|
||||
template <typename Fn>
|
||||
std::uint32_t visit_range(typename EventTree::const_iterator lo_ix,
|
||||
typename EventTree::const_iterator hi_ix,
|
||||
Fn && fn) const {
|
||||
std::uint32_t n = 0;
|
||||
for (; lo_ix != hi_ix; ++lo_ix, ++n) {
|
||||
fn(lo_ix->second);
|
||||
}
|
||||
|
||||
return n;
|
||||
} /*visit_range*/
|
||||
|
||||
private:
|
||||
/* reporting name for this store */
|
||||
std::string name_;
|
||||
/* fetches per-event timestamp */
|
||||
EventTimeFn event_tm_fn_;
|
||||
/* counts lifetime #of incoming events (see .notify_ev()) */
|
||||
uint32_t n_in_ev_ = 0;
|
||||
/* events stored here */
|
||||
EventTree tree_;
|
||||
}; /*EventStoreImpl*/
|
||||
|
||||
template<typename Event>
|
||||
using StructEventStore = EventStoreImpl<Event, StructEventTimeFn<Event>>;
|
||||
|
||||
template<typename Event>
|
||||
using PtrEventStore = EventStoreImpl<Event, PtrEventTimeFn<Event>>;
|
||||
|
||||
/* Require:
|
||||
* EventTimeConcept<T, StructEventTimeFn<T>>
|
||||
*/
|
||||
template <typename T>
|
||||
class SinkToEventStore : public SinkEndpoint<T> {
|
||||
public:
|
||||
using EventStore = StructEventStore<T>;
|
||||
|
||||
public:
|
||||
SinkToEventStore() = default;
|
||||
|
||||
virtual void notify_ev(T const & ev) override {
|
||||
store_.insert(ev);
|
||||
} /*notify_ev*/
|
||||
|
||||
private:
|
||||
/* stash remembered events (all of them!) here */
|
||||
EventStore store_;
|
||||
}; /*SinkToEventStore*/
|
||||
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end EventStore.hpp */
|
||||
48
xo-reactor/include/xo/reactor/EventTimeFn.hpp
Normal file
48
xo-reactor/include/xo/reactor/EventTimeFn.hpp
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
/* @file EventTimeFn.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
//#include "time/Time.hpp"
|
||||
#include "xo/indentlog/timeutil/timeutil.hpp"
|
||||
#include <concepts>
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
template <typename Event, typename EventTimeFn>
|
||||
concept EventTimeConcept = requires(EventTimeFn etfn, Event ev) {
|
||||
{ etfn(ev) } -> std::same_as<xo::time::utc_nanos>;
|
||||
};
|
||||
|
||||
template<typename Event>
|
||||
class StructEventTimeFn {
|
||||
public:
|
||||
using event_t = Event;
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
|
||||
public:
|
||||
utc_nanos operator()(Event const & ev) const { return ev.tm(); }
|
||||
}; /*StructEventTimeFn*/
|
||||
|
||||
template<typename Event>
|
||||
class PtrEventTimeFn {
|
||||
public:
|
||||
using event_t = Event;
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
|
||||
public:
|
||||
utc_nanos operator()(Event const & ev) const { return ev->tm(); }
|
||||
}; /*PtrEventTimeFn*/
|
||||
|
||||
template<typename T>
|
||||
class PairEventTimeFn {
|
||||
public:
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
using event_t = std::pair<utc_nanos, T>;
|
||||
|
||||
public:
|
||||
utc_nanos operator()(event_t const & ev) const { return ev.first; }
|
||||
}; /*PairEventTimeFn*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end EventTimeFn.hpp */
|
||||
60
xo-reactor/include/xo/reactor/EventTimeFn2.hpp
Normal file
60
xo-reactor/include/xo/reactor/EventTimeFn2.hpp
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
/* @file EventTimeFn2.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "xo/refcnt/Refcounted.hpp"
|
||||
#include <timeutil/timeutil.hpp>
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
template <typename Event>
|
||||
class EventTimeFn {
|
||||
public:
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
using event_t = Event;
|
||||
|
||||
public:
|
||||
static utc_nanos event_tm(event_t const & ev) { return ev.tm(); }
|
||||
|
||||
utc_nanos operator()(event_t const & ev) const { return EventTimeFn::event_tm(ev); }
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class EventTimeFn<xo::rp<T>> {
|
||||
public:
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
using event_t = xo::rp<T>;
|
||||
|
||||
public:
|
||||
static utc_nanos event_tm(event_t const & ev) { return ev->tm(); }
|
||||
|
||||
utc_nanos operator()(event_t const & ev) const { return EventTimeFn::event_tm(ev); }
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class EventTimeFn<T*> {
|
||||
public:
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
using event_t = T*;
|
||||
|
||||
public:
|
||||
static utc_nanos event_tm(event_t ev) { return ev->tm(); }
|
||||
|
||||
utc_nanos operator()(event_t const & ev) const { return EventTimeFn::event_tm(ev); }
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class EventTimeFn<std::pair<xo::time::utc_nanos, T>> {
|
||||
public:
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
using event_t = std::pair<xo::time::utc_nanos, T>;
|
||||
|
||||
public:
|
||||
static utc_nanos event_tm(event_t const & ev) { return ev.first; }
|
||||
|
||||
utc_nanos operator()(event_t const & ev) const { return EventTimeFn::event_tm(ev); }
|
||||
};
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end EventTimeFn2.hpp */
|
||||
267
xo-reactor/include/xo/reactor/FifoQueue.hpp
Normal file
267
xo-reactor/include/xo/reactor/FifoQueue.hpp
Normal file
|
|
@ -0,0 +1,267 @@
|
|||
/* @file FifoQueue.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "Reactor.hpp"
|
||||
#include "EventSource.hpp"
|
||||
#include "Sink.hpp"
|
||||
#include "EventTimeFn2.hpp"
|
||||
#include "xo/callback/CallbackSet.hpp"
|
||||
#include <deque>
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
/* require:
|
||||
* T null constructible
|
||||
* T movable
|
||||
*
|
||||
* T satisfies EventTimeConcept
|
||||
*/
|
||||
template <typename T, typename EvTimeFn = EventTimeFn<T>>
|
||||
class FifoQueue : public virtual Sink1<T>, public virtual EventSource<Sink1<T>> {
|
||||
public:
|
||||
using EventSink = Sink1<T>;
|
||||
template<typename Fn>
|
||||
using RpCallbackSet = xo::fn::RpCallbackSet<Fn>;
|
||||
using CallbackId = xo::fn::CallbackId;
|
||||
using Reflect = xo::reflect::Reflect;
|
||||
using TypeDescr = xo::reflect::TypeDescr;
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
|
||||
public:
|
||||
static rp<FifoQueue> make(EvTimeFn evtm_fn = EvTimeFn()) { return new FifoQueue(evtm_fn); }
|
||||
|
||||
// ----- inherited from Sink1<T> -----
|
||||
|
||||
virtual void notify_ev(T const & ev) override {
|
||||
bool is_priming = this->elt_q_.empty();
|
||||
|
||||
this->elt_q_.push_back(ev);
|
||||
|
||||
++(this->n_in_ev_);
|
||||
|
||||
if (this->upstream_exhausted_) {
|
||||
throw std::runtime_error("FifoQueue::notify_ev"
|
||||
": not allowed after upstream exhausted");
|
||||
}
|
||||
|
||||
utc_nanos tm = evtm_fn_(ev);
|
||||
|
||||
if (this->current_tm_ < tm)
|
||||
this->current_tm_ = tm;
|
||||
|
||||
Reactor * reactor = this->parent_reactor_;
|
||||
|
||||
scope log(XO_DEBUG(this->debug_sim_flag_),
|
||||
xtag("name", name_),
|
||||
xtag("reactor", (void*)reactor),
|
||||
xtag("is_priming", is_priming));
|
||||
|
||||
if (reactor) {
|
||||
if (is_priming) {
|
||||
/* reactor/simulator takes delivery/sequencing responsibility from here */
|
||||
reactor->notify_source_primed(ref::brw<ReactorSource>::from_native(this));
|
||||
}
|
||||
} else {
|
||||
/* if no reactor, deliver immediately */
|
||||
this->deliver_one();
|
||||
}
|
||||
} /*notify_ev*/
|
||||
|
||||
// ----- inherited from AbstractSink -----
|
||||
|
||||
/* we don't care about volatile sources -- fifo queue copies incoming events */
|
||||
virtual bool allow_volatile_source() const override { return true; }
|
||||
|
||||
virtual uint32_t n_in_ev() const override { return n_in_ev_; }
|
||||
|
||||
// ----- inherited from ReactorSource -----
|
||||
|
||||
virtual bool is_empty() const override { return elt_q_.empty(); }
|
||||
virtual bool is_exhausted() const override { return this->upstream_exhausted_ && this->is_empty(); }
|
||||
|
||||
virtual utc_nanos sim_current_tm() const override {
|
||||
if (this->elt_q_.empty()) {
|
||||
/* (in practice control never comes here)
|
||||
*
|
||||
* queue doesn't know time of next event yet;
|
||||
* new events may appear at any time by way of .notify_event()
|
||||
*
|
||||
* if queue doesn't know next event, can't use .sim_current_tm
|
||||
* to establish priority relative to other sources.
|
||||
* In that case rely instead on priming mechanism;
|
||||
* priming mechanism implies control should never come here
|
||||
*/
|
||||
return this->current_tm_;
|
||||
} else {
|
||||
return evtm_fn_(this->elt_q_.front());
|
||||
}
|
||||
} /*sim_current_tm*/
|
||||
|
||||
virtual uint64_t deliver_one() override {
|
||||
return this->deliver_one_aux(true /*replay_flag*/);
|
||||
} /*deliver_one*/
|
||||
|
||||
virtual uint64_t sim_advance_until(utc_nanos target_tm,
|
||||
bool replay_flag) override {
|
||||
uint64_t retval = 0;
|
||||
|
||||
while (!this->elt_q_.empty()) {
|
||||
utc_nanos tm = evtm_fn_(this->elt_q_.front());
|
||||
|
||||
if (tm < target_tm) {
|
||||
retval += this->deliver_one_aux(replay_flag);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return retval;
|
||||
} /*sim_advance_until*/
|
||||
|
||||
virtual void notify_reactor_add(Reactor * reactor) override {
|
||||
assert(!this->parent_reactor_);
|
||||
|
||||
this->parent_reactor_ = reactor;
|
||||
} /*notify_reactor_add*/
|
||||
|
||||
virtual void notify_reactor_remove(Reactor *) override {
|
||||
this->parent_reactor_ = nullptr;
|
||||
}
|
||||
|
||||
// ----- inherited from AbstractSource -----
|
||||
|
||||
virtual TypeDescr source_ev_type() const override { return Reflect::require<T>(); }
|
||||
/* events must be copied objects owned by FifoQueue.
|
||||
* not expected to be pointers to shared storage or something
|
||||
*/
|
||||
virtual bool is_volatile() const override { return false; }
|
||||
virtual uint32_t n_queued_out_ev() const override { return elt_q_.size(); }
|
||||
virtual uint32_t n_out_ev() const override { return n_out_ev_; }
|
||||
virtual bool debug_sim_flag() const override { return debug_sim_flag_; }
|
||||
virtual void set_debug_sim_flag(bool x) override { this->debug_sim_flag_ = x; }
|
||||
|
||||
virtual CallbackId attach_sink(rp<AbstractSink> const & sink) override {
|
||||
rp<EventSink> native_sink
|
||||
= EventSink::require_native("FifoQueue::attach_sink", sink);
|
||||
|
||||
if (native_sink) {
|
||||
if (!this->is_volatile()
|
||||
|| native_sink->allow_volatile_source())
|
||||
{
|
||||
return this->add_callback(native_sink);
|
||||
} else {
|
||||
throw std::runtime_error("FifoQueue::attach_sink"
|
||||
": sink requires non-volatile source "
|
||||
+ std::string(reflect::type_name<T>()));
|
||||
}
|
||||
} else {
|
||||
throw std::runtime_error("FifoQueue::attach_sink"
|
||||
": expected sink accepting "
|
||||
+ std::string(reflect::type_name<T>()));
|
||||
}
|
||||
} /*attach_sink*/
|
||||
|
||||
virtual void detach_sink(CallbackId id) override {
|
||||
this->remove_callback(id);
|
||||
}
|
||||
|
||||
// ----- inherited from EventSource -----
|
||||
|
||||
virtual CallbackId add_callback(rp<EventSink> const & cb) override {
|
||||
return this->cb_set_.add_callback(cb);
|
||||
}
|
||||
|
||||
virtual void remove_callback(CallbackId id) override {
|
||||
this->cb_set_.remove_callback(id);
|
||||
}
|
||||
|
||||
// ----- inherited from AbstractEventProcessor -----
|
||||
|
||||
virtual std::string const & name() const override { return name_; }
|
||||
virtual void set_name(std::string const & x) override { this->name_ = x; }
|
||||
|
||||
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor> ep)> const & fn) override {
|
||||
for (auto x : this->cb_set_)
|
||||
fn(x.fn_.borrow());
|
||||
} /*visit_direct_consumers*/
|
||||
|
||||
/* write human-readable representation to stream */
|
||||
virtual void display(std::ostream & os) const override {
|
||||
os << "<FifoQueue"
|
||||
<< xtag("name", name_)
|
||||
<< xtag("addr", (void *)this)
|
||||
<< xtag("T", reflect::type_name<T>())
|
||||
<< ">";
|
||||
} /*display*/
|
||||
|
||||
private:
|
||||
FifoQueue(EvTimeFn evtm_fn) : evtm_fn_{std::move(evtm_fn)} {}
|
||||
|
||||
uint64_t deliver_one_aux(bool replay_flag) {
|
||||
scope log(XO_DEBUG(this->debug_sim_flag_),
|
||||
xtag("name", this->name_),
|
||||
xtag("elt_q.size", this->elt_q_.size()),
|
||||
xtag("replay_flag", replay_flag));
|
||||
|
||||
if (this->elt_q_.empty())
|
||||
return 0;
|
||||
|
||||
/* avoiding copy for efficiently-swappable T */
|
||||
T ev;
|
||||
std::swap(ev, this->elt_q_.front());
|
||||
|
||||
this->elt_q_.pop_front();
|
||||
|
||||
if (replay_flag) {
|
||||
log && log(xtag("deliver-ev", ev),
|
||||
xtag("elt_q.size", this->elt_q_.size()));
|
||||
|
||||
++(this->n_out_ev_);
|
||||
this->cb_set_.invoke(&EventSink::notify_ev, ev);
|
||||
}
|
||||
|
||||
return 1;
|
||||
} /*deliver_one_aux*/
|
||||
|
||||
private:
|
||||
/* name (ideally unique) for this queue */
|
||||
std::string name_;
|
||||
|
||||
/* extract timestamp from an event */
|
||||
EvTimeFn evtm_fn_;
|
||||
|
||||
/* if true, simulator/reactor will report interaction with this source */
|
||||
bool debug_sim_flag_ = false;
|
||||
|
||||
/* largest event timestamp delivered
|
||||
* (monotonically increases, event if events received out-of-timestamp-order)
|
||||
*/
|
||||
utc_nanos current_tm_;
|
||||
|
||||
/* events waiting for delivery */
|
||||
std::deque<T> elt_q_;
|
||||
|
||||
/* lifetime #of events received */
|
||||
uint32_t n_in_ev_ = 0;
|
||||
/* lifetime #of events delivered */
|
||||
uint32_t n_out_ev_ = 0;
|
||||
|
||||
/* set to true, once, to announce that upstream will send no more events.
|
||||
* see .notify_upstream_exhausted() ?
|
||||
*/
|
||||
bool upstream_exhausted_ = false;
|
||||
|
||||
/* reactor/simulator being used to schedule event consumption.
|
||||
* if omitted, borrow calling thread
|
||||
*/
|
||||
Reactor * parent_reactor_ = nullptr;
|
||||
|
||||
/* invoke callbacks in this set to deliver queued events */
|
||||
RpCallbackSet<EventSink> cb_set_;
|
||||
|
||||
}; /*FifoQueue*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end FifoQueue.hpp */
|
||||
72
xo-reactor/include/xo/reactor/HeapReducer.hpp
Normal file
72
xo-reactor/include/xo/reactor/HeapReducer.hpp
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
/* @file HeapReducer.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "Reducer.hpp"
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
/* collect incoming events in a heap,
|
||||
* ordered by timestamp.
|
||||
* output events in increasing timestamp order.
|
||||
* Information preserving in all other respects
|
||||
*
|
||||
* Require:
|
||||
* - Event is null-constructible
|
||||
* - Event is copyable
|
||||
* - EventTimeFn :: Event -> utc_nanos
|
||||
*/
|
||||
template<typename Event, typename EventTimeFn = StructEventTimeFn<Event>>
|
||||
class HeapReducer : public ReducerBase<Event, EventTimeFn> {
|
||||
public:
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
public:
|
||||
HeapReducer() = default;
|
||||
HeapReducer(EventTimeFn const & evtfn) : ReducerBase<Event, EventTimeFn>(evtfn) {}
|
||||
|
||||
bool is_empty() const { return this->event_heap_.empty(); }
|
||||
/* require: .is_empty() = false */
|
||||
utc_nanos next_tm() const { return this->event_tm(this->event_heap_.front()); }
|
||||
/* #of events stored in this reducer */
|
||||
uint32_t n_event() const { return this->event_heap_.size(); }
|
||||
|
||||
Event const & last_annexed_ev() const { return this->annexed_ev_; }
|
||||
|
||||
void include_event(Event const & ev) {
|
||||
this->event_heap_.push_back(ev);
|
||||
std::push_heap(this->event_heap_.begin(),
|
||||
this->event_heap_.end(),
|
||||
std::greater<Event>());
|
||||
} /*include_event*/
|
||||
|
||||
void include_event(Event && ev) {
|
||||
this->event_heap_.push_back(std::move(ev));
|
||||
std::push_heap(this->event_heap_.begin(),
|
||||
this->event_heap_.end(),
|
||||
std::greater<Event>());
|
||||
} /*include_event*/
|
||||
|
||||
Event & annex_one() {
|
||||
this->annexed_ev_ = this->event_heap_.front();
|
||||
std::pop_heap(this->event_heap_.begin(),
|
||||
this->event_heap_.end(),
|
||||
std::greater<Event>());
|
||||
this->event_heap_.pop_back();
|
||||
|
||||
return this->annexed_ev_;
|
||||
} /*annex_one*/
|
||||
|
||||
// ----- Inherited from ReducerBase -----
|
||||
|
||||
// utc_nanos event_tm(Event const & x);
|
||||
|
||||
private:
|
||||
/* queued Events, in increasing timestamp order */
|
||||
std::vector<Event> event_heap_;
|
||||
/* annexed event, removed from .event_heap */
|
||||
Event annexed_ev_;
|
||||
}; /*HeapReducer*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end HeapReducer.hpp */
|
||||
154
xo-reactor/include/xo/reactor/LastReducer.hpp
Normal file
154
xo-reactor/include/xo/reactor/LastReducer.hpp
Normal file
|
|
@ -0,0 +1,154 @@
|
|||
/* @file LastReducer.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "Reducer.hpp"
|
||||
#include <array>
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
/* implementation record used in LastReducer.
|
||||
* LastReducer (see below) remembers a single event,
|
||||
* + will be updated on successive calls to
|
||||
* LastReducer.include_event()
|
||||
*
|
||||
* need to remember the _first_ (& therefore earliest)
|
||||
* event timestamp in such a wave, since that establishes when simulator
|
||||
* should deliver the event -- even if event is subsequently
|
||||
* overwritten.
|
||||
*
|
||||
* once event is delivered, timestamp can reset
|
||||
*
|
||||
* otherwise if upstream producer sends events with
|
||||
* future timestamps, can get indefinite postponement
|
||||
* with simulation clock failing to catch up to event time.
|
||||
*
|
||||
*/
|
||||
|
||||
template<typename Event>
|
||||
class EventRecd {
|
||||
public:
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
|
||||
public:
|
||||
EventRecd() = default;
|
||||
EventRecd(utc_nanos tm, Event ev) : trigger_tm_{tm}, ev_{ev} {}
|
||||
EventRecd(utc_nanos tm, Event && ev) : trigger_tm_{tm}, ev_{std::move(ev)} {}
|
||||
|
||||
public:
|
||||
/* if sim, deliver event when simulation clock reaches
|
||||
* .trigger_tm; .trigger_tm can be earlier than .ev time
|
||||
*/
|
||||
utc_nanos trigger_tm_;
|
||||
/* event to deliver */
|
||||
Event ev_;
|
||||
};
|
||||
|
||||
/* reducer that just remembers the last event
|
||||
*
|
||||
* Require:
|
||||
* - Event is null-contructible
|
||||
* - Event is copyable
|
||||
*
|
||||
* LastReducer provides reentrancy support. This support doesn't operate
|
||||
* if Event copy is not deep, e.g. for Event = rp<Foo>n
|
||||
*
|
||||
* .include_event()
|
||||
* /-------\ -----------------> /------\
|
||||
* | empty | | full |
|
||||
* \-------/ <----------------- \------/
|
||||
* . .annex_one() .
|
||||
* . .
|
||||
* .is_empty()=true .is_empty()=false
|
||||
*/
|
||||
template<typename Event, typename EventTimeFn = StructEventTimeFn<Event>>
|
||||
class LastReducer : public ReducerBase<Event, EventTimeFn> {
|
||||
public:
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
|
||||
public:
|
||||
LastReducer() = default;
|
||||
LastReducer(EventTimeFn const & evtfn) : ReducerBase<Event, EventTimeFn>(evtfn) {}
|
||||
|
||||
bool is_empty() const { return empty_flag_; }
|
||||
/* require: .is_empty() = false */
|
||||
utc_nanos next_tm() const {
|
||||
return this->last_ev_[this->last_ix_].trigger_tm_;
|
||||
//return this->event_tm(this->last_ev_[this->last_ix_]);
|
||||
}
|
||||
/* #of events stored in this reducer (0 or 1) */
|
||||
uint32_t n_event() const { return this->empty_flag_ ? 0 : 1; }
|
||||
|
||||
Event const & last_annexed_ev() const {
|
||||
return this->last_ev_[1 - this->last_ix_].ev_;
|
||||
}
|
||||
|
||||
EventRecd<Event> & include_event_aux(Event const & ev) {
|
||||
EventRecd<Event> & evr
|
||||
= this->last_ev_[this->last_ix_];
|
||||
|
||||
if (this->empty_flag_) {
|
||||
/* evr.trigger_tm will be preserved across
|
||||
* successive calls to .include_event();
|
||||
* until .annex_one()
|
||||
*/
|
||||
evr.trigger_tm_ = this->event_tm(ev);
|
||||
|
||||
this->empty_flag_ = false;
|
||||
}
|
||||
|
||||
return evr;
|
||||
} /*include_event_aux*/
|
||||
|
||||
void include_event(Event const & ev) {
|
||||
EventRecd<Event> & evr
|
||||
= this->include_event_aux(ev);
|
||||
|
||||
evr.ev_ = ev;
|
||||
} /*include_event*/
|
||||
|
||||
void include_event(Event && ev) {
|
||||
EventRecd<Event> & evr
|
||||
= this->include_event_aux(ev);
|
||||
|
||||
evr.ev_ = std::move(ev);
|
||||
} /*include_event*/
|
||||
|
||||
Event & annex_one() {
|
||||
std::uint32_t annexed_ix = this->last_ix_;
|
||||
|
||||
/* since .empty_flag is true,
|
||||
* next call to .include_event_aux() will
|
||||
* capture new timestamp
|
||||
*/
|
||||
this->empty_flag_ = true;
|
||||
this->last_ix_ = (1 - this->last_ix_);
|
||||
|
||||
return this->last_ev_[annexed_ix].ev_;
|
||||
} /*annex_one*/
|
||||
|
||||
// ----- Inherited from ReducerBase -----
|
||||
|
||||
//utc_nanos event_tm(Event const & ev) const { return this->event_tm_fn_(ev); }
|
||||
|
||||
private:
|
||||
/* true when reducer contains 0 queued events,
|
||||
* not counting any annexed event
|
||||
*/
|
||||
bool empty_flag_ = true;
|
||||
|
||||
/* .last_ev[.last_ix] updated by .include_event()
|
||||
*/
|
||||
std::uint32_t last_ix_ = 0;
|
||||
/* remember two events
|
||||
* (a) a single queued event (updated by .include_event())
|
||||
* (b) a single removed event (reported by .annex_one())
|
||||
*
|
||||
* roles of .last_ev[0], .last_ev[1] reverse each time .annex_one() runs
|
||||
*/
|
||||
std::array<EventRecd<Event>, 2> last_ev_;
|
||||
}; /*LastReducer*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end LastReducer.hpp */
|
||||
48
xo-reactor/include/xo/reactor/PollingReactor.hpp
Normal file
48
xo-reactor/include/xo/reactor/PollingReactor.hpp
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
/* @file PollingReactor.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "Reactor.hpp"
|
||||
#include "ReactorSource.hpp"
|
||||
#include <vector>
|
||||
#include <cstdint>
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
/* reactor that runs by polling an ordered set of sources */
|
||||
class PollingReactor : public Reactor {
|
||||
public:
|
||||
/* named ctor idiom */
|
||||
static rp<PollingReactor> make() { return new PollingReactor(); }
|
||||
|
||||
// ----- inherited from Reactor -----
|
||||
|
||||
virtual bool add_source(ref::brw<ReactorSource> src) override;
|
||||
virtual bool remove_source(ref::brw<ReactorSource> src) override;
|
||||
virtual void notify_source_primed(ref::brw<ReactorSource> src) override;
|
||||
virtual std::uint64_t run_one() override;
|
||||
|
||||
private:
|
||||
PollingReactor() = default;
|
||||
|
||||
/* find non-empty source, starting from .source_v_[start_ix],
|
||||
* wrapping around to .source_v_[start_ix - 1].
|
||||
*
|
||||
* return index of first available non-empty source,
|
||||
* or -1 if all sources are empty
|
||||
*/
|
||||
std::int64_t find_nonempty_source(std::size_t start_ix);
|
||||
|
||||
private:
|
||||
/* next source to poll will be .source_v_[.next_ix_] */
|
||||
std::size_t next_ix_ = 0;
|
||||
|
||||
/* ordered set of sources (see reactor::Source)
|
||||
* reactor will poll sources in round-robin order
|
||||
*/
|
||||
std::vector<ReactorSourcePtr> source_v_;
|
||||
}; /*PollingReactor*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end PollingReactor.hpp */
|
||||
92
xo-reactor/include/xo/reactor/PolyAdapterSink.hpp
Normal file
92
xo-reactor/include/xo/reactor/PolyAdapterSink.hpp
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
/* file PolyAdapterSink.hpp
|
||||
*
|
||||
* author: Roland Conybeare, Sep 2022
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "Sink.hpp"
|
||||
#include "xo/reflect/Reflect.hpp"
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
/* adapter between a source that delivers a particular event type T,
|
||||
* and a sink that accepts arbitrarily-typed events via .notify_ev_tp()
|
||||
* Use this to connect to a polymorphic sink.
|
||||
*
|
||||
* Require:
|
||||
* - .poly_sink.allow_polymorphic_source()
|
||||
* (ofc. otherwise no point in using PolyAdapterSink<T>)
|
||||
* - .poly_sink.allow_volatile_source()
|
||||
* need this bc will be wrapping event with TaggedPtr,
|
||||
* which doesn't manage event lifetime
|
||||
*/
|
||||
template<typename T>
|
||||
class PolyAdapterSink : public reactor::Sink1<T> {
|
||||
public:
|
||||
using Reflect = reflect::Reflect;
|
||||
using TaggedPtr = reflect::TaggedPtr;
|
||||
|
||||
public:
|
||||
/* named ctor idiom */
|
||||
static rp<PolyAdapterSink> make(rp<AbstractSink> poly_sink) {
|
||||
//xo::scope lscope("PolyAdapterSink::make");
|
||||
|
||||
rp<PolyAdapterSink> retval(new PolyAdapterSink(poly_sink));
|
||||
|
||||
//lscope.log("adapter", (void*)retval.get());
|
||||
|
||||
return retval;
|
||||
} /*make*/
|
||||
|
||||
// ----- Inherited from Sink1<T> -----
|
||||
|
||||
virtual void notify_ev(T const & ev) override {
|
||||
//xo::scope lscope("PolyAdapterSink::notify_ev");
|
||||
//lscope.log(xo::xtag("ev", ev));
|
||||
|
||||
TaggedPtr ev_tp = Reflect::make_tp(const_cast<T *>(&ev));
|
||||
|
||||
this->notify_ev_tp(ev_tp);
|
||||
} /*notify_ev*/
|
||||
|
||||
// ----- Inherited from AbstractSink -----
|
||||
|
||||
virtual bool allow_volatile_source() const override { return true; }
|
||||
virtual uint32_t n_in_ev() const override { return this->poly_sink_->n_in_ev(); }
|
||||
/* note: ok to do this, however if expecting to use this entry point,
|
||||
* maybe don't need to interpose PolyAdapterSink<T> ahead of .poly_sink
|
||||
*/
|
||||
virtual void notify_ev_tp(TaggedPtr const & ev_tp) override {
|
||||
//xo::scope lscope("PolyAdapterSink::notify_ev_tp");
|
||||
|
||||
return this->poly_sink_->notify_ev_tp(ev_tp);
|
||||
}
|
||||
|
||||
// ----- Inherited from AbstractEventProcessor -----
|
||||
|
||||
virtual std::string const & name() const override { return this->poly_sink_->name(); }
|
||||
virtual void set_name(std::string const & x) override { this->poly_sink_->set_name(x); }
|
||||
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor> ep)> const & fn) override {
|
||||
this->poly_sink_->visit_direct_consumers(fn);
|
||||
}
|
||||
virtual void display(std::ostream & os) const override {
|
||||
using xo::xtag;
|
||||
os << "<PolyAdapterSink"
|
||||
<< xtag("addr", (void*)this)
|
||||
<< xtag("T", reflect::type_name<T>())
|
||||
<< xtag("poly", this->poly_sink_)
|
||||
<< ">";
|
||||
} /*display*/
|
||||
|
||||
private:
|
||||
PolyAdapterSink(rp<AbstractSink> poly_sink) : poly_sink_{std::move(poly_sink)} {}
|
||||
|
||||
private:
|
||||
/* mandate: .poly_sink.allow_polymorphic_source() is true */
|
||||
rp<AbstractSink> poly_sink_;
|
||||
}; /*PolyAdapterSink*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end PolyAdapterSink.hpp */
|
||||
74
xo-reactor/include/xo/reactor/Reactor.hpp
Normal file
74
xo-reactor/include/xo/reactor/Reactor.hpp
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
/* @file Reactor.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "xo/refcnt/Refcounted.hpp"
|
||||
#include "xo/indentlog/log_level.hpp"
|
||||
#include <cstdint>
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
class ReactorSource;
|
||||
|
||||
/* abtract api for a reactor:
|
||||
* something that arranges to have work done on a set of Sources.
|
||||
*/
|
||||
class Reactor : public ref::Refcount {
|
||||
public:
|
||||
virtual ~Reactor() = default;
|
||||
|
||||
log_level loglevel() const { return loglevel_; }
|
||||
void set_loglevel(log_level loglevel) { loglevel_ = loglevel; }
|
||||
|
||||
/* add source src to this reactor.
|
||||
* on success, invoke src.notify_reactor_add(this)
|
||||
*
|
||||
* returns true if source added; false if already present
|
||||
*/
|
||||
virtual bool add_source(ref::brw<ReactorSource> src) = 0;
|
||||
|
||||
/* remove source src from this reactor.
|
||||
* source must previously have been added by
|
||||
* .add_source(src).
|
||||
*
|
||||
* on success, invoke src.notify_reactor_remove(this)
|
||||
*
|
||||
* returns true if source removed; false if not present
|
||||
*/
|
||||
virtual bool remove_source(ref::brw<ReactorSource> src) = 0;
|
||||
|
||||
/* notification when non-primed source (source with no known events)
|
||||
* becomes primed (source with at least one event)
|
||||
*/
|
||||
virtual void notify_source_primed(ref::brw<ReactorSource> src) = 0;
|
||||
|
||||
/* dispatch one reactor event, borrowing the calling thread
|
||||
* amount of work this represents is Source/Sink specific.
|
||||
*
|
||||
* returns #of events dispatched (0 or 1)
|
||||
*/
|
||||
virtual std::uint64_t run_one() = 0;
|
||||
|
||||
/* borrow calling thread to dispatch reactor events.
|
||||
* if n is -1, run indefinitely
|
||||
* otherwise dispatch up to n events.
|
||||
* n = 0 is a noop
|
||||
*/
|
||||
std::uint64_t run_n(int32_t n);
|
||||
|
||||
/* borrow calling thread to run indefinitely.
|
||||
* suitable implementation for dedicated reactor threads
|
||||
*/
|
||||
void run() { this->run_n(-1); }
|
||||
|
||||
protected:
|
||||
Reactor();
|
||||
|
||||
private:
|
||||
/* control logging verbosity */
|
||||
log_level loglevel_;
|
||||
}; /*Reactor*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end Reactor.hpp */
|
||||
130
xo-reactor/include/xo/reactor/ReactorSource.hpp
Normal file
130
xo-reactor/include/xo/reactor/ReactorSource.hpp
Normal file
|
|
@ -0,0 +1,130 @@
|
|||
/* @file ReactorSource.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "AbstractSource.hpp"
|
||||
//#include "time/Time.hpp"
|
||||
#include <cstdint>
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
class Reactor;
|
||||
|
||||
/* abstract api for a source of events.
|
||||
* Event representation is left open: Sources and Sinks
|
||||
* need to have compatible event representations,
|
||||
* and coordination is left to such (Source, Sink) pairs.
|
||||
*
|
||||
* Source->Sink activity may be expected to be mediated by a reactor,
|
||||
* that implements the Reactor api.
|
||||
*
|
||||
* At any time, A Source can be associated with at most one reactor.
|
||||
* Sources are informed of Reactor<->Source association being
|
||||
* formed/broken by the
|
||||
* .notify_reactor_add(), .notify_reactor_remove()
|
||||
* methods
|
||||
*
|
||||
* The source api intends also to provide for simulation.
|
||||
* There introduces two simulation-specific methods:
|
||||
* .sim_current_tm()
|
||||
* .sim_advance_until()
|
||||
*
|
||||
* A non-simulation source can implement these as calls to
|
||||
* .online_current_tm(), .online_advance_until() respectively
|
||||
* .online_current_tm() aborts since an online source is never exhausted
|
||||
* .online_advance_until() is a no-op that returns 0
|
||||
*
|
||||
* Loop for consuming from a primary simulation source:
|
||||
*
|
||||
* brw<Source> s = ...;
|
||||
* while(!s->is_exhausted())
|
||||
* s->deliver_one();
|
||||
*
|
||||
* Secondary sources (sources that depend on other sources) can be
|
||||
* in a state where they don't know their next event, in which case:
|
||||
*
|
||||
* s->is_notprimed() == true
|
||||
*/
|
||||
class ReactorSource : public AbstractSource {
|
||||
public:
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
|
||||
public:
|
||||
virtual ~ReactorSource() = default;
|
||||
|
||||
/* true if source is currently empty (has 0 events to deliver) */
|
||||
virtual bool is_empty() const = 0;
|
||||
bool is_nonempty() const { return !this->is_empty(); }
|
||||
|
||||
/* true when source knows its next event
|
||||
* A source that isn't primed is also excluded from simulation
|
||||
* heap until it becomes primed.
|
||||
* This make feasible simulation sources that
|
||||
* depend on other simulation sources
|
||||
*/
|
||||
virtual bool is_primed() const { return !this->is_empty(); }
|
||||
virtual bool is_notprimed() const { return this->is_empty(); }
|
||||
|
||||
/* if true, this source has no events, and will never publish more events
|
||||
* - for sim, return true for a standalone source that has replayed all events
|
||||
* - for rt, set during orderly
|
||||
*/
|
||||
virtual bool is_exhausted() const = 0;
|
||||
|
||||
/* if this is a simulation source and .is_exhausted is false:
|
||||
* returns next event time; more precisely, no events exist prior to
|
||||
* this time.
|
||||
*
|
||||
* if sim, and .is_primed = true,
|
||||
* returns timestamp of next event
|
||||
*/
|
||||
virtual utc_nanos sim_current_tm() const = 0;
|
||||
|
||||
/* promise:
|
||||
* - .current_tm() > tm || .is_notprimed() || .is_exhausted() = true
|
||||
* - if replay_flag is true, then any events between previous .current_tm()
|
||||
* and new .current_tm() will have been published
|
||||
*
|
||||
* returns #of events delivered.
|
||||
* does not count events that were skipped, so always returns 0 if
|
||||
* replay_flag is false
|
||||
*/
|
||||
virtual std::uint64_t sim_advance_until(utc_nanos tm, bool replay_flag) = 0;
|
||||
|
||||
/* informs source when it's added to a reactor
|
||||
|
||||
* (see Reactor.add_source())
|
||||
*/
|
||||
virtual void notify_reactor_add(Reactor * /*reactor*/) {}
|
||||
|
||||
/* informs source when it's removed from a reactor
|
||||
* (see Reactor.remove_source())
|
||||
*/
|
||||
virtual void notify_reactor_remove(Reactor * /*reactor*/) {}
|
||||
|
||||
// ----- Inherited from AbstractSource -----
|
||||
|
||||
/* deliver one event to attached sink
|
||||
* interpretation of 'one event' is source-specific;
|
||||
* could be a collapsed or batched event in practice.
|
||||
*
|
||||
* no-op if source is empty.
|
||||
*
|
||||
* if sim, promise:
|
||||
* - new .current_tm >= old .current_tm() || .is_notprimed() || .is_exhausted()
|
||||
*
|
||||
* returns #of events delivered. Must be 0 or 1 in this context
|
||||
*/
|
||||
virtual std::uint64_t deliver_one() override = 0;
|
||||
|
||||
protected:
|
||||
/* default implementations for online sources */
|
||||
utc_nanos online_current_tm() const;
|
||||
uint64_t online_advance_until(utc_nanos tm, bool replay_flag);
|
||||
}; /*ReactorSource*/
|
||||
|
||||
using ReactorSourcePtr = rp<ReactorSource>;
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end ReactorSource.hpp */
|
||||
33
xo-reactor/include/xo/reactor/Reducer.hpp
Normal file
33
xo-reactor/include/xo/reactor/Reducer.hpp
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
/* @file Reducer.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "xo/reactor/EventTimeFn.hpp"
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
/* LastReducer, HeapReducer inherit ReducerBase */
|
||||
template<typename Event, typename EventTimeFn>
|
||||
class ReducerBase {
|
||||
static_assert(EventTimeConcept<Event, EventTimeFn>);
|
||||
|
||||
public:
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
|
||||
public:
|
||||
ReducerBase() = default;
|
||||
ReducerBase(EventTimeFn const & evtfn) : event_tm_fn_{evtfn} {}
|
||||
|
||||
utc_nanos event_tm(Event const & ev) const { return this->event_tm_fn_(ev); }
|
||||
|
||||
private:
|
||||
/* Event ev = ...;
|
||||
* .event_tm_fn(ev) -> utc_nanos
|
||||
* reports event time associated with ev
|
||||
*/
|
||||
EventTimeFn event_tm_fn_;
|
||||
}; /*ReducerBase*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end Reducer.hpp */
|
||||
359
xo-reactor/include/xo/reactor/SecondarySource.hpp
Normal file
359
xo-reactor/include/xo/reactor/SecondarySource.hpp
Normal file
|
|
@ -0,0 +1,359 @@
|
|||
/* @file SecondarySource.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "EventSource.hpp"
|
||||
#include "Sink.hpp"
|
||||
#include "Reactor.hpp"
|
||||
#include "HeapReducer.hpp"
|
||||
#include "xo/callback/CallbackSet.hpp"
|
||||
#include "xo/cxxutil/demangle.hpp"
|
||||
#include <vector>
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
/* A passive event source.
|
||||
* Can use as backend publisher when implementating another
|
||||
* event processor.
|
||||
*/
|
||||
template<typename Event, typename Reducer = HeapReducer<Event>>
|
||||
class SecondarySource : public EventSource<Sink1<Event>> {
|
||||
public:
|
||||
using EventSink = Sink1<Event>;
|
||||
template<typename Fn>
|
||||
using RpCallbackSet = fn::RpCallbackSet<Fn>;
|
||||
using CallbackId = fn::CallbackId;
|
||||
using TypeDescr = xo::reflect::TypeDescr;
|
||||
using utc_nanos = xo::time::utc_nanos;
|
||||
|
||||
public:
|
||||
~SecondarySource() = default;
|
||||
|
||||
static rp<SecondarySource> make() { return new SecondarySource(); }
|
||||
|
||||
/* last event delivered from this source --
|
||||
* i.e. event in most recent call to .deliver_one_aux()
|
||||
*/
|
||||
Event const & last_annexed_ev() const { return this->reducer_.last_annexed_ev(); }
|
||||
|
||||
void notify_upstream_exhausted() { this->upstream_exhausted_ = true; }
|
||||
|
||||
/* make event available to reactor, by adding to internal reducer */
|
||||
void notify_secondary_event(Event const & ev) {
|
||||
/* test if ev is priming, update .current_tm */
|
||||
bool is_priming = this->preprocess_secondary_event(ev);
|
||||
|
||||
this->reducer_.include_event(ev);
|
||||
|
||||
this->postprocess_secondary_event(is_priming);
|
||||
} /*notify_secondary_event*/
|
||||
|
||||
void notify_secondary_event(Event && ev) {
|
||||
bool is_priming = this->preprocess_secondary_event(ev);
|
||||
|
||||
this->reducer_.include_event(ev);
|
||||
|
||||
this->postprocess_secondary_event(is_priming);
|
||||
} /*notify_secondary_event*/
|
||||
|
||||
template<typename T>
|
||||
void notify_secondary_event_v(T const & v) {
|
||||
using xo::scope;
|
||||
using xo::xtag;
|
||||
|
||||
if (v.empty())
|
||||
return;
|
||||
|
||||
scope log(XO_DEBUG(this->debug_sim_flag_));
|
||||
|
||||
log && log(xtag("name", this->name()));
|
||||
|
||||
if (this->upstream_exhausted_) {
|
||||
throw std::runtime_error("SecondarySource::notify_secondary_event_v"
|
||||
": not allowed after upstream exhausted");
|
||||
}
|
||||
|
||||
uint32_t n_ev = 0;
|
||||
|
||||
for (Event const & ev : v) {
|
||||
utc_nanos evtm = this->reducer_.event_tm(ev);
|
||||
|
||||
if (this->current_tm_ < evtm)
|
||||
this->current_tm_ = evtm;
|
||||
|
||||
++n_ev;
|
||||
}
|
||||
|
||||
log && log(xtag("T", reflect::type_name<T>()),
|
||||
xtag("n_ev", n_ev));
|
||||
|
||||
if (n_ev > 0) {
|
||||
/* if reducer is empty when .notify_secondary_event_v() begins,
|
||||
* then reactor/simulator needs to be notified that source is no longer empty
|
||||
*/
|
||||
bool is_priming = this->reducer_.is_empty();
|
||||
|
||||
for (Event const & ev : v)
|
||||
this->reducer_.include_event(ev);
|
||||
|
||||
Reactor * reactor = this->parent_reactor_;
|
||||
|
||||
if (reactor) {
|
||||
if (is_priming) {
|
||||
/* reactor/simulator takes responsibility for delivering events */
|
||||
reactor->notify_source_primed(ref::brw<ReactorSource>::from_native(this));
|
||||
}
|
||||
} else {
|
||||
/* special case if no reactor: deliver immediately */
|
||||
|
||||
//this->deliver_one();
|
||||
this->deliver_all();
|
||||
}
|
||||
}
|
||||
} /*notify_secondary_event_v*/
|
||||
|
||||
// ----- inherited from EventSource -----
|
||||
|
||||
virtual CallbackId add_callback(rp<EventSink> const & cb) override {
|
||||
return this->cb_set_.add_callback(cb);
|
||||
} /*add_callback*/
|
||||
|
||||
virtual void remove_callback(CallbackId id) override {
|
||||
this->cb_set_.remove_callback(id);
|
||||
} /*remove_callback*/
|
||||
|
||||
// ----- inherited from ReactorSource -----
|
||||
|
||||
virtual bool is_empty() const override { return this->reducer_.is_empty(); }
|
||||
virtual bool is_exhausted() const override { return this->upstream_exhausted_ && this->is_empty(); }
|
||||
|
||||
virtual utc_nanos sim_current_tm() const override {
|
||||
using xo::scope;
|
||||
using xo::xtag;
|
||||
|
||||
if (this->reducer_.is_empty()) {
|
||||
/* this is a tricky case.
|
||||
* it means this source doesn't
|
||||
* _know_ specific next event yet; however new events
|
||||
* may appear at any time by way of .notify_event()
|
||||
*
|
||||
* If event doesn't know next event, then .current_tm isn't useful
|
||||
* for establishing priority relative to other sources.
|
||||
* rely on priming mechanism instead,
|
||||
* which means that control should never come here.
|
||||
*/
|
||||
return this->current_tm_;
|
||||
} else {
|
||||
scope log(XO_DEBUG(false /*this->debug_sim_flag_*/),
|
||||
xtag("name", this->name_),
|
||||
xtag("next_tm", this->reducer_.next_tm()));
|
||||
|
||||
return this->reducer_.next_tm();
|
||||
}
|
||||
} /*sim_current_tm*/
|
||||
|
||||
virtual std::uint64_t deliver_one() override {
|
||||
return this->deliver_one_aux(true /*replay_flag*/);
|
||||
}
|
||||
|
||||
virtual std::uint64_t sim_advance_until(utc_nanos target_tm,
|
||||
bool replay_flag) override
|
||||
{
|
||||
uint64_t retval = 0;
|
||||
|
||||
while (!this->reducer_.is_empty()) {
|
||||
utc_nanos tm = this->sim_current_tm();
|
||||
|
||||
if (tm < target_tm) {
|
||||
retval += this->deliver_one_aux(replay_flag);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return retval;
|
||||
} /*sim_advance_until*/
|
||||
|
||||
virtual void notify_reactor_add(Reactor * reactor) override {
|
||||
assert(!this->parent_reactor_);
|
||||
|
||||
this->parent_reactor_ = reactor;
|
||||
} /*notify_reactor_add*/
|
||||
|
||||
virtual void notify_reactor_remove(Reactor * /*reactor*/) override {}
|
||||
|
||||
// ----- inherited from AbstractSource -----
|
||||
|
||||
virtual TypeDescr source_ev_type() const override {
|
||||
return reflect::Reflect::require<Event>();
|
||||
} /*source_ev_type*/
|
||||
|
||||
virtual uint32_t n_out_ev() const override { return n_out_ev_; }
|
||||
/* #of events queued for delivery */
|
||||
virtual uint32_t n_queued_out_ev() const override { return this->reducer_.n_event(); }
|
||||
|
||||
virtual bool debug_sim_flag() const override { return debug_sim_flag_; }
|
||||
virtual void set_debug_sim_flag(bool x) override { this->debug_sim_flag_ = x; }
|
||||
|
||||
virtual CallbackId attach_sink(rp<AbstractSink> const & sink) override {
|
||||
rp<EventSink> native_sink
|
||||
= EventSink::require_native("SecondarySource::attach_sink", sink);
|
||||
|
||||
if (native_sink) {
|
||||
if (!this->is_volatile()
|
||||
|| native_sink->allow_volatile_source())
|
||||
{
|
||||
return this->add_callback(native_sink);
|
||||
} else {
|
||||
throw std::runtime_error("SecondarySource::attach_sink"
|
||||
": sink requires non-volatile source "
|
||||
+ std::string(reflect::type_name<Event>()));
|
||||
}
|
||||
} else {
|
||||
throw std::runtime_error("SecondarySource::attach_sink"
|
||||
": expected sink accepting "
|
||||
+ std::string(reflect::type_name<Event>()));
|
||||
}
|
||||
} /*attach_sink*/
|
||||
|
||||
virtual void detach_sink(CallbackId id) override {
|
||||
this->remove_callback(id);
|
||||
} /*detach_sink*/
|
||||
|
||||
// ----- Inherited from AbstractEventProcessor -----
|
||||
|
||||
virtual std::string const & name() const override { return name_; }
|
||||
virtual void set_name(std::string const & x) override { this->name_ = x; }
|
||||
|
||||
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor> ep)> const & fn) override {
|
||||
|
||||
for(auto x : this->cb_set_)
|
||||
fn(x.fn_.borrow());
|
||||
} /*visit_direct_consumers*/
|
||||
|
||||
private:
|
||||
/* event book-keeping on receiving an event.
|
||||
*/
|
||||
bool preprocess_secondary_event(Event const & ev)
|
||||
{
|
||||
if (this->upstream_exhausted_) {
|
||||
throw std::runtime_error("SecondarySource::notify_secondary_event"
|
||||
": not allowed after upstream exhausted");
|
||||
}
|
||||
|
||||
utc_nanos evtm = this->reducer_.event_tm(ev);
|
||||
|
||||
if (this->current_tm_ < evtm)
|
||||
this->current_tm_ = evtm;
|
||||
|
||||
/* if reducer is empty when .notify_event() begins,
|
||||
* then reactor/simulator needs to be notified that source is no longer empty
|
||||
*/
|
||||
bool is_priming = this->reducer_.is_empty();
|
||||
|
||||
return is_priming;
|
||||
} /*preprocess_secondary_event*/
|
||||
|
||||
/* event bookkeeping after receiving an event.
|
||||
*
|
||||
* Require: event has been propagated to .reducer
|
||||
*
|
||||
* is_priming. true if event causes source to
|
||||
* become non-empty --> must notify reactor
|
||||
*/
|
||||
void postprocess_secondary_event(bool is_priming) {
|
||||
using xo::scope;
|
||||
using xo::xtag;
|
||||
|
||||
Reactor * reactor = this->parent_reactor_;
|
||||
|
||||
scope log(XO_DEBUG(this->debug_sim_flag_),
|
||||
xtag("name", name_),
|
||||
xtag("reactor", (void*)reactor),
|
||||
xtag("is_priming", is_priming));
|
||||
|
||||
if (reactor) {
|
||||
if (is_priming) {
|
||||
/* reactor/simulator takes responsibility for delivering events */
|
||||
reactor->notify_source_primed(ref::brw<ReactorSource>::from_native(this));
|
||||
}
|
||||
} else {
|
||||
/* if no reactor, deliver immediately */
|
||||
this->deliver_one();
|
||||
}
|
||||
} /*postprocess_secondary_event*/
|
||||
|
||||
/* deliver one event from reducer;
|
||||
* invoke callback whenever replay_flag is true
|
||||
*/
|
||||
std::uint64_t deliver_one_aux(bool replay_flag) {
|
||||
scope log(XO_DEBUG(this->debug_sim_flag_),
|
||||
xtag("name", this->name_),
|
||||
xtag("reducer.empty", this->reducer_.is_empty()),
|
||||
xtag("replay_flag", replay_flag));
|
||||
|
||||
if (this->reducer_.is_empty())
|
||||
return 0;
|
||||
|
||||
/* need to remove event _before_ invoking callbacks;
|
||||
* callbacks may indirectly call this->notify_secondary_event(),
|
||||
* modifiying .reducer
|
||||
*
|
||||
* reducer may use double-buffering scheme or similar to
|
||||
* mitigate copying, esp when Event objects are heavy
|
||||
*/
|
||||
Event & ev = this->reducer_.annex_one();
|
||||
|
||||
/* if SecondarySource:
|
||||
* Event ev = this->event_heap_.front();
|
||||
* std::pop_heap(this->event_heap_.begin(),
|
||||
* this->event_heap_.end(),
|
||||
* std::greater<Event>());
|
||||
* this->event_heap_.pop_back();
|
||||
*/
|
||||
|
||||
if (replay_flag) {
|
||||
++(this->n_out_ev_);
|
||||
this->cb_set_.invoke(&EventSink::notify_ev, ev);
|
||||
}
|
||||
|
||||
return 1;
|
||||
} /*deliver_one_aux*/
|
||||
|
||||
private:
|
||||
/* current time for this source */
|
||||
utc_nanos current_tm_;
|
||||
|
||||
/* reporting name for this source (use when .debug_sim_flag set)
|
||||
*/
|
||||
std::string name_;
|
||||
|
||||
/* if true, reactor/simulator to log interaction with this source
|
||||
*/
|
||||
bool debug_sim_flag_ = false;
|
||||
|
||||
/* count lifetime #of outgoing events */
|
||||
uint32_t n_out_ev_ = 0;
|
||||
|
||||
/* set this to true, once, to announce that upstream will send
|
||||
* no more events. see .notify_upstream_exhausted()
|
||||
*/
|
||||
bool upstream_exhausted_ = false;
|
||||
|
||||
/* events to be delivered to callbacks.
|
||||
* multiple events may be collapsed depending on Reducer implementation
|
||||
*/
|
||||
Reducer reducer_;
|
||||
|
||||
/* reactor/simulator being used to schedule consumption. if ommitted,
|
||||
* will borrow thread calling .notify_secondary_event()
|
||||
*/
|
||||
Reactor * parent_reactor_ = nullptr;
|
||||
|
||||
/* invoke callbacks in this set to send an outgoing event */
|
||||
RpCallbackSet<EventSink> cb_set_;
|
||||
}; /*SecondarySource*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end SecondarySource.hpp */
|
||||
222
xo-reactor/include/xo/reactor/Sink.hpp
Normal file
222
xo-reactor/include/xo/reactor/Sink.hpp
Normal file
|
|
@ -0,0 +1,222 @@
|
|||
/* @file Sink.hpp */
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "AbstractSink.hpp"
|
||||
#include "AbstractSource.hpp"
|
||||
#include "PolyAdapterSink.hpp"
|
||||
#include "xo/reflect/Reflect.hpp"
|
||||
#include "xo/indentlog/print/time.hpp"
|
||||
#include "xo/indentlog/print/tag.hpp"
|
||||
#include "xo/cxxutil/demangle.hpp"
|
||||
#include <typeinfo>
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
/* Sink for events of type T
|
||||
*
|
||||
* inheritance:
|
||||
* ref::Refcount
|
||||
* ^
|
||||
* isa
|
||||
* |
|
||||
* reactor::AbstractEventProcessor
|
||||
* ^
|
||||
* isa
|
||||
* |
|
||||
* reactor::AbstractSink
|
||||
* ^
|
||||
* isa
|
||||
* |
|
||||
* reactor::Sink1<T>
|
||||
*/
|
||||
template<typename T>
|
||||
class Sink1 : public AbstractSink {
|
||||
public:
|
||||
using Reflect = reflect::Reflect;
|
||||
using TypeDescr = reflect::TypeDescr;
|
||||
|
||||
public:
|
||||
/* convenience: convert abstract sink to Sink1<T>*,
|
||||
* or throw
|
||||
*/
|
||||
static rp<Sink1<T>> require_native(std::string_view caller,
|
||||
rp<AbstractSink> const & sink)
|
||||
{
|
||||
using xo::scope;
|
||||
using xo::xtag;
|
||||
|
||||
/* 1. if sink expects events of type T,
|
||||
* make direct connection
|
||||
*/
|
||||
Sink1<T> * native_sink = nullptr;
|
||||
|
||||
native_sink = dynamic_cast<Sink1<T> *>(sink.get());
|
||||
|
||||
if (native_sink)
|
||||
return native_sink;
|
||||
|
||||
/* 2. if sink is polymorphic,
|
||||
* make type-erasing adapter
|
||||
*/
|
||||
|
||||
if (sink->allow_polymorphic_source()) {
|
||||
#ifdef DEBUG_NOT_USING
|
||||
scope lscope("Sink1<T>::require_native: create PolyAdapterSink");
|
||||
lscope.log(xtag("caller", caller));
|
||||
#endif
|
||||
|
||||
return PolyAdapterSink<T>::make(sink);
|
||||
}
|
||||
|
||||
if (!native_sink) {
|
||||
#ifdef DEBUG_EVENT_TYPEINFO
|
||||
std::type_info const * sink_parent_typeinfo
|
||||
= sink->parent_typeinfo();
|
||||
#endif
|
||||
|
||||
std::size_t src_hashcode = typeid(T).hash_code();
|
||||
|
||||
throw std::runtime_error(tostr("Sink1<T>::require_native"
|
||||
": wanted to sink S, but sink expects T",
|
||||
xtag("caller", caller),
|
||||
xtag("T", sink->sink_ev_type()->canonical_name()),
|
||||
xtag("S", reflect::type_name<T>()),
|
||||
xtag("required_hashcode", typeid(Sink1<T>).hash_code()),
|
||||
xtag("required_name", typeid(Sink1<T>).name()),
|
||||
xtag("src_hashcode", src_hashcode),
|
||||
xtag("sink_hashcode", sink->sink_ev_type()->native_typeinfo()->hash_code())
|
||||
#ifdef DEBUG_EVENT_TYPEINFO
|
||||
, xtag("sink_hashcode", sink->item_typeinfo()->hash_code())
|
||||
, xtag("sink_parent_hashcode", sink_parent_typeinfo->hash_code())
|
||||
, xtag("sink_parent_name", sink_parent_typeinfo->name())
|
||||
, xtag("sink.type", sink->self_typename())
|
||||
, xtag("sink.parent_type", sink->parent_typename())
|
||||
#endif
|
||||
));
|
||||
}
|
||||
|
||||
return native_sink;
|
||||
} /*require_native*/
|
||||
|
||||
virtual TypeDescr sink_ev_type() const override { return reflect::Reflect::require<T>(); }
|
||||
/* accept incoming event */
|
||||
virtual void notify_ev(T const & ev) = 0;
|
||||
|
||||
/* invoke these when this sink added to, or removed from, a source */
|
||||
virtual void notify_add_callback() {}
|
||||
virtual void notify_remove_callback() {}
|
||||
|
||||
// ----- inherited from AbstractSink -----
|
||||
|
||||
/* Sink1<T> only allows source providing T */
|
||||
virtual bool allow_polymorphic_source() const override { return false; }
|
||||
|
||||
virtual void attach_source(rp<AbstractSource> const & src) override {
|
||||
src->attach_sink(this);
|
||||
} /*attach_source*/
|
||||
|
||||
virtual void notify_ev_tp(TaggedPtr const & ev_tp) override {
|
||||
using xo::xtag;
|
||||
|
||||
T * p_ev = ev_tp.recover_native<T>();
|
||||
|
||||
if (p_ev) {
|
||||
this->notify_ev(*p_ev);
|
||||
} else {
|
||||
throw std::runtime_error(tostr("Sink1<T>::notify_ev_tp"
|
||||
": unable to convert ev_tp to T",
|
||||
xtag("ev_tp.type", ev_tp.td()->canonical_name()),
|
||||
xtag("T", reflect::type_name<T>())));
|
||||
}
|
||||
} /*notify_ev_tp*/
|
||||
}; /*Sink1*/
|
||||
|
||||
/* a sink with no further downstream processors */
|
||||
template<typename T>
|
||||
class SinkEndpoint : public Sink1<T> {
|
||||
public:
|
||||
// ----- Inherited from AbstractEventProcessor -----
|
||||
|
||||
virtual std::string const & name() const override { return name_; }
|
||||
virtual void set_name(std::string const & x) override { name_ = x; }
|
||||
|
||||
virtual void visit_direct_consumers(std::function<void (ref::brw<AbstractEventProcessor>)> const &) override {
|
||||
/* *this is not an event source */
|
||||
} /*visit_direct_consumers*/
|
||||
|
||||
private:
|
||||
/* reporting name for this sink */
|
||||
std::string name_;
|
||||
}; /*SinkEndpoint*/
|
||||
|
||||
template<typename T, typename Fn>
|
||||
class SinkToFunction : public SinkEndpoint<T> {
|
||||
public:
|
||||
SinkToFunction(Fn fn) : fn_{std::move(fn)} {}
|
||||
|
||||
/* NOTE: conservative choice here, could templatize on this */
|
||||
virtual bool allow_volatile_source() const override { return false; }
|
||||
virtual uint32_t n_in_ev() const override { return n_in_ev_; }
|
||||
virtual void notify_ev(T const & ev) override {
|
||||
++(this->n_in_ev_);
|
||||
fn_(ev);
|
||||
} /*notify_ev*/
|
||||
|
||||
virtual void display(std::ostream & os) const override {
|
||||
using xo::xtag;
|
||||
|
||||
os << "<SinkToFunction"
|
||||
<< xtag("name", this->name())
|
||||
<< xtag("n_in_ev", this->n_in_ev())
|
||||
<< ">";
|
||||
} /*display*/
|
||||
|
||||
private:
|
||||
Fn fn_;
|
||||
/* counts lifetime #of incoming events (see .notify_ev()) */
|
||||
uint32_t n_in_ev_ = 0;
|
||||
}; /*SinkToFunction*/
|
||||
|
||||
/* sink that prints to console */
|
||||
template<typename T>
|
||||
class SinkToConsole : public SinkEndpoint<T> {
|
||||
public:
|
||||
SinkToConsole() {}
|
||||
|
||||
virtual bool allow_volatile_source() const override { return true; }
|
||||
virtual uint32_t n_in_ev() const override { return n_in_ev_; }
|
||||
virtual void notify_ev(T const & ev) override {
|
||||
//using logutil::operator<<;
|
||||
|
||||
++(this->n_in_ev_);
|
||||
|
||||
std::cout << ev << std::endl;
|
||||
} /*notify_ev*/
|
||||
|
||||
virtual void display(std::ostream & os) const override {
|
||||
using xo::xtag;
|
||||
|
||||
os << "<SinkToConsole"
|
||||
<< xtag("name", this->name())
|
||||
<< xtag("n_in_ev", this->n_in_ev())
|
||||
<< ">";
|
||||
} /*display*/
|
||||
|
||||
private:
|
||||
/* reporting name for this sink */
|
||||
std::string name_;
|
||||
/* counts lifetime #of incoming events (see .notify_ev()) */
|
||||
uint32_t n_in_ev_ = 0;
|
||||
}; /*SinkToConsole*/
|
||||
|
||||
#ifdef NOT_USING
|
||||
class TemporaryTest {
|
||||
public:
|
||||
static rp<SinkToConsole<std::pair<xo::time::utc_nanos, double>>> realization_printer();
|
||||
}; /*TemporaryTest*/
|
||||
#endif
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end Sink.hpp */
|
||||
20
xo-reactor/include/xo/reactor/init_reactor.hpp
Normal file
20
xo-reactor/include/xo/reactor/init_reactor.hpp
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
/* file init_reactor.hpp
|
||||
*
|
||||
* author: Roland Conybeare, Aug 2022
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "xo/subsys/Subsystem.hpp"
|
||||
|
||||
namespace xo {
|
||||
enum S_reactor_tag {};
|
||||
|
||||
template<>
|
||||
struct InitSubsys<S_reactor_tag> {
|
||||
static void init();
|
||||
static InitEvidence require();
|
||||
};
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end init_reactor.hpp */
|
||||
92
xo-reactor/src/reactor/AbstractEventProcessor.cpp
Normal file
92
xo-reactor/src/reactor/AbstractEventProcessor.cpp
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
/* @file AbstractEventProcessor.cp */
|
||||
|
||||
#include "AbstractEventProcessor.hpp"
|
||||
#include "xo/indentlog/print/tostr.hpp"
|
||||
#include <unordered_map>
|
||||
#include <map>
|
||||
|
||||
namespace xo {
|
||||
using ref::brw;
|
||||
using xo::tostr;
|
||||
using std::uint32_t;
|
||||
|
||||
namespace reactor {
|
||||
namespace {
|
||||
/* search all event processors ep reachable (dowstream) from x,
|
||||
* add to *m;
|
||||
*/
|
||||
void
|
||||
map_network_helper(brw<AbstractEventProcessor> x,
|
||||
uint32_t * tsort_ix,
|
||||
std::unordered_map<AbstractEventProcessor*, uint32_t> * m)
|
||||
{
|
||||
if (m->contains(x.get()))
|
||||
return;
|
||||
|
||||
auto fn = [tsort_ix, m]
|
||||
(brw<AbstractEventProcessor> ep)
|
||||
{
|
||||
map_network_helper(ep, tsort_ix, m);
|
||||
};
|
||||
|
||||
x->visit_direct_consumers(fn);
|
||||
|
||||
/* postorder! */
|
||||
(*m)[x.get()] = ++(*tsort_ix);
|
||||
|
||||
} /*map_network_helper*/
|
||||
} /*namespace*/
|
||||
|
||||
std::vector<rp<AbstractEventProcessor>>
|
||||
AbstractEventProcessor::map_network(rp<AbstractEventProcessor> const & x)
|
||||
{
|
||||
std::unordered_map<AbstractEventProcessor *, std::uint32_t> network_map;
|
||||
|
||||
/* index event processors in reverse topological order:
|
||||
* if B is (directly or indirectly) downstream from A,
|
||||
* then tsort_ix(B) < tsort_ix(A)
|
||||
*/
|
||||
uint32_t tsort_ix = 0;
|
||||
|
||||
/* depth-first traversal, detect and short-circuit on dup paths */
|
||||
map_network_helper(x.borrow(), &tsort_ix, &network_map);
|
||||
|
||||
/* invariant: tsort_ix = #of event processors in network */
|
||||
uint32_t n = tsort_ix;
|
||||
|
||||
/* network_map, now in a topologically sorted order */
|
||||
std::map<uint32_t, AbstractEventProcessor *> tsorted_map;
|
||||
{
|
||||
for(auto const & x : network_map) {
|
||||
uint32_t tsort_ix = x.second;
|
||||
AbstractEventProcessor * ep = x.first;
|
||||
|
||||
tsorted_map[n - tsort_ix] = ep;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<rp<AbstractEventProcessor>> retval;
|
||||
{
|
||||
for(auto const & x : tsorted_map)
|
||||
retval.push_back(x.second);
|
||||
}
|
||||
|
||||
return retval;
|
||||
} /*map_network*/
|
||||
|
||||
void
|
||||
AbstractEventProcessor::display(std::ostream & os) const
|
||||
{
|
||||
os << "<AbstractEventProcessor" << xtag("name", name()) << ">";
|
||||
} /*display*/
|
||||
|
||||
std::string
|
||||
AbstractEventProcessor::display_string() const
|
||||
{
|
||||
return tostr(*this);
|
||||
} /*display_string*/
|
||||
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end AbstractEventProcessor.cpp */
|
||||
81
xo-reactor/src/reactor/AbstractSource.cpp
Normal file
81
xo-reactor/src/reactor/AbstractSource.cpp
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
/* @file AbstractSource.cpp */
|
||||
|
||||
#include "AbstractSource.hpp"
|
||||
#include "xo/indentlog/scope.hpp"
|
||||
#include "xo/webutil/StreamEndpointDescr.hpp"
|
||||
//#include "indentlog/scope.hpp"
|
||||
|
||||
namespace xo {
|
||||
using xo::web::StreamEndpointDescr;
|
||||
using xo::reactor::AbstractSink;
|
||||
|
||||
namespace reactor {
|
||||
StreamEndpointDescr
|
||||
AbstractSource::stream_endpoint_descr(std::string const & url_prefix)
|
||||
{
|
||||
auto subscribe_fn
|
||||
= ([this]
|
||||
(rp<AbstractSink> const & ws_sink)
|
||||
{
|
||||
//scope lscope("AbstractSource::stream_endpoint_descr.subscribe_fn");
|
||||
|
||||
/* ws_sink created by websocket, sends events to websocket as json
|
||||
* see [websock/WebsocketSink]
|
||||
*/
|
||||
return this->attach_sink(ws_sink);
|
||||
});
|
||||
|
||||
auto unsubscribe_fn
|
||||
= ([this]
|
||||
(CallbackId id)
|
||||
{
|
||||
this->detach_sink(id);
|
||||
});
|
||||
|
||||
return StreamEndpointDescr(url_prefix,
|
||||
subscribe_fn,
|
||||
unsubscribe_fn);
|
||||
} /*stream_endpoint_descr*/
|
||||
|
||||
uint64_t
|
||||
AbstractSource::deliver_n(uint64_t n)
|
||||
{
|
||||
uint64_t retval = 0;
|
||||
|
||||
for (uint64_t i=0; i<n; ++i) {
|
||||
uint64_t n1 = this->deliver_one();
|
||||
|
||||
if (n1 == 0) {
|
||||
/* short-circuit if source has less than n
|
||||
* events available
|
||||
*/
|
||||
break;
|
||||
}
|
||||
|
||||
retval += n1;
|
||||
}
|
||||
|
||||
return retval;
|
||||
} /*deliver_n*/
|
||||
|
||||
uint64_t
|
||||
AbstractSource::deliver_all()
|
||||
{
|
||||
uint64_t retval = 0;
|
||||
|
||||
for (;;) {
|
||||
uint64_t n1 = this->deliver_one();
|
||||
|
||||
if (n1 == 0)
|
||||
break;
|
||||
|
||||
retval += n1;
|
||||
}
|
||||
|
||||
return retval;
|
||||
} /*deliver_all*/
|
||||
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end AbstractSource.cpp */
|
||||
24
xo-reactor/src/reactor/CMakeLists.txt
Normal file
24
xo-reactor/src/reactor/CMakeLists.txt
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
# xo-reactor/src/reactor/CMakeLists.txt
|
||||
|
||||
set(SELF_LIB reactor)
|
||||
set(SELF_SRCS
|
||||
AbstractEventProcessor.cpp AbstractSource.cpp ReactorSource.cpp
|
||||
Sink.cpp
|
||||
Reactor.cpp PollingReactor.cpp
|
||||
init_reactor.cpp)
|
||||
|
||||
xo_add_shared_library4(${SELF_LIB} ${PROJECT_NAME}Targets ${PROJECT_VERSION} 1 ${SELF_SRCS})
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# external dependencies
|
||||
|
||||
# note: changes to xo_dependency() calls here
|
||||
# must coordinate with find_dependency() calls
|
||||
# in xo-reactor/cmake/reactorConfig.cmake.in
|
||||
#
|
||||
#xo_dependency(${SELF_LIB} xo_ordinaltree) # for some reason wants to link -lxo_ordinaltree ??
|
||||
xo_dependency(${SELF_LIB} reflect)
|
||||
xo_dependency(${SELF_LIB} webutil)
|
||||
xo_dependency(${SELF_LIB} printjson)
|
||||
xo_dependency(${SELF_LIB} callback)
|
||||
xo_dependency(${SELF_LIB} xo_ordinaltree)
|
||||
105
xo-reactor/src/reactor/PollingReactor.cpp
Normal file
105
xo-reactor/src/reactor/PollingReactor.cpp
Normal file
|
|
@ -0,0 +1,105 @@
|
|||
/* @file PollingReactor.cpp */
|
||||
|
||||
#include "PollingReactor.hpp"
|
||||
|
||||
namespace xo {
|
||||
using ref::brw;
|
||||
using std::size_t;
|
||||
using std::uint64_t;
|
||||
using std::int64_t;
|
||||
|
||||
namespace reactor {
|
||||
bool
|
||||
PollingReactor::add_source(brw<ReactorSource> src)
|
||||
{
|
||||
/* make sure src does not already appear in .source_v[] */
|
||||
for(ReactorSourcePtr const & x : this->source_v_) {
|
||||
if(x.get() == src.get()) {
|
||||
throw std::runtime_error("PollingReactor::add_source; source already present");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
src->notify_reactor_add(this);
|
||||
|
||||
this->source_v_.push_back(src.get());
|
||||
|
||||
return true;
|
||||
} /*add_source*/
|
||||
|
||||
bool
|
||||
PollingReactor::remove_source(brw<ReactorSource> src)
|
||||
{
|
||||
auto ix = std::find(this->source_v_.begin(),
|
||||
this->source_v_.end(),
|
||||
src);
|
||||
|
||||
if(ix != this->source_v_.end()) {
|
||||
src->notify_reactor_remove(this);
|
||||
|
||||
this->source_v_.erase(ix);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
} /*remove_source*/
|
||||
|
||||
void
|
||||
PollingReactor::notify_source_primed(brw<ReactorSource>) {
|
||||
/* nothing to do here -- all sources always checked by polling loop */
|
||||
} /*notify_source_primed*/
|
||||
|
||||
int64_t
|
||||
PollingReactor::find_nonempty_source(size_t start_ix)
|
||||
{
|
||||
size_t z = this->source_v_.size();
|
||||
|
||||
/* search sources [ix .. z) */
|
||||
for(size_t ix = start_ix; ix < z; ++ix) {
|
||||
brw<ReactorSource> src = this->source_v_[ix];
|
||||
|
||||
if(src->is_nonempty())
|
||||
return ix;
|
||||
}
|
||||
|
||||
/* search source [0 .. ix) */
|
||||
for(size_t ix = 0, n = std::min(start_ix, z); ix < n; ++ix) {
|
||||
brw<ReactorSource> src = this->source_v_[ix];
|
||||
|
||||
if(src->is_nonempty())
|
||||
return ix;
|
||||
}
|
||||
|
||||
return -1;
|
||||
} /*find_nonempty_source*/
|
||||
|
||||
uint64_t
|
||||
PollingReactor::run_one()
|
||||
{
|
||||
int64_t ix = this->find_nonempty_source(this->next_ix_);
|
||||
|
||||
scope log(XO_DEBUG(this->loglevel() <= log_level::chatty));
|
||||
|
||||
log && log(xtag("self", this), xtag("src_ix", ix));
|
||||
|
||||
uint64_t retval = 0;
|
||||
|
||||
if(ix >= 0) {
|
||||
brw<ReactorSource> src = this->source_v_[ix];
|
||||
|
||||
log && log(xtag("src.name", src->name()));
|
||||
|
||||
retval = src->deliver_one();
|
||||
} else {
|
||||
retval = 0;
|
||||
}
|
||||
|
||||
log.end_scope(xtag("retval", retval));
|
||||
|
||||
return retval;
|
||||
} /*run_one*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end PollingReactor.cpp */
|
||||
40
xo-reactor/src/reactor/Reactor.cpp
Normal file
40
xo-reactor/src/reactor/Reactor.cpp
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
/* file Reactor.cpp
|
||||
*
|
||||
* author: Roland Conybeare, Sep 2022
|
||||
*/
|
||||
|
||||
#include "Reactor.hpp"
|
||||
#include "init_reactor.hpp"
|
||||
#include "xo/subsys/Subsystem.hpp"
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
Reactor::Reactor() {
|
||||
/* ensure reactor subsystem + deps initialized */
|
||||
|
||||
InitSubsys<S_reactor_tag>::require();
|
||||
|
||||
Subsystem::initialize_all();
|
||||
}
|
||||
|
||||
std::uint64_t
|
||||
Reactor::run_n(int32_t n)
|
||||
{
|
||||
std::uint64_t retval = 0;
|
||||
|
||||
if (n == -1) {
|
||||
for (;;) {
|
||||
retval += this->run_one();
|
||||
}
|
||||
} else {
|
||||
for (int32_t i=0; i<n; ++i) {
|
||||
retval += this->run_one();
|
||||
}
|
||||
}
|
||||
|
||||
return retval;
|
||||
} /*run_n*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end Reactor.cpp */
|
||||
34
xo-reactor/src/reactor/ReactorSource.cpp
Normal file
34
xo-reactor/src/reactor/ReactorSource.cpp
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
/* @file ReactorSource.cpp */
|
||||
|
||||
#include "ReactorSource.hpp"
|
||||
#include "xo/indentlog/print/time.hpp"
|
||||
#include <cstdint>
|
||||
|
||||
namespace xo {
|
||||
using xo::time::utc_nanos;
|
||||
|
||||
namespace reactor {
|
||||
utc_nanos
|
||||
ReactorSource::online_current_tm() const
|
||||
{
|
||||
/* for an online source:
|
||||
* .is_exhausted() must always be false;
|
||||
* this implies that .sim_current_tm() should
|
||||
* not be called in the first place
|
||||
*/
|
||||
|
||||
assert(false);
|
||||
|
||||
return time::timeutil::epoch();
|
||||
} /*online_current_tm*/
|
||||
|
||||
std::uint64_t
|
||||
ReactorSource::online_advance_until(utc_nanos /*tm*/,
|
||||
bool /*replay_flag*/)
|
||||
{
|
||||
return 0;
|
||||
} /*online_advance_until*/
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end Source.cpp */
|
||||
18
xo-reactor/src/reactor/Sink.cpp
Normal file
18
xo-reactor/src/reactor/Sink.cpp
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
/* @file Sink.cpp */
|
||||
|
||||
#include "Sink.hpp"
|
||||
#include "xo/refcnt/Refcounted.hpp"
|
||||
|
||||
namespace xo {
|
||||
namespace reactor {
|
||||
#ifdef NOT_USING
|
||||
rp<SinkToConsole<std::pair<xo::time::utc_nanos, double>>>
|
||||
TemporaryTest::realization_printer()
|
||||
{
|
||||
return new SinkToConsole<std::pair<xo::time::utc_nanos, double>>();
|
||||
} /*realization_printer*/
|
||||
#endif
|
||||
} /*namespace reactor*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end Sink.cpp */
|
||||
31
xo-reactor/src/reactor/init_reactor.cpp
Normal file
31
xo-reactor/src/reactor/init_reactor.cpp
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
/* file init_reactor.cpp
|
||||
*
|
||||
* author: Roland Conybeare, Aug 2022
|
||||
*/
|
||||
|
||||
#include "init_reactor.hpp"
|
||||
#include "xo/reflect/init_reflect.hpp"
|
||||
|
||||
namespace xo {
|
||||
void
|
||||
InitSubsys<S_reactor_tag>::init()
|
||||
{
|
||||
/* TODO: reflect reactor types */
|
||||
} /*init*/
|
||||
|
||||
InitEvidence
|
||||
InitSubsys<S_reactor_tag>::require()
|
||||
{
|
||||
InitEvidence retval;
|
||||
|
||||
/* subsystem dependencies for reactor/ */
|
||||
retval ^= InitSubsys<S_reflect_tag>::require();
|
||||
|
||||
/* reactor/'s own initialization code */
|
||||
retval ^= Subsystem::provide<S_reactor_tag>("reactor", &init);
|
||||
|
||||
return retval;
|
||||
} /*require*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end init_reactor.cpp */
|
||||
11
xo-reactor/utest/CMakeLists.txt
Normal file
11
xo-reactor/utest/CMakeLists.txt
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
# build unittest reactor/unittest'
|
||||
|
||||
set(SELF_EXE utest.reactor)
|
||||
set(SELF_SRCS Sink.test.cpp PollingReactor.test.cpp reactor_utest_main.cpp)
|
||||
|
||||
xo_add_utest_executable(${SELF_EXE} ${SELF_SRCS})
|
||||
xo_self_dependency(${SELF_EXE} reactor)
|
||||
xo_dependency(${SELF_EXE} randomgen)
|
||||
xo_external_target_dependency(${SELF_EXE} Catch2 Catch2::Catch2)
|
||||
|
||||
# end CMakeLists.txt
|
||||
236
xo-reactor/utest/PollingReactor.test.cpp
Normal file
236
xo-reactor/utest/PollingReactor.test.cpp
Normal file
|
|
@ -0,0 +1,236 @@
|
|||
/* @file PollingReactor.test.cpp */
|
||||
|
||||
#include "xo/reactor/init_reactor.hpp"
|
||||
#include "xo/reactor/PollingReactor.hpp"
|
||||
#include "xo/reactor/FifoQueue.hpp"
|
||||
#include "xo/reactor/Sink.hpp"
|
||||
#include "xo/randomgen/xoshiro256.hpp"
|
||||
#include "xo/indentlog/print/pair.hpp"
|
||||
#include "catch2/catch.hpp"
|
||||
|
||||
namespace xo {
|
||||
//using xo::reactor::Reactor;
|
||||
using xo::reactor::PollingReactor;
|
||||
using xo::reactor::FifoQueue;
|
||||
using xo::reactor::SinkToFunction;
|
||||
using xo::time::timeutil;
|
||||
using xo::time::seconds;
|
||||
using xo::time::utc_nanos;
|
||||
|
||||
/* note: trivial REQUIRE() call in else branch bc we still want
|
||||
* catch2 to count assertions when verification succeeds
|
||||
*/
|
||||
# define REQUIRE_ORCAPTURE(ok_flag, catch_flag, expr) \
|
||||
if (catch_flag) { \
|
||||
REQUIRE((expr)); \
|
||||
} else { \
|
||||
REQUIRE(true); \
|
||||
ok_flag &= (expr); \
|
||||
}
|
||||
|
||||
# define REQUIRE_ORFAIL(ok_flag, catch_flag, expr) \
|
||||
REQUIRE_ORCAPTURE(ok_flag, catch_flag, expr); \
|
||||
if (!ok_flag) \
|
||||
return ok_flag
|
||||
|
||||
namespace {
|
||||
using TestEvent = std::pair<utc_nanos, std::uint64_t>;
|
||||
using TestQueue = FifoQueue<TestEvent>;
|
||||
|
||||
struct RandomTestData {
|
||||
RandomTestData(std::size_t n,
|
||||
xo::rng::xoshiro256ss * p_rgen);
|
||||
|
||||
std::uint32_t size() const { return u1v_.size(); }
|
||||
std::vector<std::uint64_t> const & u1v() const { return u1v_; }
|
||||
|
||||
private:
|
||||
/* a set of n randomly chosen elements drawn from [0 .. 2n-1] */
|
||||
std::vector<std::uint64_t> u1v_;
|
||||
};
|
||||
|
||||
RandomTestData::RandomTestData(std::size_t n,
|
||||
xo::rng::xoshiro256ss * p_rgen)
|
||||
: u1v_(n)
|
||||
{
|
||||
std::shuffle(u1v_.begin(), u1v_.end(), *p_rgen);
|
||||
}
|
||||
} /*namespace*/
|
||||
|
||||
static InitEvidence s_evidence = InitSubsys<S_reactor_tag>::require();
|
||||
|
||||
namespace ut {
|
||||
TEST_CASE("polling0", "[reactor]") {
|
||||
Subsystem::initialize_all();
|
||||
|
||||
rp<PollingReactor> reactor = PollingReactor::make();
|
||||
|
||||
REQUIRE(reactor.get());
|
||||
|
||||
for (std::uint32_t i=0; i<3; ++i) {
|
||||
INFO(xtag("i", i));
|
||||
REQUIRE(reactor->run_one() == 0);
|
||||
}
|
||||
} /*TEST_CASE(polling0)*/
|
||||
|
||||
/* return true=success, false=fail */
|
||||
bool
|
||||
run_polling1_test(std::size_t n,
|
||||
bool catch_flag,
|
||||
xo::rng::xoshiro256ss * p_rgen)
|
||||
{
|
||||
scope log(XO_DEBUG(catch_flag));
|
||||
log && log(xtag("n", n));
|
||||
|
||||
bool ok_flag = true;
|
||||
|
||||
rp<PollingReactor> reactor = PollingReactor::make();
|
||||
REQUIRE_ORFAIL(ok_flag, catch_flag, reactor.get() != nullptr);
|
||||
|
||||
if (ok_flag)
|
||||
reactor->set_loglevel(catch_flag
|
||||
? log_level::always
|
||||
: log_level::error);
|
||||
|
||||
rp<TestQueue> q = TestQueue::make();
|
||||
REQUIRE_ORFAIL(ok_flag, catch_flag, q.get() != nullptr);
|
||||
|
||||
if (ok_flag)
|
||||
q->set_name("fifo");
|
||||
|
||||
/* capture delivered events */
|
||||
std::vector<TestEvent> out_ev_v;
|
||||
|
||||
auto sink_fn
|
||||
= ([&out_ev_v](TestEvent const & x) { out_ev_v.push_back(x); });
|
||||
|
||||
q->add_callback(new SinkToFunction
|
||||
<TestEvent, std::function<void (TestEvent const &)>>(sink_fn));
|
||||
|
||||
|
||||
reactor->add_source(q);
|
||||
|
||||
/* max #of consecutive inserts */
|
||||
std::size_t max_enq = std::max(1UL, n/3);
|
||||
/* max #of consecutive removes */
|
||||
std::size_t max_deq = std::max(1UL, n/3);
|
||||
|
||||
RandomTestData seq(n, p_rgen);
|
||||
|
||||
q->set_debug_sim_flag(catch_flag);
|
||||
|
||||
/* verify:
|
||||
* 1. queue conservation -- everything inserted gets delivered
|
||||
* 2. events consumed in the same order they where inserted
|
||||
* 3. no problem with queue being sometimes empty
|
||||
*/
|
||||
|
||||
utc_nanos t0 = timeutil::ymd_hms(20231011 /*ymd*/, 131300 /*hms*/);
|
||||
|
||||
/* count #of events delivered by reactor */
|
||||
std::size_t n_delivered = 0;
|
||||
|
||||
std::size_t i = 0;
|
||||
while ((i < seq.u1v().size()) || (n_delivered < n)) {
|
||||
/* sum of (#of enq, #of deq) attempted for this iteration */
|
||||
std::size_t n_work_attempted = 0;
|
||||
/* sum of (#of enq, #of deq) accomplished for this iteration */
|
||||
std::size_t n_work_done = 0;
|
||||
std::size_t n_enq = p_rgen->generate() % (max_enq + 1);
|
||||
std::size_t n_deq_attempted = 1 + (p_rgen->generate() % (max_deq + 1));
|
||||
std::size_t n_deq_done = 0;
|
||||
|
||||
/* pick random #of elements to insert (to back of queue) */
|
||||
{
|
||||
for (std::size_t j = 0; (j < n_enq) && (i < seq.u1v().size()); ++j) {
|
||||
utc_nanos ti = t0 + seconds(i);
|
||||
|
||||
q->notify_ev(std::make_pair(ti, seq.u1v()[i++]));
|
||||
}
|
||||
|
||||
n_work_attempted += n_enq;
|
||||
n_work_done += n_enq;
|
||||
}
|
||||
|
||||
/* pick random #of elements to remove (from front of queue) */
|
||||
{
|
||||
n_deq_done += reactor->run_n(n_deq_attempted);
|
||||
|
||||
n_work_attempted += n_deq_attempted;
|
||||
n_work_done += n_deq_done;
|
||||
n_delivered += n_deq_done;
|
||||
}
|
||||
|
||||
log && log(xtag("i", i),
|
||||
xtag("n", n),
|
||||
xtag("n_work_attempted", n_work_attempted),
|
||||
xtag("n_work_done", n_work_done),
|
||||
xtag("n_enq", n_enq),
|
||||
xtag("n_deq_attempted", n_deq_attempted),
|
||||
xtag("n_deq_done", n_deq_done));
|
||||
|
||||
if ((i == seq.u1v().size()) /*no more enqueues planned*/
|
||||
&& (n_work_attempted > 0)
|
||||
&& (n_work_done == 0))
|
||||
{
|
||||
/* expect incremental progress every iteration;
|
||||
* want unit test to always terminate
|
||||
*/
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
REQUIRE_ORFAIL(ok_flag, catch_flag, i == n);
|
||||
REQUIRE_ORFAIL(ok_flag, catch_flag, n_delivered == n);
|
||||
|
||||
/* check events delivered 1:1 and in order */
|
||||
for (std::size_t i=0; i<n; ++i) {
|
||||
INFO(xtag("i", i));
|
||||
|
||||
REQUIRE_ORFAIL(ok_flag, catch_flag, out_ev_v[i].second == seq.u1v()[i]);
|
||||
}
|
||||
|
||||
return true;
|
||||
} /*run_polling1_test*/
|
||||
|
||||
TEST_CASE("polling1", "[reactor]") {
|
||||
Subsystem::initialize_all();
|
||||
|
||||
//log_config::style = function_style::streamlined;
|
||||
log_config::location_tab = 100;
|
||||
|
||||
/* random data to exercise queue + reactor */
|
||||
|
||||
uint64_t seed = 14950349842636922572UL;
|
||||
/* can seed instead from /dev/random with: */
|
||||
//Seed<xo::rng::xoshiro256ss> seed;
|
||||
auto rgen = xo::rng::xoshiro256ss(seed);
|
||||
|
||||
for (std::size_t n = 4; n <= 1024; n *= 2) {
|
||||
bool ok_flag = false;
|
||||
|
||||
for (std::uint32_t attention = 0; !ok_flag && (attention < 2); ++attention) {
|
||||
ok_flag = true;
|
||||
|
||||
/* attention=0:
|
||||
* - no logging
|
||||
* - detect assertion failures, but don't report them to catch
|
||||
* attention=1:
|
||||
* - only runs if failure detected with attention=0
|
||||
* - full logging
|
||||
* - report to catch
|
||||
*/
|
||||
|
||||
bool debug_flag = (attention == 1);
|
||||
|
||||
ok_flag &= run_polling1_test(n, debug_flag, &rgen);
|
||||
}
|
||||
}
|
||||
|
||||
} /*TEST_CASE(polling1)*/
|
||||
} /*namespace ut*/
|
||||
|
||||
} /*namespace xo*/
|
||||
|
||||
|
||||
/* end PollingReactor.test.cpp */
|
||||
99
xo-reactor/utest/Sink.test.cpp
Normal file
99
xo-reactor/utest/Sink.test.cpp
Normal file
|
|
@ -0,0 +1,99 @@
|
|||
/* @file Sink.test.cpp */
|
||||
|
||||
#include "xo/reactor/PollingReactor.hpp"
|
||||
#include "xo/reactor/Sink.hpp"
|
||||
#include "xo/indentlog/print/pair.hpp"
|
||||
#include "catch2/catch.hpp"
|
||||
|
||||
namespace xo {
|
||||
using xo::reactor::Reactor;
|
||||
using xo::reactor::PollingReactor;
|
||||
using xo::reactor::AbstractSink;
|
||||
using xo::reactor::Sink1;
|
||||
using xo::reactor::SinkEndpoint;
|
||||
using xo::reactor::SinkToConsole;
|
||||
using xo::time::utc_nanos;
|
||||
|
||||
namespace {
|
||||
class TestSink : public SinkEndpoint<int> {
|
||||
public:
|
||||
TestSink() = default;
|
||||
|
||||
virtual uint32_t n_in_ev() const override { return 0; }
|
||||
virtual bool allow_volatile_source() const override { return true; }
|
||||
virtual void notify_ev(int const & ev) override {}
|
||||
virtual void display(std::ostream & os) const override { os << "<TestSink>"; }
|
||||
}; /*TestSink*/
|
||||
|
||||
class TestSink2 : public SinkEndpoint<utc_nanos> {
|
||||
public:
|
||||
TestSink2() = default;
|
||||
|
||||
virtual uint32_t n_in_ev() const override { return 0; }
|
||||
virtual bool allow_volatile_source() const override { return true; }
|
||||
virtual void notify_ev(utc_nanos const & ev) override {}
|
||||
virtual void display(std::ostream & os) const override { os << "<TestSink2>"; }
|
||||
}; /*TestSink2*/
|
||||
|
||||
using TestSink3 = SinkToConsole<std::pair<utc_nanos, double>>;
|
||||
} /*namespace*/
|
||||
|
||||
namespace ut {
|
||||
TEST_CASE("sink-cast", "[reactor][sink]") {
|
||||
rp<TestSink> test_sink = new TestSink();
|
||||
rp<AbstractSink> sink = test_sink;
|
||||
|
||||
TestSink * cast_sink = dynamic_cast<TestSink *>(sink.get());
|
||||
|
||||
REQUIRE(test_sink.get() == cast_sink);
|
||||
|
||||
Sink1<int> * int_sink = dynamic_cast<Sink1<int> *>(sink.get());
|
||||
|
||||
REQUIRE(test_sink.get() == int_sink);
|
||||
|
||||
rp<Sink1<int>> int_sink2
|
||||
= Sink1<int>::require_native("TEST_CASE(sink-cast)", sink.get());
|
||||
|
||||
REQUIRE(test_sink.get() == int_sink2.get());
|
||||
} /*TEST_CASE(sink-cast)*/
|
||||
|
||||
TEST_CASE("sink-cast2", "[reactor]") {
|
||||
rp<TestSink2> test_sink = new TestSink2();
|
||||
rp<AbstractSink> sink = test_sink;
|
||||
|
||||
TestSink2 * cast_sink = dynamic_cast<TestSink2 *>(sink.get());
|
||||
|
||||
REQUIRE(test_sink.get() == cast_sink);
|
||||
|
||||
Sink1<utc_nanos> * dt_sink = dynamic_cast<Sink1<utc_nanos> *>(sink.get());
|
||||
|
||||
REQUIRE(test_sink.get() == dt_sink);
|
||||
|
||||
rp<Sink1<utc_nanos>> dt_sink2
|
||||
= Sink1<utc_nanos>::require_native("TEST_CASE(sink-cast2)", sink.get());
|
||||
|
||||
REQUIRE(test_sink.get() == dt_sink2.get());
|
||||
} /*TEST_CASE(sink-cast2)*/
|
||||
|
||||
TEST_CASE("sink-cast3", "[reactor]") {
|
||||
rp<TestSink3> test_sink = new TestSink3();
|
||||
rp<AbstractSink> sink = test_sink;
|
||||
|
||||
TestSink3 * cast_sink = dynamic_cast<TestSink3 *>(sink.get());
|
||||
|
||||
REQUIRE(test_sink.get() == cast_sink);
|
||||
|
||||
Sink1<std::pair<utc_nanos, double>> * ev_sink
|
||||
= dynamic_cast<Sink1<std::pair<utc_nanos, double>> *>(sink.get());
|
||||
|
||||
REQUIRE(test_sink.get() == ev_sink);
|
||||
|
||||
rp<Sink1<std::pair<utc_nanos, double>>> ev_sink2
|
||||
= Sink1<std::pair<utc_nanos, double>>::require_native("TEST_CASE(sink-cast3)", sink.get());
|
||||
|
||||
REQUIRE(test_sink.get() == ev_sink2.get());
|
||||
} /*TEST_CASE(sink-cast3)*/
|
||||
} /*namespace ut*/
|
||||
} /*namespace xo*/
|
||||
|
||||
/* end Sink.test.cpp */
|
||||
6
xo-reactor/utest/reactor_utest_main.cpp
Normal file
6
xo-reactor/utest/reactor_utest_main.cpp
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
/* @file reactor_utest_main.cpp */
|
||||
|
||||
#define CATCH_CONFIG_MAIN
|
||||
#include "catch2/catch.hpp"
|
||||
|
||||
/* end reactor_utest_main.cpp */
|
||||
Loading…
Add table
Add a link
Reference in a new issue