initial implementation

This commit is contained in:
Roland Conybeare 2023-10-18 11:47:12 -04:00
commit 51818852a4
11 changed files with 1181 additions and 0 deletions

52
CMakeLists.txt Normal file
View file

@ -0,0 +1,52 @@
# xo-simulator/CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(simulator VERSION 1.0)
enable_language(CXX)
# common XO cmake macros (see proj/xo-cmake)
include(xo_macros/xo_cxx)
include(xo_macros/code-coverage)
# ----------------------------------------------------------------
# unit test setup
enable_testing()
# activate code coverage for all executables + libraries (when configured with -DCODE_COVERAGE=ON)
add_code_coverage()
# 1. assuming that /nix/store/ prefixes .hpp files belonging to gcc, catch2 etc.
# we're not interested in code coverage for these sources.
# 2. exclude the utest/ subdir, we don't need coverage on the unit tests themselves;
# rather, want coverage on the code that the unit tests exercise.
#
# NOTE: this seems to work only with the 'ccov-all' target. In particular, doesn't seem to do anything with the 'ccov' target
#
add_code_coverage_all_targets(EXCLUDE /nix/store/* ${PROJECT_SOURCE_DIR}/utest/* ${PROJECT_BINARY_DIR}/local/* ${PROJECT_SOURCE_DIR}/repo/*)
# ----------------------------------------------------------------
# c++ settings
# one-time project-specific c++ flags. usually empty
set(PROJECT_CXX_FLAGS "")
#set(PROJECT_CXX_FLAGS "-fconcepts-diagnostics-depth=2")
add_definitions(${PROJECT_CXX_FLAGS})
xo_toplevel_compile_options()
# ----------------------------------------------------------------
add_subdirectory(src/simulator)
#add_subdirectory(utest)
# ----------------------------------------------------------------
# provide find_package() support for reactor customers
xo_export_cmake_config(${PROJECT_NAME} ${PROJECT_VERSION} ${PROJECT_NAME}Targets)
# ----------------------------------------------------------------
# install project .hpp files
xo_install_include_tree()
# end CMakeLists.txt

View file

@ -0,0 +1,13 @@
@PACKAGE_INIT@
include(CMakeFindDependencyMacro)
# note: changes to find_dependency() calls here
# must coordinate with xo_dependency() calls
# in xo-reactor/src/reactor/CMakeLists.txt
#
find_dependency(reactor)
#find_dependency(callback)
include("${CMAKE_CURRENT_LIST_DIR}/@PROJECT_NAME@Targets.cmake")
check_required_components("@PROJECT_NAME@")

View file

@ -0,0 +1,19 @@
/* @file EventSink.hpp */
#pragma once
namespace xo {
namespace sim {
/* something that observes (consumes) events of type T.
* we deliberately hide event sinks from top-level of simulator scaffold,
* so that we don't have to impose a common event type for T
*/
template<typename T>
class EventSink {
public:
void operator()(T const & x);
}; /*EventSink*/
} /*namespace sim*/
} /*namespace xo*/
/* end EventSink.hpp */

View file

@ -0,0 +1,299 @@
/* @file Simulator.hpp */
#pragma once
#include "xo/reactor/Reactor.hpp"
#include "SourceTimestamp.hpp"
#include "xo/reactor/ReactorSource.hpp"
#include "xo/refcnt/Refcounted.hpp"
//#include "time/Time.hpp"
#include <vector>
namespace xo {
namespace sim {
class TimeSlip;
/* delay state-changing simulator command while handling
* simulator events. need this to permit reentrancy
*/
struct ReentrantSimulatorCmd {
enum SimulatorCmdEnum { NotifySourcePrimed, CompleteAddSource, CompleteRemoveSource };
using ReactorSource = xo::reactor::ReactorSource;
public:
ReentrantSimulatorCmd() = default;
ReentrantSimulatorCmd(SimulatorCmdEnum cmd,
ref::rp<reactor::ReactorSource> const & src)
: cmd_{cmd}, src_{src} {}
static ReentrantSimulatorCmd notify_source_primed(ref::rp<ReactorSource> const & src) {
return ReentrantSimulatorCmd(NotifySourcePrimed, src);
}
static ReentrantSimulatorCmd complete_add_source(ref::rp<ReactorSource> const & src) {
return ReentrantSimulatorCmd(CompleteAddSource, src);
}
static ReentrantSimulatorCmd complete_remove_source(ref::rp<ReactorSource> const & src) {
return ReentrantSimulatorCmd(CompleteRemoveSource, src);
}
SimulatorCmdEnum cmd() const { return cmd_; }
ref::rp<ReactorSource> const & src() const { return src_; }
private:
/* NotifySourcePrimed: deferred Simulator.notify_source_primed(.src)
* CompleteAddSource: deferred Simulator.complete_add_source(.src)
* CompleteRemoveSource: deferred Simulator.complete_remove_source(.src)
*/
SimulatorCmdEnum cmd_ = NotifySourcePrimed;
/* if .cmd=NotifySourcePrimed|CompleteAddSource|CompleteRemoveSource: reactor source */
ref::rp<ReactorSource> src_;
}; /*ReentrantSimulatorCmd*/
/* Generic simulator
*
* - time advances monotonically
* - applies a modifiable set of sources
*
* A Simulator isn't an example of a Reactor,
* because it can't work with arbitrary Sources
* (may find it expedient to fake this later,
* so we can easily adopt
* Source.notify_reactor_add() / Source.notify_reactor_remove())
* in a simulation context
*/
class Simulator : public reactor::Reactor {
public:
using ReactorSourcePtr = xo::reactor::ReactorSourcePtr;
using ReactorSource = xo::reactor::ReactorSource;
using utc_nanos = xo::time::utc_nanos;
using nanos = xo::time::nanos;
public:
~Simulator();
static ref::rp<Simulator> make(utc_nanos t0);
/* value of .t0() is estabished in ctor.
* it will not change except across call to .advance_one()
* in particular .add_source() does not change .t0()
*/
utc_nanos t0() const { return t0_; }
/* timestamp of last event delivered */
utc_nanos last_tm() const { return last_tm_; }
/* total #of events delivered since sim start */
uint64_t n_event() const { return n_event_; }
/* true iff all simulation source are exhausted
* a newly-created simulator is in the exhausted state;
* it may transition to non-exhausted state across
* call to .add_source()
*/
bool is_exhausted() const { return this->src_v_.empty(); }
/* true iff src has been added to this simulator
* (by .add_source())
*/
bool is_source_present(ref::brw<ReactorSource> src) const;
/* promise:
* .next_tm() > .t0() || .is_exhausted()
*
* .next_tm() may decrease across .add_source() call
* .next_tm() may increase across .advance_one() call
*/
utc_nanos next_tm() const;
/* returns source that will be used for next simulator event.
* nullptr if no remaining sources
*/
ReactorSource * next_src() const;
/* cross-reference realtime to simulated time,
* for throttled replay
*/
TimeSlip timeslip() const;
/* compute throttled real time for next event.
* caller supplies:
* 1. a pair of timesstamps xref_ts = (sim_tm, real_tm)
* - xref_ts.sim_tm is time in simulation coords of last event
* (i.e. most recent available value of .last_tm())
* - xref_ts.real_tm is wall clock time associated with simtime
* 2. a replay factor, representing desired
* elapsed_simulation_time : elapsed_real_time
*
* return value is realtime delay to apply before next simulated event,
* in order to maintain desired replay factor
*
* The incremental api here is intended to be used from a python thread.
*
* Expect python simulation loop like:
* import pysimulator
*
* replay_factor = 1.0
* sim = pysimulator.Simulator.make(t0)
* sim.run_one()
* tslip = sim.timeslip()
* while(True):
* dt = sim.throttled_event_dt(tslip, replay_factor)
* sleep(dt)
* sim.run_one()
*
* This allows sleep() to be invoked from python,
* which plays nicely with python threading model
*/
nanos throttled_event_dt(TimeSlip xref_ts,
double replay_factor) const;
/* current contents of simulation heap, in increasing time order.
* copies heap to drain it in heap order
*/
std::vector<SourceTimestamp> heap_contents() const;
/* print heap contents to *p_scope. intended for diagnostics */
void log_heap_contents(xo::scope * p_scope) const;
/* human-readable string identifying this simulator */
std::string display_string() const;
/* emit the first available event from a single simulation source.
* resolve ties arbitrarily.
*
* returns the #of events dispatched
* (expect this always = 1)
*/
std::uint64_t advance_one_event();
/* run simulation until earliest event time t satisfies t > t1
*/
void run_until(utc_nanos t1);
/* run simulation at realtime speed, throttling according to replay_factor,
* until either:
* - simulation exhausted
* - n events handled, if n>0
* - sim clock reaches t1, if t1>t0
*
* see also .run_one(), .run_until(), .run_n(), .run()
*
* note: this method not suitable for use from python wrappers;
* would hold GIL until complete.
* for that use case better to implement throttled sim loop
* in python
*
* t1. if > .t0, limit sim to events with t < t1
* n. if > 0, sim at most n events
* replay_factor. throttle sim to keep
* {elapsed sim time} <= replay_factor * {elapsed real time}
* return. #of events simmed
*/
uint64_t run_throttled_until(utc_nanos t1,
int32_t n,
double replay_factor);
// ----- inherited from Reactor -----
/* notification when nonprimed source becomes primed
*/
virtual void notify_source_primed(ref::brw<ReactorSource> src) override;
/* add a new simulation source.
* event that precede .t0 will be discarded.
*
* returns true if src added; false if already present
*/
virtual bool add_source(ref::brw<ReactorSource> src) override;
/* remove simulation source.
* returns true if src removed; false if was not present
*
* (not typically needed for simulations)
*/
virtual bool remove_source(ref::brw<ReactorSource> src) override;
/* synonym for .advance_one_event() */
virtual std::uint64_t run_one() override;
private:
explicit Simulator(utc_nanos t0);
/* updates source timestamp in simulation heap.
* preserves
*
* Require:
* - src->is_primed()
* - .sim_heap[.sim_heap.size - 1] already refers to src
*
* need_pop_flag, if true, src is at back of heap vector,
* need to pop before re-inserting.
*/
void heap_update_source(ReactorSource * src,
bool need_pop_flag);
/* insert source into .sim_heap.
* increase sim_heap.size() by +1
*/
void heap_insert_source(ReactorSource * src);
/* complete any reentrant work encountered
* while deliverying another event
*/
void complete_delivery_work();
/* complete reentrant call to .add_source() */
void complete_add_source(ref::brw<ReactorSource> src);
/* complete reentrant call to .remove_source() */
void complete_remove_source(ref::brw<ReactorSource> src);
friend class RaiiDeliveryWork;
private:
/* simulation heap:
* each unexhausted source appears
* exactly once, in increasing time order of next event
*
* Invariant:
* - all sources s in .sim_heap satisfy:
* - s.is_exhausted() = false
* - s.t0() >= .t0
*/
std::vector<SourceTimestamp> sim_heap_;
/* initial simulation clock */
utc_nanos t0_;
/* time of most recent simulated event */
utc_nanos last_tm_;
/* #of simulated events handled */
uint64_t n_event_ = 0;
/* simulation sources
* Invariant:
* - all source s in .src_v satisfy:
* EITHER
* 1. s.is_exhausted() = true
* OR
* 2.1 s.is_exhausted() = false
* 2.2 s.t0() >= .t0
*/
std::vector<ReactorSourcePtr> src_v_;
/* reentrancy protection. set during .advance_one_event() */
bool delivery_in_progress_ = false;
/* when certain Simulator methods are invoked
* while in the midst of delivering another event,
* must defer until delivery has completed
*/
std::vector<ReentrantSimulatorCmd> reentrant_cmd_v_;
}; /*Simulator*/
} /*namespace sim*/
} /*namespace xo*/
/* end Simulator.hpp */

View file

@ -0,0 +1,107 @@
/* file SourceTimestamp.hpp
*
* author: Roland Conybeare, Sep 2022
*/
#pragma once
#include "xo/reactor/ReactorSource.hpp"
namespace xo {
namespace sim {
/* remember a timestamp for a simulation source;
* use to insert a source into simulation heap.
* don't want to use SimulationSource.t0, so that we can
* promise heap invariants without reying on
* any behavior of SimulationSource.
*
* Note: Need to resolve ties between different sources,
* if they coincide on timestamp of next event.
* For now use SimulationSource address
*/
class SourceTimestamp {
public:
using ReactorSource = xo::reactor::ReactorSource;
using utc_nanos = xo::time::utc_nanos;
public:
SourceTimestamp(utc_nanos t0,
ReactorSource * src)
: t0_(t0), src_(src) {}
static int32_t compare(SourceTimestamp const & x,
SourceTimestamp const & y) {
using xo::time::utc_nanos;
using xo::time::nanos;
nanos dt = x.t0_ - y.t0_;
if(dt < nanos(0))
return -1;
else if(dt > nanos(0))
return +1;
/* timestamps are equal */
std::ptrdiff_t dptr = (x.src() - y.src());
return dptr;
} /*compare*/
utc_nanos t0() const { return t0_; }
ReactorSource * src() const { return src_; }
void display(std::ostream & os) const;
std::string display_string() const;
private:
/* timestamp for this source */
utc_nanos t0_;
/* simulation source
* promise:
* - src.t0() >= .t0 || src.is_exhausted
*/
ReactorSource * src_ = nullptr;
}; /*SourceTimestamp*/
inline bool operator==(SourceTimestamp const & x,
SourceTimestamp const & y)
{
return SourceTimestamp::compare(x, y) == 0;
} /*operator==*/
inline bool operator<(SourceTimestamp const & x,
SourceTimestamp const & y)
{
return SourceTimestamp::compare(x, y) < 0;
} /*operator<*/
inline bool operator<=(SourceTimestamp const & x,
SourceTimestamp const & y)
{
return SourceTimestamp::compare(x, y) <= 0;
} /*operator<=*/
inline bool operator>(SourceTimestamp const & x,
SourceTimestamp const & y)
{
return SourceTimestamp::compare(x, y) > 0;
} /*operator>*/
inline bool operator>=(SourceTimestamp const & x,
SourceTimestamp const & y)
{
return SourceTimestamp::compare(x, y) >= 0;
} /*operator>=*/
inline std::ostream &
operator<<(std::ostream & os,
SourceTimestamp const & x)
{
x.display(os);
return os;
} /*operator<<*/
} /*namespace sim*/
} /*namespace xo*/
/* end SourceTimestamp.hpp*/

View file

@ -0,0 +1,39 @@
/* file TimeSlip.hpp
*
* author: Roland Conybeare, Sep 2022
*/
#pragma once
//#include "time/Time.hpp"
namespace xo {
namespace sim {
/* helper class for a throttled simulation,
* where we want simulated time to evolve at a constant rate,
* relative to real elapsed time.
*
* A TimeSlip instance pins
* simulation-time coordinates to realtime coordinates
*/
class TimeSlip {
public:
using utc_nanos = xo::time::utc_nanos;
public:
TimeSlip(utc_nanos sim_tm, utc_nanos real_tm)
: sim_tm_{sim_tm}, real_tm_{real_tm} {}
utc_nanos sim_tm() const { return sim_tm_; }
utc_nanos real_tm() const { return real_tm_; }
private:
utc_nanos sim_tm_;
utc_nanos real_tm_;
}; /*TimeSlip*/
} /*namespace sim*/
} /*namespace xo*/
/* end TimeSlip.hpp */

View file

@ -0,0 +1,20 @@
/* file init_simulator.hpp
*
* author: Roland Conybeare, Sep 2022
*/
#pragma once
#include "xo/subsys/Subsystem.hpp"
namespace xo {
enum S_simulator_tag {};
template<>
struct InitSubsys<S_simulator_tag> {
static void init();
static InitEvidence require();
};
} /*namespace xo*/
/* end init_simulator.hpp */

View file

@ -0,0 +1,19 @@
# xo-simulator/src/simulator/CMakeLists.txt
set(SELF_LIB simulator)
set(SELF_SRCS
Simulator.cpp SourceTimestamp.cpp
init_simulator.cpp)
xo_add_shared_library3(${SELF_LIB} ${PROJECT_NAME}Targets ${PROJECT_VERSION} 1 ${SELF_SRCS})
# ----------------------------------------------------------------
# external dependencies
# note: changes to xo_dependency() calls here
# must coordinate with find_dependency() calls
# in xo-simulator/cmake/simulatorConfig.cmake.in
#
xo_dependency(${SELF_LIB} reactor)
#xo_dependency(${SELF_LIB} webutil)
#xo_dependency(${SELF_LIB} callback)

550
src/simulator/Simulator.cpp Normal file
View file

@ -0,0 +1,550 @@
/* @file Simulator.cpp */
//#include "time/Time.hpp" /*need this 1st for tag(., time_point)*/
#include "init_simulator.hpp"
#include "Simulator.hpp"
#include "TimeSlip.hpp"
#include "xo/indentlog/scope.hpp"
#include <thread>
#include <algorithm>
#include <string_view>
namespace xo {
using xo::reactor::ReactorSource;
using xo::ref::brw;
using xo::time::utc_nanos;
using xo::time::nanos;
namespace sim {
class RaiiDeliveryWork {
public:
RaiiDeliveryWork(Simulator * sim) : sim_{sim} {
this->sim_->delivery_in_progress_ = true;
}
~RaiiDeliveryWork() {
this->sim_->delivery_in_progress_ = false;
this->sim_->complete_delivery_work();
}
Simulator * sim_ = nullptr;
}; /*RaiiDeliveryWork*/
ref::rp<Simulator>
Simulator::make(utc_nanos t0) {
return new Simulator(t0);
} /*make*/
Simulator::Simulator(utc_nanos t0) : t0_(t0)
{
XO_SUBSYSTEM_REQUIRE(simulator);
} /*ctor*/
Simulator::~Simulator() {
scope log(XO_ENTER0(verbose), "clear heap..");
this->sim_heap_.clear();
if (log.enabled()) {
log("visit .src_v", xtag("size", this->src_v_.size()));
for (size_t i=0; i<this->src_v_.size(); ++i) {
log(":src_v[", i, "] ", this->src_v_[i].get());
}
}
log && log("clear .src_v", xtag("size", this->src_v_.size()));
this->src_v_.clear();
} /*dtor*/
bool
Simulator::is_source_present(brw<ReactorSource> src) const
{
for (ReactorSourcePtr const & s : this->src_v_) {
if (s == src)
return true;
}
return false;
} /*is_source_pesent*/
utc_nanos
Simulator::next_tm() const {
if(this->sim_heap_.empty()) {
/* 0 remaining events in simulator */
return this->t0();
}
return this->sim_heap_.front().t0();
} /*next_tm*/
ReactorSource*
Simulator::next_src() const {
if (this->sim_heap_.empty()) {
/* 0 remaining events in simulator */
return nullptr;
}
return this->sim_heap_.front().src();
} /*next_src*/
void
Simulator::notify_source_primed(brw<ReactorSource> src)
{
scope log(XO_ENTER1(always, src->debug_sim_flag()));
brw<ReactorSource> sim_src
= brw<ReactorSource>::from(src);
log && log(xtag("src", (sim_src.get() != nullptr)),
xtag("src.name", src->name()));
if(!sim_src)
return;
log && log(xtag("sim.name", sim_src->name()),
xtag("src.current_tm", sim_src->sim_current_tm()),
xtag("sim_heap.size", this->sim_heap_.size()));
if (this->delivery_in_progress_) {
log && log("reentrant call to .notify_source_primed(), defer",
xtag("src.name", src->name()));
/* defer reentrant work until delivery completes
* see .complete_delivery_work()
*/
this->reentrant_cmd_v_.push_back
(ReentrantSimulatorCmd::notify_source_primed(src.promote()));
} else {
/* inform Simulator when a source transitions from
* 'notready' to 'ready'.
*
* this means:
* - source knows its next event
* - source should be put back into .sim_heap
*/
this->heap_insert_source(sim_src.get());
}
//if (lscope.enabled())
// this->log_heap_contents(&lscope);
} /*notify_source_primed*/
void
Simulator::complete_add_source(brw<ReactorSource> src)
{
/* also add to simulation heap */
this->sim_heap_.push_back(SourceTimestamp(src->sim_current_tm(),
src.get()));
/* use std::greater<> because we need a min-heap;
* smallest timestamp at the front
*/
std::push_heap(this->sim_heap_.begin(),
this->sim_heap_.end(),
std::greater<SourceTimestamp>());
} /*complete_add_source*/
bool
Simulator::add_source(brw<ReactorSource> sim_src)
{
scope log(XO_ENTER1(always, sim_src->debug_sim_flag()));
log && log("enter",
xtag("src", sim_src.get()),
xtag("src.name", sim_src->name()));
if(!sim_src || this->is_source_present(sim_src))
return false;
log && log("advance to t0",
xtag("t0", this->t0()));
sim_src->sim_advance_until(this->t0(), false /*!replay_flag*/);
this->src_v_.push_back(sim_src.promote());
if(sim_src->is_exhausted()) {
log && log("source exhausted!");
} else {
sim_src->notify_reactor_add(this /*reactor*/);
log && log(xtag("src.sim_current_tm", sim_src->sim_current_tm()));
if (sim_src->is_empty()) {
log && log("empty source, do not insert into .sim_heap");
/* if source is empty, don't add to sim heap yet.
* when source becomes non-empty, source will invoke
* .notify_source_primed()
* which will insert it into .sim_heap[]
*/
;
} else if (this->delivery_in_progress_) {
log && log("reentrant add non-empty source, delay");
/* defer reentrant work until delivery completes
* see .complete_delivery_work()
*/
this->reentrant_cmd_v_.push_back
(ReentrantSimulatorCmd::complete_add_source(sim_src.promote()));
} else {
log && log("non-empty source, add to .sim_heap");
this->complete_add_source(sim_src);
}
}
return true;
} /*add_source*/
void
Simulator::complete_remove_source(brw<ReactorSource> sim_src)
{
/* rebuild .sim_heap, with sim_src removed */
std::vector<SourceTimestamp> sim_heap2;
for(SourceTimestamp const & item : this->sim_heap_) {
if(item.src() == sim_src.get()) {
/* item refers to the source we are removing -> discard */
;
} else {
sim_heap2.push_back(item);
std::push_heap(sim_heap2.begin(),
sim_heap2.end(),
std::greater<SourceTimestamp>());
}
/* now discard .sim_heap, replacing with sim_heap2 */
this->sim_heap_ = std::move(sim_heap2);
}
} /*complete_remove_source*/
bool
Simulator::remove_source(brw<ReactorSource> sim_src)
{
scope log(XO_DEBUG(sim_src->debug_sim_flag()));
log && log("enter",
xtag("src", sim_src.get()),
xtag("src.name", sim_src->name()));
//brw<ReactorSource> sim_src = brw<ReactorSource>::from(src);
if(!sim_src || !this->is_source_present(sim_src))
return false;
/* WARNING: O(n)implementation here */
if (this->delivery_in_progress_) {
/* defer reentrant work until delivery completes.
* see .complete_delivery_work()
*/
this->reentrant_cmd_v_.push_back
(ReentrantSimulatorCmd::complete_remove_source(sim_src.promote()));
} else {
this->complete_remove_source(sim_src);
}
return true;
} /*remove_source*/
std::uint64_t
Simulator::run_one() {
return this->advance_one_event();
} /*run_one*/
void
Simulator::heap_update_source(ReactorSource * src, bool need_pop_flag)
{
/* Require:
* .sim_heap[.sim_heap.size - 1] already refers to src
* just updating timestamp here
*/
std::size_t simheap_z
= this->sim_heap_.size();
scope log(XO_DEBUG(src->debug_sim_flag()),
xtag("src.name", src->name()),
xtag("simheap_z", simheap_z),
xtag("src.sim_current_tm", src->sim_current_tm()));
if (need_pop_flag)
this->sim_heap_.pop_back();
/* re-insert at new timestamp */
this->sim_heap_.push_back(SourceTimestamp(src->sim_current_tm(), src));
/* use std::greater<> because we need a min-heap;
* smallest timestamp at the front
*/
std::push_heap(this->sim_heap_.begin(),
this->sim_heap_.end(),
std::greater<SourceTimestamp>());
} /*heap_update_source*/
void
Simulator::heap_insert_source(ReactorSource * src)
{
/* santify check -- src should not currently appear in heap */
for (SourceTimestamp const & src_recd : this->sim_heap_) {
if(src_recd.src() == src) {
/* uh oh. src is already present in heap! */
assert(false);
}
}
// don't need this: .heap_update_source() will insert
//this->sim_heap_.push_back(SourceTimestamp(src->sim_current_tm(), src));
this->heap_update_source(src, false /*!need_pop_flag*/);
} /*heap_insert_source*/
void
Simulator::complete_delivery_work()
{
for (ReentrantSimulatorCmd const & cmd : this->reentrant_cmd_v_) {
scope log(XO_DEBUG(cmd.src() && cmd.src()->debug_sim_flag()),
"complete reentrant work",
xtag("src.name", cmd.src()->name()));
switch (cmd.cmd()) {
case ReentrantSimulatorCmd::NotifySourcePrimed:
this->notify_source_primed(cmd.src());
break;
case ReentrantSimulatorCmd::CompleteAddSource:
this->complete_add_source(cmd.src());
break;
case ReentrantSimulatorCmd::CompleteRemoveSource:
this->complete_remove_source(cmd.src());
break;
}
//if (lscope.enabled())
// this->log_heap_contents(&lscope);
}
this->reentrant_cmd_v_.clear();
} /*complete_delivery_work*/
TimeSlip
Simulator::timeslip() const
{
utc_nanos real_tm = std::chrono::system_clock::now();
utc_nanos sim_tm = this->next_tm();
return TimeSlip(sim_tm, real_tm);
} /*timeslip*/
nanos
Simulator::throttled_event_dt(TimeSlip xref,
double replay_factor) const
{
if (replay_factor <= 0.0)
replay_factor = 1e-6;
/* hi_sim_tm: simtime for next event to be handled */
utc_nanos hi_sim_tm = this->next_tm();
/* desired elapsed /real time/ from start of simulation to
* to when simulation handles event @ hi_sim_tm
*/
nanos sim_dt = (hi_sim_tm - xref.sim_tm());
auto hi_real_tm = (xref.real_tm()
+ std::chrono::duration_cast<nanos>(sim_dt / replay_factor));
utc_nanos now_tm = std::chrono::system_clock::now();
if (now_tm < hi_real_tm)
return hi_real_tm - now_tm;
else
return nanos(0);
} /*next_throttled_tm*/
std::vector<SourceTimestamp>
Simulator::heap_contents() const
{
std::vector<SourceTimestamp> heap = this->sim_heap_;
std::vector<SourceTimestamp> retval;
while (!heap.empty()) {
retval.push_back(heap.front());
std::pop_heap(heap.begin(), heap.end(),
std::greater<SourceTimestamp>());
heap.pop_back();
}
return retval;
} /*heap_contents*/
void
Simulator::log_heap_contents(scope * p_scope) const
{
std::vector<SourceTimestamp> heap = this->sim_heap_;
p_scope->log("/ sim heap contents:");
p_scope->log("| t0 name n_in_ev n_queued_out_ev n_out_ev");
while(!heap.empty()) {
SourceTimestamp const & ts = heap.front();
p_scope->log("|"
, " ", ts.t0()
, " ", ts.src()->name()
, " ", ts.src()->n_queued_out_ev()
, " ", ts.src()->n_out_ev());
std::pop_heap(heap.begin(), heap.end(),
std::greater<SourceTimestamp>());
heap.pop_back();
}
p_scope->log("\\");
} /*print_heap_contents*/
std::string
Simulator::display_string() const
{
return "<Simulator>";
} /*display_string*/
std::uint64_t
Simulator::advance_one_event()
{
bool debug_flag = (this->loglevel() <= log_level::chatty);
if(this->sim_heap_.empty()) {
scope log(XO_DEBUG(debug_flag));
/* nothing todo */
return 0;
}
uint32_t old_heap_z = this->sim_heap_.size();
/* *src is source with earliest timestamp */
ReactorSource * src
= this->sim_heap_.front().src();
utc_nanos src_tm = this->sim_heap_.front().t0();
scope log(XO_DEBUG(debug_flag),
xtag("threshold-loglevel", this->loglevel()),
xtag("src", src != nullptr),
xtag("src.name", src->name()),
xtag("sim.src_tm", src_tm),
xtag("src.sim_current_tm", src->sim_current_tm()),
xtag("heap_z", old_heap_z));
/* NOTE: src.current_tm() isn't preserved across
* call to src.deliver_one()
*/
uint64_t retval = 0;
{
RaiiDeliveryWork raii_work(this);
retval = src->deliver_one();
this->last_tm_ = src_tm;
this->n_event_ += retval;
/* note that src.t0 may have advanced */
/* moves just-consumed timestamp event for src
* to back of .sim_heap
*/
std::pop_heap(this->sim_heap_.begin(),
this->sim_heap_.end(),
std::greater<SourceTimestamp>());
/* now .sim_heap[.sim_heap.size() = 1].src() is src,
* with stale timestamp
*/
if(src->is_exhausted() || src->is_notprimed()) {
/* remove src from .sim_
* - if src->is_exhausted(), permanently
* - if src->is_notready(), until source calls
* .notify_source_ready()
*/
this->sim_heap_.pop_back();
} else {
this->heap_update_source(src, true /*need_pop_flag*/);
}
assert(raii_work.sim_);
}
return retval;
} /*advance_one_event*/
void
Simulator::run_until(utc_nanos t1)
{
assert(!this->delivery_in_progress_);
while(!this->is_exhausted()) {
utc_nanos t = this->next_tm();
if(t > t1)
break;
this->advance_one_event();
} /*loop until done*/
} /*run_until*/
uint64_t
Simulator::run_throttled_until(utc_nanos t1,
int32_t n_max,
double replay_factor)
{
Subsystem::verify_all_initialized();
scope log(XO_ENTER0(info));
assert(!this->delivery_in_progress_);
uint64_t n = 0;
if(!this->is_exhausted()) {
n += this->run_one();
}
/* cross-reference real time with sim time */
TimeSlip tslip = this->timeslip();
while(!this->is_exhausted()) {
if ((n_max > 0) && (n >= static_cast<uint64_t>(n_max))) {
/* reached limit on #of events simmed */
return n;
}
if ((t1 > this->t0()) && (this->next_tm() > t1)) {
/* reached limit on sim time */
return n;
}
/* if sim time passing faster than realtime (scaled by replay_factor),
* wait for real elapsed time to catch up
*/
nanos wait_dt = this->throttled_event_dt(tslip, replay_factor);
if (wait_dt > std::chrono::milliseconds(1)) {
log && log(xtag("sleep_dt", wait_dt));
std::this_thread::sleep_for(wait_dt);
} else {
/* don't bother throttling for period less than 1ms, linux != rtos */
}
n += this->run_one();
}
return n;
} /*run_throttled_until*/
} /*namespace sim*/
} /*namespace xo*/
/* end Simulator.cpp */

View file

@ -0,0 +1,32 @@
/* file SourceTimestamp.cpp
*
* author: Roland Conybeare, Sep 2022
*/
#include "SourceTimestamp.hpp"
#include "xo/indentlog/print/tag.hpp"
#include "xo/indentlog/print/tostr.hpp"
namespace xo {
using xo::xtag;
using xo::tostr;
namespace sim {
void
SourceTimestamp::display(std::ostream & os) const
{
os << "<SourceTimestamp";
os << xtag("t0", t0_);
os << xtag("src", ref::rp<ReactorSource>(src_));
os << ">";
} /*display*/
std::string
SourceTimestamp::display_string() const
{
return tostr(*this);
} /*display_string*/
} /*namespace sim*/
} /*namespace xo*/
/* end SourceTimestamp.cpp */

View file

@ -0,0 +1,31 @@
/* file init_simulator.cpp
*
* author: Roland Conybeare, Sep 2022
*/
#include "init_simulator.hpp"
#include "xo/reactor/init_reactor.hpp"
namespace xo {
void
InitSubsys<S_simulator_tag>::init()
{
} /*init*/
InitEvidence
InitSubsys<S_simulator_tag>::require()
{
InitEvidence retval;
/* subsystem dependencies for simulator/ */
retval ^= InitSubsys<S_reactor_tag>::require();
/* simulator/'s own initialization code */
retval ^= XO_SUBSYSTEM_PROVIDE(simulator, &init);
//retval ^= Subsystem::provide<S_simulator_tag>("simulator", &init);
return retval;
} /*require*/
} /*namespace xo*/
/* end init_simulator.cpp */