diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 00000000..51006eed --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,52 @@ +# xo-websock/CMakeLists.txt + +cmake_minimum_required(VERSION 3.10) + +project(websock VERSION 1.0) +enable_language(CXX) + +# common XO cmake macros (see proj/xo-cmake) +include(xo_macros/xo_cxx) +include(xo_macros/code-coverage) + +# ---------------------------------------------------------------- +# unit test setup + +enable_testing() +# activate code coverage for all executables + libraries (when configured with -DCODE_COVERAGE=ON) +add_code_coverage() +# 1. assuming that /nix/store/ prefixes .hpp files belonging to gcc, catch2 etc. +# we're not interested in code coverage for these sources. +# 2. exclude the utest/ subdir, we don't need coverage on the unit tests themselves; +# rather, want coverage on the code that the unit tests exercise. +# +# NOTE: this seems to work only with the 'ccov-all' target. In particular, doesn't seem to do anything with the 'ccov' target +# +add_code_coverage_all_targets(EXCLUDE /nix/store/* ${PROJECT_SOURCE_DIR}/utest/* ${PROJECT_BINARY_DIR}/local/* ${PROJECT_SOURCE_DIR}/repo/*) + +# ---------------------------------------------------------------- +# c++ settings + +# one-time project-specific c++ flags. usually empty +set(PROJECT_CXX_FLAGS "") +#set(PROJECT_CXX_FLAGS "-fconcepts-diagnostics-depth=2") +add_definitions(${PROJECT_CXX_FLAGS}) + +xo_toplevel_compile_options() + +# ---------------------------------------------------------------- + +add_subdirectory(src/websock) +#add_subdirectory(utest) + +# ---------------------------------------------------------------- +# provide find_package() support for websock customers + +xo_export_cmake_config(${PROJECT_NAME} ${PROJECT_VERSION} ${PROJECT_NAME}Targets) + +# ---------------------------------------------------------------- +# install .hpp files + +xo_install_include_tree() + +# end CMakeLists.txt diff --git a/cmake/websockConfig.cmake.in b/cmake/websockConfig.cmake.in new file mode 100644 index 00000000..ac43847f --- /dev/null +++ b/cmake/websockConfig.cmake.in @@ -0,0 +1,13 @@ +@PACKAGE_INIT@ + +include(CMakeFindDependencyMacro) + +# note: changes to find_dependency() calls here +# must coordinate with xo_dependency() calls +# in xo-reactor/src/reactor/CMakeLists.txt +# +#find_dependency(reflect) +#find_dependency(callback) + +include("${CMAKE_CURRENT_LIST_DIR}/@PROJECT_NAME@Targets.cmake") +check_required_components("@PROJECT_NAME@") diff --git a/include/xo/websock/DynamicEndpoint.hpp b/include/xo/websock/DynamicEndpoint.hpp new file mode 100644 index 00000000..8753b835 --- /dev/null +++ b/include/xo/websock/DynamicEndpoint.hpp @@ -0,0 +1,126 @@ +/* file DynamicEndpoint.hpp + * + * author: Roland Conybeare, Sep 2022 + */ + +#pragma once + +#include "EndpointUtil.hpp" +#include "xo/webutil/HttpEndpointDescr.hpp" +#include "xo/webutil/StreamEndpointDescr.hpp" +#include "xo/webutil/Alist.hpp" +#include + +namespace xo { + namespace web { + /* a dynamic http endpoint. content served on-browser-demand + * by user-provided callback + */ + class DynamicEndpoint { + public: + using AbstractSink = xo::reactor::AbstractSink; + using CallbackId = fn::CallbackId; + + public: + static std::unique_ptr make_http(std::string uri_pattern, + HttpEndpointFn http_cb) { + return (std::unique_ptr + (new DynamicEndpoint(std::move(uri_pattern), + std::move(http_cb), + nullptr, + nullptr))); + } /*make_http*/ + + static std::unique_ptr make_stream(std::string uri_pattern, + StreamSubscribeFn sub_fn, + StreamUnsubscribeFn unsub_fn) { + return (std::unique_ptr + (new DynamicEndpoint(std::move(uri_pattern), + nullptr, + std::move(sub_fn), + std::move(unsub_fn)))); + } /*make_stream*/ + + std::string stem() const { + return EndpointUtil::stem(this->uri_pattern_); + } /*stem*/ + +#ifdef NOT_USING + /* true iff incoming_uri matches .uri_pattern */ + bool is_match(std::string const & incoming_uri) const { + /* c++ regex = javascript regexes, + * so these characters are special: + * ^ $ \ . * + ? ( ) [ ] { } | + */ + } /*is_match*/ +#endif + + /* get html from this endpoint, on behalf of uri=incoming_uri; + * write html on *p_os + * + * require: non-null http_fn + */ + void http_response(std::string const & incoming_uri, + std::ostream * p_os) const; + + /* subscribe stream from this endpoint, on behalf of uri=incoming_uri. + * send output to ws_sink + */ + CallbackId subscribe(std::string const & incoming_uri, + ref::rp const & ws_sink) const; + + /* unsubscribe stream from this endpoint; + * reverses the effect of a previous call to .subscribe() + * that returned id + */ + void unsubscribe(CallbackId id) const; + + private: + explicit DynamicEndpoint(std::string uri_pattern, + HttpEndpointFn http_fn, + StreamSubscribeFn subscribe_fn, + StreamUnsubscribeFn unsubscribe_fn); + + private: + /* pattern for this endpoint + * can be string like + * /fixed/stem/${a}/more/fixed/stuff/${b} + * in which case: + * + * 1. will match uris like: + * /fixed/stem/apple/more/fixed/stuff/bananas + * --> invoke callback with Alist + * ("a" -> "apple", "b" -> "bananas") + * endpoint will be stored in WebserverImpl.stem_map + * under fixed prefix, in this case + * /fixed/stem/ + * + * 2. will not match uris like: + * /fixed/stem/app/le/more/fixed/stuff/bononos + */ + std::string uri_pattern_; + /* regex for matching input that satisfies .uri_pattern: + * each occurrence of + * ${...} replaced by [[:alnum:]]+ + */ + std::regex uri_regex_; + /* variables found in .uri_pattern, + * in the order in which they appear + * if .uri_pattern is + * /fixed/stem/${a}/more/fixed/stuff/${b} + * then .var_v will be: + * ["a", "b"] + */ + std::vector var_v_; + /* run this function to produce an http response */ + HttpEndpointFn http_fn_; + /* run this function to subscribe event stream */ + StreamSubscribeFn subscribe_fn_; + /* run this function to unsubscribe event stream */ + StreamUnsubscribeFn unsubscribe_fn_; + }; /*DynamicEndpoint*/ + + } /*namespace web*/ +} /*namespace xo*/ + +/* end DynamicEndpoint.hpp */ diff --git a/include/xo/websock/EndpointUtil.hpp b/include/xo/websock/EndpointUtil.hpp new file mode 100644 index 00000000..e0257cd1 --- /dev/null +++ b/include/xo/websock/EndpointUtil.hpp @@ -0,0 +1,26 @@ +/* file EndpointUtil.hpp + * + * author: Roland Conybeare, Sep 2022 + */ + +#pragma once + +#include + +namespace xo { + namespace web { + class EndpointUtil { + public: + /* find fixed prefix for a URI pattern. + * patterns are used with both http endpoints (see DynamicEndpoint), + * and stream endpoints (see StreamEndpoint) + * + * e.g. stem("/dyn/uls/${ulticker}/snap") => "/dyn/uls/" + */ + static std::string stem(std::string const & pattern); + }; /*EndpointUtil*/ + + } /*namespace web*/ +} /*namespace xo*/ + +/* end EndpointUtil.hpp */ diff --git a/include/xo/websock/SafetyToken.hpp b/include/xo/websock/SafetyToken.hpp new file mode 100644 index 00000000..f156b98a --- /dev/null +++ b/include/xo/websock/SafetyToken.hpp @@ -0,0 +1,40 @@ +/* file SafetyToken.hpp + * + * author: Roland Conybeare, Sep 2022 + */ + +#pragma once + +namespace xo { + namespace web { + /* token for cooperative compile-time threadsafety checking. + * + * requirements for cooperating code: + * - token contains no state, so in principle can be optimized away + * - token is deliberately not copyable, and not moveable + * - derive from token, and make derived ctor private + * - make method/class responsible for threadsafety a friend of token, + * so it can have exclusive right to create a token instance. + * - pass token reference down stack + * to demonstrate ownership of protected resource, + * limited to the lifetime of called function. + */ + template + class SafetyToken { + public: + SafetyToken(SafetyToken const & x) = delete; + SafetyToken(SafetyToken && x) = delete; + + /* optionally: invoke this to "announce use of a protected resource" */ + bool verify() const { return true; } + + SafetyToken & operator=(SafetyToken const & x) = delete; + SafetyToken & operator=(SafetyToken && x) = delete; + + protected: + SafetyToken() = default; + }; /*SafetyToken*/ + } /*namespace web*/ +} /*namespace xo*/ + +/* end SafetyToken.hpp */ diff --git a/include/xo/websock/Webserver.hpp b/include/xo/websock/Webserver.hpp new file mode 100644 index 00000000..57b0ccf7 --- /dev/null +++ b/include/xo/websock/Webserver.hpp @@ -0,0 +1,116 @@ +/* @file Webserver.hpp */ + +#pragma once + +#include "xo/refcnt/Displayable.hpp" +#include "xo/printjson/PrintJson.hpp" +#include "xo/webutil/HttpEndpointDescr.hpp" +#include "xo/webutil/StreamEndpointDescr.hpp" +#include // temporary, while moving callbacks +#include +#include +#include + +namespace xo { + namespace web { + enum class Runstate { stopped, stop_requested, running }; + + class RunstateUtil { + public: + static char const * runstate_descr(Runstate x); + }; /*RunstateUtil*/ + + inline std::ostream & operator<<(std::ostream &os, Runstate x) { + os << RunstateUtil::runstate_descr(x); + return os; + } /*operator<<*/ + + class WebserverConfig { + public: + WebserverConfig() = default; + WebserverConfig(std::int32_t port, + bool tls_flag, + bool host_check_flag, + bool use_retry_flag) + : port_{port}, + tls_flag_{tls_flag}, + host_check_flag_{host_check_flag}, + use_retry_flag_{use_retry_flag} {} + + std::int32_t port() const { return port_; } + bool tls_flag() const { return tls_flag_; } + bool host_check_flag() const { return host_check_flag_; } + bool use_retry_flag() const { return use_retry_flag_; } + + private: + /* accept incoming http requests on this port# */ + std::int32_t port_ = 0; + /* if true, support https */ + bool tls_flag_ = false; + /* see LWS_SERVER_OPTION_VHOST_UPG_STRICT_HOST_CHECK */ + bool host_check_flag_ = false; + /* see lws_context_creation_info.retry_and_idle_policy */ + bool use_retry_flag_ = false; + }; /*WebserverConfig*/ + + /* libwebsocket: + * 1. doesn't support multiple threads + * (actually, looks like it does on further examination) + * 2. doesn't expose listening ports etc (at least afaik); + * in other words it expects to take over application's main thread + * + * enforce this property by making webserver a singleton + * + * .state .start_webserver() .state + * +---------+ -------------------> +---------+ + * | stopped | | running | + * +---------+ +---------+ + * ^ | + * | | .stop_webserver() + * | | + * +----------------+ | + * | stop_requested | <------------------/ + * +----------------+ + * + */ + class Webserver : public ref::Displayable { + public: + using Alist = xo::web::Alist; + using PrintJson = xo::json::PrintJson; + + public: + /* note: although webserver allows creating multiple instances, + * the underlying libwebsocket library is not advertised to be + * threadsafe + */ + static ref::rp make(WebserverConfig const & ws_config, + ref::rp const & pjson); + + /* current state */ + virtual Runstate state() const = 0; + virtual void register_http_endpoint(HttpEndpointDescr const & endpoint) = 0; + virtual void register_stream_endpoint(StreamEndpointDescr const & endpoint) = 0; + + /* start thread for this webserver; idempotent */ + virtual void start_webserver() = 0; + /* stop thread for this webserver; suitable for calling + * from interrupt handler + */ + virtual void interrupt_stop_webserver() = 0; + /* stop thread for this webserver; idempotent */ + virtual void stop_webserver() = 0; + /* wait until webserver thread stopped */ + virtual void join_webserver() = 0; + + /* send text to a websocket session identified by session_id */ + virtual void send_text(uint32_t session_id, + std::string text) = 0; + + // ----- Inherited from Displayable ----- + + virtual void display(std::ostream & os) const; + }; /*Webserver*/ + } /*namespace web*/ +} /*namespace xo*/ + +/* end Webserver.hpp */ diff --git a/include/xo/websock/WebsockUtil.hpp b/include/xo/websock/WebsockUtil.hpp new file mode 100644 index 00000000..f8626c68 --- /dev/null +++ b/include/xo/websock/WebsockUtil.hpp @@ -0,0 +1,18 @@ +/* @file WebsockUtil.hpp */ + +#pragma once + +#include + +namespace xo { + namespace web { + /* class-as-namespace idiom */ + class WebsockUtil { + public: + /* string representation for callback category enum */ + static char const * ws_callback_reason_descr(lws_callback_reasons x); + }; /*WebsockUtil*/ + } /*namespace web*/ +} /*namespace xo*/ + +/* end WebsockUtil.hpp */ diff --git a/include/xo/websock/WebsocketSink.hpp b/include/xo/websock/WebsocketSink.hpp new file mode 100644 index 00000000..863bda34 --- /dev/null +++ b/include/xo/websock/WebsocketSink.hpp @@ -0,0 +1,28 @@ +/* file WebsocketSink.hpp + * + * author: Roland Conybeare, Sep 2022 + */ + +#pragma once + +#include "xo/reactor/AbstractSink.hpp" +#include "xo/printjson/PrintJson.hpp" + +namespace xo { + namespace web { + class Webserver; + + class WebsocketSink : public reactor::AbstractSink { + public: + using PrintJson = xo::json::PrintJson; + + public: + static ref::rp make(ref::rp const & websrv, + ref::rp const & pjson, + uint32_t session_id, + std::string const & stream_name); + }; /*WebsocketSink*/ + } /*namespace web*/ +} /*namespace xo*/ + +/* end WebsocketSink.hpp */ diff --git a/include/xo/websock/WsSafetyToken.hpp b/include/xo/websock/WsSafetyToken.hpp new file mode 100644 index 00000000..0d5302aa --- /dev/null +++ b/include/xo/websock/WsSafetyToken.hpp @@ -0,0 +1,29 @@ +/* file WsSafetyToken.hpp + * + * author: Roland Conybeare, Sep 2022 + */ + +#pragma once + +#include "SafetyToken.hpp" +#include + +namespace xo { + namespace web { + class WebserverImplWsThread; + class WebsocketSessionRecd; + + /* only websocket thread can obtain this token */ + class WsSafetyToken : public SafetyToken { + private: + friend class WebserverImplWsThread; + + private: + /* only WebserverImpl should construct this */ + WsSafetyToken() = default; + }; /*WsSafetyToken*/ + + } /*namespace web*/ +} /*namespace xo*/ + +/* end WsSafetyToken.hpp */ diff --git a/src/websock/CMakeLists.txt b/src/websock/CMakeLists.txt new file mode 100644 index 00000000..e8aa7df2 --- /dev/null +++ b/src/websock/CMakeLists.txt @@ -0,0 +1,16 @@ +# xo-websock/CMakeLists.txt + +set(SELF_LIB websock) +set(SELF_SRCS EndpointUtil.cpp DynamicEndpoint.cpp WebsockUtil.cpp WebsocketSink.cpp Webserver.cpp) + +xo_add_shared_library3(${SELF_LIB} ${PROJECT_NAME}Targets ${PROJECT_VERSION} 1 ${SELF_SRCS}) + +# ---------------------------------------------------------------- +# external dependencies + +xo_dependency(${SELF_LIB} reactor) +xo_dependency(${SELF_LIB} webutil) + +# note: changes to xo_dependency() calls here +# must coordinate with find_dependency() calls in +# xo-websock/cmake/websockConfig.cmake.in diff --git a/src/websock/DynamicEndpoint.cpp b/src/websock/DynamicEndpoint.cpp new file mode 100644 index 00000000..3df46d47 --- /dev/null +++ b/src/websock/DynamicEndpoint.cpp @@ -0,0 +1,146 @@ +/* file DynamicEndpoint.cpp + * + * author: Roland Conybeare, Sep 2022 + */ + +#include "DynamicEndpoint.hpp" + +namespace xo { + using xo::web::Alist; + using xo::fn::CallbackId; + using xo::ref::rp; + + namespace web { + DynamicEndpoint::DynamicEndpoint(std::string uri_pattern, + HttpEndpointFn http_fn, + StreamSubscribeFn subscribe_fn, + StreamUnsubscribeFn unsubscribe_fn) + : uri_pattern_{std::move(uri_pattern)}, + http_fn_{std::move(http_fn)}, + subscribe_fn_{std::move(subscribe_fn)}, + unsubscribe_fn_{std::move(unsubscribe_fn)} + { + std::string r_pat; + + /* 1st pass -- construct pattern regex .uri_regex + * to identify urls that belong to this endpoint + * + * using regex like: + * \$\{[[:alnum:]]+\} + */ + { + std::regex var_rgx("\\$\\{[[:alnum:]]+\\}"); + + /* e.g. if .uri_pattern: + * /fixed/stem/${a}/more/fixed/stuff/${b} + * then want r_pat: + * /fixed/stem/[[:alnum:]]+/more/fixed/stuff/[[:alnum:]]+ + * to find values pattern variables like ${a}, ${b} + */ + std::regex_replace(std::back_inserter(r_pat), + this->uri_pattern_.begin(), + this->uri_pattern_.end(), + var_rgx, + std::string("([[:alnum:]]+)")); + + this->uri_regex_ = std::regex(r_pat); + } + + /* 2nd pass -- identify pattern variables */ + { + /* regex for: + * \$\{([[:alnum:]]+)\} + * use to match input like + * ${apple} + * and also extract the variable name + * apple + */ + std::regex var_rgx("\\$\\{([[:alnum:]]+)\\}"); + std::smatch match; + + std::string subject = this->uri_pattern_; + + /* if subject like + * /fixed/stem/${a}/more/fixed/stuff/${b} + * extract + * ["a", "b"] + * + * for + * /fixed/stem/${a}/more/fixed/stuff/${b}/${a} + * also extract + * ["a", "b"] + * i.e. avoid extracting the same variable name twice + */ + while (std::regex_search(subject, match, var_rgx)) { + std::string v = match[1]; + + bool present_flag = false; + + for (auto const & x : this->var_v_) { + if (x == v) { + present_flag = true; + break; + } + } + + if (!present_flag) + this->var_v_.push_back(match[1]); + + subject = match.suffix().str(); + } + } + } /*ctor*/ + + void + DynamicEndpoint::http_response(std::string const & incoming_uri, + std::ostream * p_os) const + { + /* send this uri argument list callback. + * contains variables extracted from .uri_pattern + * (variables surrounded by ${...}) + */ + Alist alist; + + /* extract pattern variables in uri + * c.f. 2nd pass in DynamicEndpoint.ctor + */ + std::smatch match; + std::string subject = incoming_uri; + + /* if subject like + * /fixed/stem/apple/more/fixed/stuff/beagle + * with .uri_pattern + * /fixed/stem/${a}/more/fixed/stuff/${b} + * then we have .uri_regex + * /fixed/stem/([[:alnum:]]+)/more/fixed/stuff/([[:alnum:]]+) + * use this to extract values for keys in .var_v, + * in the same order + */ + if (std::regex_match(subject, match, this->uri_regex_)) { + for (size_t i = 0, n = this->var_v_.size(); ivar_v_[i]; + std::string i_value = match[1+i]; + + alist.push_back(i_name, i_value); + } + } + + this->http_fn_(incoming_uri, alist, p_os); + } /*http_response*/ + + CallbackId + DynamicEndpoint::subscribe(std::string const & /*incoming_uri*/, + rp const & ws_sink) const + { + return this->subscribe_fn_(ws_sink); + } /*subscribe*/ + + void + DynamicEndpoint::unsubscribe(CallbackId id) const + { + return this->unsubscribe_fn_(id); + } /*unsubscribe*/ + } /*namespace web*/ +} /*namespace xo*/ + +/* end DynamicEndpoint.cpp */ diff --git a/src/websock/EndpointUtil.cpp b/src/websock/EndpointUtil.cpp new file mode 100644 index 00000000..f53dbd43 --- /dev/null +++ b/src/websock/EndpointUtil.cpp @@ -0,0 +1,38 @@ +/* file EndpointUtil.cpp + * + * author: Roland Conybeare, Sep 2022 + */ + +#include "EndpointUtil.hpp" + +namespace xo { + namespace web { + std::string + EndpointUtil::stem(std::string const & pattern) + { + std::size_t p = 0; + do { + p = pattern.find_first_of("$", p); + + if ((p != std::string::npos) && (pattern[p+1] == '{')) { + /* fixed stem is chars [0 .. p-1], i.e. 1st p characters */ + break; + } + + if (p != std::string::npos) { + /* skip to next '$' */ + ++p; + } + } while (p != std::string::npos); + + if (p == std::string::npos) { + /* pattern has no variable components */ + return pattern; + } else { + return pattern.substr(0, p); + } + } /*stem*/ + } /*namespace web*/ +} /*namespace xo*/ + +/* end EndpointUtil.cpp */ diff --git a/src/websock/Webserver.cpp b/src/websock/Webserver.cpp new file mode 100644 index 00000000..6e1dbdb1 --- /dev/null +++ b/src/websock/Webserver.cpp @@ -0,0 +1,1939 @@ +/* @file Webserver.cpp + * + * Webserver + websocket container/scaffold. + * Originally adapted from libwebsocket example + * + * 19aug2022 + * This version probably overly rigid. + * Initial goal is to adpat example application so that: + * a. it works as a library + * b. doesn't hijack main thread + * + * Subsequently, want to wrap to demonstrate library working from within python + * + * In the meantime, adopting example code as-is without looking closely + * at globals/singletons + * + */ + +#include "Webserver.hpp" +#include "WebsocketSink.hpp" +#include "WebsockUtil.hpp" +#include "WsSafetyToken.hpp" +#include "DynamicEndpoint.hpp" +#include "xo/printjson/PrintJson.hpp" +#include // for Json::Reader, to parse json input +#include +#include +#include +#include +#include + +namespace xo { + using xo::web::Alist; + using xo::reactor::AbstractSink; + using xo::json::PrintJson; + using xo::fn::CallbackId; + using xo::ref::rp; + using xo::scope; + using xo::xtag; + + namespace web { + char const * + RunstateUtil::runstate_descr(Runstate x) + { +# define CASE(x) case Runstate::x: return #x + switch(x) { + CASE(stopped); + CASE(stop_requested); + CASE(running); + } +# undef CASE + + return "???"; + } /*runstate_descr*/ + + /* both websocket and appl thread can obtain this token. + * see WebsocketSessionRecd. Posession of this token is evidence + * caller holds WebsocketSessionRecd.mutex + */ + class WsSessionSafetyToken : public SafetyToken { + private: + friend class WebsocketSessionRecd; + + private: + /* only WebsocketSessionRecd should construct this + * mutex argument present just to alert reader + */ + WsSessionSafetyToken(std::unique_lock const &) {} + }; /*WsSessionSafetyToken*/ + + + namespace { + /* one of these is created for each client connecting to us */ + + struct OutputBuffer; + + /* editor bait: + * WebserverImpl::send_text() + * ws_pss + * + * NOTE: + * 1. per_session_data__http instances are created by libwebsocket library. + * since that's implemented in C, ctor/dtors won't be invoked for this class + */ + struct per_session_data__minimal { + public: + /* output state; allocated as bona fide c++ object */ + OutputBuffer * output_buf_; + }; /*per_session_data__minimal*/ + + /* one of these created for each message */ + + /* output destined for a particular websocket. + * 'struct msg' that folllows is a POD C struct inherited from + * libwebsocket example code; intend to retire that. + * + * An OutputMsg instance sends bytes [lo..hi), + * using as many trips as necessary + * + * +---...---+---...--------+ + * | LWS_PRE | text payload | + * +---...---+---...--------+ + * ^ ^ ^ + * .buf .text() text() + text.size() + * + * |<---A--->|<------B------->| + * |<------------C----------->| + * + * A: (LWS_PRE bytes) populated by ::lws_write(), in particular encodes .buf_z + * B: (.text_z bytes) WebserverImpl passes this range to ::lws_write() + * C: (LWS_PRE + .text_z bytes) ::lws_write() actually sends this range + * + * Note trailing null isn't required, since length is explicitly sent + */ + struct OutputBuffer { + public: + OutputBuffer(uint32_t session_id) : session_id_{session_id} {} + ~OutputBuffer() = default; + + uint32_t session_id() const { return session_id_; } + struct lws * wsi() const { return wsi_; } + + /* non-const access required. + * lws_write() will prepend headers in .buf_v[0..LWS_PRE-1] + */ + unsigned char * text() { return &(buf_v_[LWS_PRE]); } + unsigned char const * text() const { return &(buf_v_[LWS_PRE]); } + size_t text_z() const { return text_z_; } + + std::string_view text_view() const { + return std::string_view((char const *)(this->text()), + this->text_z()); + } + + bool is_busy() const { return this->sent_seq_ < this->stored_seq_; } + bool is_idle() const { return this->sent_seq_ == this->stored_seq_; } + + void establish_wsi(struct lws * wsi) { this->wsi_ = wsi; } + + bool is_writeable(WsSafetyToken const &) const { return is_writeable_; } + void set_is_writeable(bool x, WsSafetyToken const &) { is_writeable_ = x; } + + /* caller must hold WebsocketSessionRecd.mutex; + * evidenced by wsession_token + */ + void store_message(uint32_t msg_seq, + std::string const & text, + WsSessionSafetyToken const & wsession_token) { + scope log(XO_ENTER0(info)); + + wsession_token.verify(); + + if (sent_seq_ != stored_seq_) { + log && log("store_message: attempt storing new msg_seq but sent_seq!=stored_seq", + xtag("sent_seq", sent_seq_), + xtag("stored_seq", stored_seq_), + xtag("msg_seq", msg_seq)); + assert(false); + } + + size_t req_z = LWS_PRE + text.size(); + + if (this->buf_v_.size() < req_z) + this->buf_v_.resize(req_z); + + this->text_z_ = text.size(); + + ::memcpy(&(this->buf_v_[LWS_PRE]), text.c_str(), this->text_z_); + + log && log(xtag("buf", (void*)&(this->buf_v_[0])), + xtag("msg_seq", msg_seq), + xtag("text", text), + xtag("text.size", text.size()), + xtag("req_z", req_z)); + + this->stored_seq_ = msg_seq; + } /*store_message*/ + + int lws_write_aux(WsSafetyToken const & ws_safety_token) + { + scope log(XO_ENTER0(info)); + + ws_safety_token.verify(); + + this->set_is_writeable(false, ws_safety_token); + + log && log("write to websocket", + xtag("wsi", (void*)wsi_), + xtag("text_z", this->text_z())); + log && log(xtag("text", this->text_view())); + + /* 1. notice we allowed for LWS_PRE in the payload already; + * this is mandatory for LWS_WRITE_TEXT. + * 2. LWS_WRITE_TEXT requires valid utf-8 payload + * 3. lws_write() writes entire contents, using + * multiple network writes if necessary. + * Application side can ignore the possibility of partial writes. + */ + int m = ::lws_write(this->wsi_, + this->text(), + this->text_z(), + LWS_WRITE_TEXT); + + if (m < (int)this->text_z()) { + /* note: first time we observed this, browser console + * showed that entire message was eventually received, + * (though not if we exit() before returning) + */ + lwsl_user("lws_write_aux: PARTIAL WRITE: session=[%u], m=lws_write(z) with msession_id_, + m, + this->text_z()); + + /* 23sep2022: consistent with observed behavior: + * - lws will write remainder of message + * - lws will call appl via LWS_CALLBACK_SERVER_WRITEABLE + * once write has been completed + * according to docs lws buffers message -- if true, + * probably pay for message to be copied + */ + return 0; + } + + this->lws_write_completion(ws_safety_token); + + return m; + } /*lws_write_aux*/ + + /* call this after successfully sending a message */ + void lws_write_completion(WsSafetyToken const & ws_safety_token) { + /* session now writeable again. either: + * - lws_write(z) successful + * - lws_write(z) incomplete, followed by lws callback + * with reason = LWS_CALLBACK_SERVER_WRITEABLE + */ + this->set_is_writeable(true, ws_safety_token); + + /* message completely written */ + this->sent_seq_ = this->stored_seq_; + } /*lws_write_completion*/ + + private: + /* identifies websocket session associated with this buffer + * established permanently in ctor + */ + uint32_t session_id_; + + /* opaque pointer; owned by libwebsocket + identifies this session. + * established once (per websocket session) from LWS_CALLBACK_ESTABLISHED + */ + struct lws * wsi_ = nullptr; + + /* ::lws_write() takes responsibility for writing and buffering full message; + * IIU docs that means it doesn't return until full write has completed; + * this suggests it may also make reentrant callbacks for other sessions, + * while an incomplete call to lws_write() is on the stack. + * + * set .is_writeable to false during lws_write() calls, + * so that application threads can refrain from attempting nested + * lws_write() calls for the same session. + */ + bool is_writeable_ = false; + + /* seq# of last message sent using this buffer; .sent_seq chases .stored_seq */ + uint32_t sent_seq_ = 0; + /* seq# of last message stored using this buffer */ + uint32_t stored_seq_ = 0; + + /* buffer for outbound text. + * using the first LWS_PRE + .text_z bytes. + * 1st LWS_PRE bytes owned by lws library, must not touch these + */ + std::vector buf_v_; + size_t text_z_ = 0; + }; /*OutputBuffer*/ + + /* + * Unlike ws, http is a stateless protocol. This pss only exists for the + * duration of a single http transaction. With http/1.1 keep-alive and + * http/2, that is unrelated to (shorter than) the lifetime of the network + * connection. + * + * NOTE + * 1. per_session_data__http instances are created by libwebsocket library. + * since that's implemented in C, we need to arrange for manual initialization + * 2. since libwebsocket implemented in C, we don't expect auto-initialization + * or constructors to be invoked. + * 3. libwebsocket gives us several alternatives for organizing resource + * allocation. We use these callback reasons: + * LWS_CALLBACK_HTTP_BIND_PROTOCOL for setup, allocate .output_ss + * LWS_CALLBACK_HTTP_DROP_PROTOCOL for teardown free .output_ss + */ + struct per_session_data__http { + int test; + /* store http reply in .output_str */ + std::string * output_str; + }; + + /* one of these is created for each vhost our protocol is used with + * + * NOTE + * 1. per_vhost_data__minimal instances are created by libwebsocket library. + * since that's implemented in C, ctors/dtors aren't used here + * + * editor bait: vhd + */ + struct per_vhost_data__minimal { + struct lws_context * context; + struct lws_vhost * vhost; + const struct lws_protocols * protocol; + + struct per_session_data__minimal * pss_list; /* linked-list of live pss*/ + + uint32_t next_session_id_; + + //struct msg amsg; /* the one pending message... */ + //int current; /* the current message number we are caching */ + }; /*per_vhost_data__minimal*/ + } /*namespace*/ + + /* bookkeeping record for a websocket subscription. */ + class WebsocketSubscriptionRecd { + public: + WebsocketSubscriptionRecd(std::string const & incoming_uri, + DynamicEndpoint * endpoint, + rp const & ws_sink) + : incoming_uri_{incoming_uri}, + endpoint_{endpoint}, + ws_sink_{ws_sink} + {} + + void subscribe() { + this->callback_id_ = this->endpoint_->subscribe(this->incoming_uri_, + this->ws_sink_); + } /*subscribe*/ + + void unsubscribe() { + this->endpoint_->unsubscribe(this->callback_id_); + } /*unsubscribe*/ + + private: + /* original subscription url */ + std::string incoming_uri_; + /* endpoint that matched .subscribe_cmd + * (see WebserverImpl.stream_map) + */ + DynamicEndpoint * endpoint_ = nullptr; + /* id created when subscription established + * (see CallbackSetImpl.add_callback()) + */ + CallbackId callback_id_; + /* sink established to receive (& forward) events on behalf + * of this subscription. application code writes to this sink. + */ + rp ws_sink_; + }; /*WebsocketSubscriptionRecd*/ + + /* bookkeeping record for a websocket session. + * WebserverImpl (below) keeps exactly one of these + * for each active websocket session + */ + class WebsocketSessionRecd { + public: + WebsocketSessionRecd(OutputBuffer * output_buf) : output_buf_{output_buf} { + assert(this->output_buf_); + } + + bool is_output_busy() const { + return (this->output_buf_ + && this->output_buf_->is_busy()); + } + bool outbound_q_empty() const { return this->outbound_q_.empty(); } + + void subscribe_endpoint(std::string const & incoming_cmd, + DynamicEndpoint * endpoint, + rp const & ws_sink) { + + scope log(XO_ENTER0(info), + xtag("incoming_cmd", incoming_cmd)); + + std::unique_ptr sub_recd_uptr + (new WebsocketSubscriptionRecd(incoming_cmd, + endpoint, + ws_sink)); + WebsocketSubscriptionRecd * sub_recd_addr = sub_recd_uptr.get(); + + { + std::lock_guard lock(this->mutex_); + + this->active_subscription_v_.push_back(std::move(sub_recd_uptr)); + } + + /* note: need to call with lock dropped, + * since subscribe may in principle call WebserverImpl.send_text() + */ + if (sub_recd_addr) + sub_recd_addr->subscribe(); + } /*subscribe_endpoint*/ + + void send_text(std::string text) { + scope log(XO_ENTER0(info)); + + std::unique_lock lock(this->mutex_); + + if (!(this->output_buf_)) { + log && log("ws_pss.output_buf not present -> exit"); + } else if (this->is_output_busy()) { + log && log("ws_pss.output_msg busy, enqueue"); + + /* previous message already in progress, enq or drop */ + this->enqueue_text(std::move(text), + WsSessionSafetyToken(lock)); + + /* this message will eventually get sent via + * .lws_write_pending_traffic() + */ + } else { + /* send message now! */ + log && log("output_msg idle, send now"); + + this->prepare_outbound_message(std::move(text), + WsSessionSafetyToken(lock)); + + /* can release lock, won't be using for remainder of this function */ + lock.unlock(); + + lws_context * lws_cx = ::lws_get_context(this->output_buf_->wsi()); + + /* interrupt libwebsocket event loop. + * will send 'wait cancelled' event to all sockets. + * + * Actually, after testing -- looks like this sends to one wsi per protocol: + * basically to the "listening" wsi, not to websocket "session" wsi + */ + ::lws_cancel_service(lws_cx); + + /* NOTE: web documentation seems to suggest using lws_callback_on_writable(): + * + * trigger call from websocket thread to send data. + * will cause reentry via websocket thread into + * WebserverImpl::notify_minimal() + * with reason=LWS_CALLBACK_SERVER_WRITEABLE + * + * ^^^ Hmm, doesn't seem to work this way. + * Suspect this would only work if socket currently + * in non-writeable state. + */ + //lws_callback_on_writable(ws_pss->wsi); + } + } /*send_text*/ + + /* write some pending traffic from lws event loop + * + * Require: + * - MUST be invoked from lws event loop, for threadsafety; + * ws_safety_token provides evidence of this + */ + void lws_write_pending(WsSafetyToken const & ws_safety_token) { + scope log(XO_ENTER0(info)); + + log && log(xtag("output_buf", (void*)this->output_buf_)); + +#ifdef OBSOLETE + per_session_data__minimal * ws_pss = this->ws_pss_; + + if (!ws_pss) { + lscope.log("null ws_pss, exit"); + return; + } +#endif + + if (!(this->output_buf_)) { + /* output message buffer not established, + * implies nothing sent yet + */ + log && log("output_msg either not established or destroyed, exit"); + return; + } + + /* loop until no queued messages for this session */ + for (;;) { + if (!(this->output_buf_->is_writeable(ws_safety_token))) { + /* call to lws_write() already in progress */ + log && log("output_buf not writeable (bc lws_write in progress)"); + return; + } + + if (this->output_buf_->is_idle()) { + /* already up-to-date for this session */ + log && log("output idle (up-to-date)"); + return; + } + + this->output_buf_->lws_write_aux(ws_safety_token); + + /* if there are any appl messages queued, prepare to send another one */ + if (this->outbound_q_.empty()) { + /* all caught up, nothing left to send */ + log && log("up-to-date after write"); + } else { + std::unique_lock lock(this->mutex_); + + std::string text(this->dequeue_text(WsSessionSafetyToken(lock))); + + this->prepare_outbound_message(std::move(text), + WsSessionSafetyToken(lock)); + } + } + } /*lws_write_pending*/ + + /* threadsafe */ + void unsubscribe_all() { + std::lock_guard lock(this->mutex_); + + /* also drop .output_buf, + * to short-circuit any subsequent attempts to use .lws_write_pending() + * (which will happen in response to LWS_CALLBACK_EVENT_WAIT_CANCELLED + * on any session) + */ + for (auto & sub_ptr : this->active_subscription_v_) + sub_ptr->unsubscribe(); + + this->output_buf_ = nullptr; + this->active_subscription_v_.clear(); + } /*unsubscribe_all*/ + + private: + uint32_t generate_msg_seq() { return ++(this->last_msg_seq_); } + + /* enqueue application-level message. + * use this when .ws_pss.outbound_buf is busy + */ + void enqueue_text(std::string text, + WsSessionSafetyToken const & /*wss_token*/) { + this->outbound_q_.push_back(std::move(text)); + } /*enqueue_text*/ + + /* remove a deferred message from .outbound_q, + * and return it. This can happen if output becomes + * available after being write-blocked + */ + std::string dequeue_text(WsSessionSafetyToken const & /*wss_token*/) { + assert(!this->outbound_q_.empty()); + + std::string retval = std::move(this->outbound_q_.front()); + + this->outbound_q_.pop_front(); + + return retval; + } /*dequeue_text*/ + + /* prepare outbound message for sending in contiguous memory; + * in particular prepends header. + * + * this can be called from either websocket or appl thread, + * so needs to be threadsafe. + */ + void prepare_outbound_message(std::string text, + WsSessionSafetyToken const & wss_token) { + scope log(XO_ENTER0(info)); + + /* sequence# for this outbound message */ + uint32_t msg_seq = this->generate_msg_seq(); + + this->output_buf_->store_message(msg_seq, + std::move(text), + wss_token); + + /* now ws_pss->output_msg_->is_busy() */ + log && log("staged next write", + xtag("wsi", (void*)this->output_buf_->wsi()), + xtag("text_z", this->output_buf_->text_z())); + } /*prepare_outbound_message*/ + + private: + /* output destined for this session + * libws (via per_session_data__minimal) also points to + * .output_buf + */ + OutputBuffer * output_buf_ = nullptr; + /* protects .active_subscription_v, .last_msg_seq, .outbound_q */ + std::mutex mutex_; + /* active subscriptions established by this session */ + std::vector> active_subscription_v_; + /* generate seq#'s for outgoing messages */ + uint32_t last_msg_seq_ = 0; + /* when new outgoing message appears: + * 1. if .pss->output_msg empty, allocate it and store message there; + * invoke lws_callback_on_writeable(.pss->wsi) to get message sent asap + * 2. otherwise sending a previous message is in-progress; + * put outgoing message to the back of .outbound_q + */ + std::deque outbound_q_; + }; /*WebsocketSessionRecd*/ + + using EndpointMap = std::unordered_map>; + + /* defined in this translation unit, after WebserverImpl */ + class WebserverImplWsThread; + + class WebserverImpl : public Webserver { + public: + WebserverImpl(WebserverConfig const & ws_config, + rp const & pjson) + : ws_config_{ws_config}, + pjson_{pjson}, + readjson_{Json::CharReaderBuilder().newCharReader()}, + interrupt_flag_{false}, + state_{Runstate::stopped} + { + } /*ctor*/ + + virtual ~WebserverImpl() { + /* if webserver is running, initiate shutdown. + * webserver thread will eventually exit + */ + this->stop_webserver(); + /* wait for shutdown to complete */ + this->join_webserver(); + } /*dtor*/ + + virtual void run() = 0; + + // ----- Inherited from Webserver ----- + + virtual Runstate state() const override { return state_; } + virtual void register_http_endpoint(HttpEndpointDescr const & endpoint) override; + virtual void register_stream_endpoint(StreamEndpointDescr const & endpoint) override; + virtual void start_webserver() override; + virtual void interrupt_stop_webserver() override; + virtual void stop_webserver() override; + virtual void join_webserver() override; + + protected: + void set_lws_log_level() { + lws_set_log_level(LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE + /* for LLL_ verbosity above NOTICE to be built into + * lws, lws must have been configured and built with + * -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */ + /* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */ + /* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */ + /* | LLL_DEBUG */, + NULL); + } /*set_lws_log_level*/ + +#if defined(LWS_WITH_PLUGINS) + void init_pvo(lws_protocol_vhost_options * p_pvo) { + /* {next, options, name, value} */ + *p_pvo = {NULL, NULL, "lws-minimal", ""}; + } /*init_pvo*/ +#endif + + /* called once during webserver initialization; + * identifies protocols (channels) that libws is expected to support + */ + virtual void init_protocols(std::vector * p_v) = 0; + + void init_mount_dynamic(lws_http_mount * p_mount) { + *p_mount = { + /* .mount_next */ NULL, /* linked-list "next" */ + /* .mountpoint */ "/dyn", /* mountpoint URL */ + /* .origin */ NULL, /* protocol */ + /* .def */ NULL, + /* .protocol */ "http", + /* .cgienv */ NULL, + /* .extra_mimetypes */ NULL, + /* .interpret */ NULL, + /* .cgi_timeout */ 0, + /* .cache_max_age */ 0, + /* .auth_mask */ 0, + /* .cache_reusable */ 0, + /* .cache_revalidate */ 0, + /* .cache_intermediaries */ 0, + /* .origin_protocol */ LWSMPRO_CALLBACK, /* dynamic */ + /* .mountpoint_len */ 4, /* char count */ + /* .basic_auth_login_file */ NULL, + }; + } /*init_mount_dynamic*/ + + void init_mount_static(lws_http_mount const * dynamic, + lws_http_mount * p_mount) { + /* default mount serves the URL space from ./mount-origin */ + *p_mount = { + /* .mount_next */ dynamic, /* linked-list "next" */ + /* .mountpoint */ "/", /* mountpoint URL */ + /* .origin */ "./mount-origin", /* serve from dir */ + /* .def */ "index.html", /* default filename */ + /* .protocol */ NULL, + /* .cgienv */ NULL, + /* .extra_mimetypes */ NULL, + /* .interpret */ NULL, + /* .cgi_timeout */ 0, + /* .cache_max_age */ 0, + /* .auth_mask */ 0, + /* .cache_reusable */ 0, + /* .cache_revalidate */ 0, + /* .cache_intermediaries */ 0, + /* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */ + /* .mountpoint_len */ 1, /* char count */ + /* .basic_auth_login_file */ NULL, + }; + } /*init_mount_static*/ + + void init_retry(lws_retry_bo_t * p_retry) { + p_retry->secs_since_valid_ping = 3; + p_retry->secs_since_valid_hangup = 10; + } /*init_retry*/ + + /* requires: + * - .pvo initialized, see .init_pvo() + * - .protocol_v[] initialized, see .init_protocols() + * - .mount_dynamic initialized, see .init_mount_dynamic() + * - .mount_static initialized, see .init_mount_static() + * - .retry initialized, see .init_retry() + */ + void init_cx_config(lws_context_creation_info * p_cx_config) { + ::memset(p_cx_config, 0, sizeof(*p_cx_config)); + p_cx_config->port = this->ws_config_.port(); + p_cx_config->vhost_name = "localhost"; + p_cx_config->pvo = &(this->pvo_); + p_cx_config->protocols = this->protocol_v_.data(); + p_cx_config->mounts = &(this->mount_static_); + /* userdata -- accessible from context with lws_context_user() */ + p_cx_config->user = (void*)this; + +#if defined(LWS_WITH_TLS) + if (this->ws_config_.tls_flag()) { + lwsl_user("Server using TLS\n"); + p_cx_config->options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + p_cx_config->ssl_cert_filepath = "localhost-100y.cert"; + p_cx_config->ssl_private_key_filepath = "localhost=100y.key"; + } +#endif + + if (this->ws_config_.host_check_flag()) { + p_cx_config->options |= LWS_SERVER_OPTION_VHOST_UPG_STRICT_HOST_CHECK; + } + + if (this->ws_config_.use_retry_flag()) { + p_cx_config->retry_and_idle_policy = &(this->retry_); + } + } /*init_cx_config*/ + + /* check for a DynamicEndpoint stored under stem; + * if found, invoke it on incoming_uri to respond + * + * return. true iff stem matched a dynamic endpoint; + */ + DynamicEndpoint * lookup_dynamic_http_stem(std::string const & stem); + + /* write dynamic http response for incoming_uri, on *p_os + * incoming_uri will be suffix of original uri from browser, + * following dynamic mount point [/dyn]. + * see .init_mount_dynamic() + */ + void dynamic_http_response(std::string const & incoming_uri, + std::ostream * p_os); + + /* act on incoming websocket command + * expecting json like + * {"command": "subscribe", "stream": "uls"} + */ + void perform_ws_cmd(uint32_t session_id, + std::string_view incoming_svw); + +#ifdef DEFINED_BUT_NOT_USED + /* called from libwebsocket thread when session manager + * (aka "virtual host") is created for the websocket protocol + */ + void notify_vhd(per_vhost_data__minimal * vhd); +#endif + /* called from libwebsocket thread when creating a new websocket session */ + void notify_ws_session_open(OutputBuffer * output_buf, + per_vhost_data__minimal * vhd, + WsSafetyToken const & ws_safety_token); + /* called from libwebsocket thread whenever a websocket session is closed */ + void notify_ws_session_close(OutputBuffer * output_buf, + per_vhost_data__minimal * vhd, + WsSafetyToken const & ws_safety_token); + + /* send text to the websocket session identified by session_id */ + void send_text(uint32_t session_id, + std::string text) override; + + /* from lws event loop, write any pending outbound traffic + * see .pending_session_q + */ + void lws_write_pending_traffic(WsSafetyToken const & ws_safety_token); + + protected: + /* callback for http protocol */ + static int notify_dynamic_http(struct lws * wsi, + lws_callback_reasons reason, + void * user_data, + void * incoming_uri, + size_t len); + + protected: + /* see WebserverImplWsThread below, for methods + * that are exclusive to libws thread + */ + + /* initial configuration for embedded webserver */ + WebserverConfig ws_config_; + + /* json printer (w/ plugins for reflected types) */ + rp pjson_; + + /* json reader */ + std::unique_ptr readjson_; + + /* --- 1. LWS configuration stuff (set once) ---*/ + +#if defined(LWS_WITH_PLUGINS) + /* protocols listed here will "bind to vhost". + * (I don't know for sure what this means -- it's a magic spell for now) + * + */ + lws_protocol_vhost_options pvo_; +#endif + + /* protocols to accept for this webserver */ + std::vector protocol_v_; + + /* mount point for dynamic urls + * (these will be served by executing c++ code, + * instead of serving static disk files) + */ + lws_http_mount mount_dynamic_; + /* mount point for static urls + * (serve static files from file system) + */ + lws_http_mount mount_static_; + + /* retry settings + * (not sure how these are used) + */ + lws_retry_bo_t retry_; + + /* configuration record for lws context + * AFAIK require lifetime >= lws_context + */ + lws_context_creation_info cx_config_; + + /* runtime state owned by LWS library + * can get application-determined user data from a lws_context by + * lws_context_user(.lws_cx) + */ + lws_context * lws_cx_ = nullptr; + + /* --- 2. startup/shutdown control --- */ + + /* set this to true to prevent further service loop iteration */ + std::atomic interrupt_flag_; + + /* protects .state */ + std::mutex mutex_; + std::condition_variable cond_; + + /* valid states + * + * .state .thread_ptr + * ----------------------------------------------- + * running thread in WebserverImpl::run() + * stop_requested thread in WebserverImpl::run() + * stopped nullptr + */ + Runstate state_; + std::unique_ptr thread_ptr_; + + /* --- 3. plugin state (writable while server runs) --- */ + + /* map :: stem->http_fn, + * where + * stem = "longest non-variable URI prefix" + * + * use .register_http_endpoint() to insert a new URI into this map + * + * this map used for http endpoints + */ + EndpointMap stem_map_; + /* map :: stem->subscribe_fn + * where + * stem = "longest non-variable URI prefix" + * + * use .register_stream_endpoint() to insert a new URI into this map + * + * this map used for stream endpoints + */ + EndpointMap stream_map_; + + /* --- 4. libwebsocket session manager --- */ + + /* websocket-associated libwebsocket data. + * created by libwebsocket; our appl code informed via + * LWS_CALLBACK_PROTOCOL_INIT + * + * list of all active sessions is in .ws_vhd->pss_list + * (see LWS_CALLBACK_PROTOCOL_INIT, LWS_CALLBACK_ESTABLISHED, LWS_CALLBACK_CLOSED) + * + * can visit sessions with macros: + * lws_start_foreach_llp(struct per_session_data__minimal **, ppss, vhd->pss_list) { + * ..do stuff with (*ppss)->wsi for example.. + * } lws_end_foreach_llp(ppss, pss_list); + */ + per_vhost_data__minimal * ws_vhd_ = nullptr; + + /* indexed by session id# (see per_session_data__minimal.session_id) + * .session_v.size() = {max #of simultaneously-open websocket sessions}. + * may contain empty slots. if .session_v[i] is empty, + * then i appears in .free_session_id_v[], i..e .free_session_id_v[j]=i for some j + */ + std::vector> session_v_; + + /* When a session closes, its session id becomes available. + * track such session ids here, so they can be recycled. + * want to recycle because they're indexes into .session_v[], + * and we don't want that to grow without bound + */ + std::vector free_session_id_v_; + + }; /*WebserverImpl*/ + + void + WebserverImpl::register_http_endpoint(HttpEndpointDescr const & endpoint_descr) + { + auto endpoint = DynamicEndpoint::make_http(endpoint_descr.uri_pattern(), + endpoint_descr.endpoint_fn()); + + this->stem_map_[endpoint->stem()] = std::move(endpoint); + } /*register_http_endpoint*/ + + void + WebserverImpl::register_stream_endpoint(StreamEndpointDescr const & endpoint_descr) + { + auto endpoint = DynamicEndpoint::make_stream(endpoint_descr.uri_pattern(), + endpoint_descr.subscribe_fn(), + endpoint_descr.unsubscribe_fn()); + + this->stream_map_[endpoint->stem()] = std::move(endpoint); + } /*register_stream_endpoint*/ + +#ifdef DEFINED_BUT_NOT_USED + void + WebserverImpl::notify_vhd(per_vhost_data__minimal * vhd) + { + this->ws_vhd_ = vhd; + } /*notify_vhd*/ +#endif + + void + WebserverImpl::notify_ws_session_open(OutputBuffer * output_buf, + per_vhost_data__minimal * vhd, + WsSafetyToken const & ws_safety_token) + { + ws_safety_token.verify(); + + uint32_t new_id = output_buf->session_id(); + + if (this->session_v_.size() <= new_id) + this->session_v_.resize(new_id + 1); + + this->session_v_[new_id].reset(new WebsocketSessionRecd(output_buf)); + + /* control comes here when a new websocket session is created, + * after LWS_CALLBACK_HTTP_BIND_PROTOCOL + */ + output_buf->set_is_writeable(true, ws_safety_token); + + /* compute next available session id + store in vhost struct */ + + if (this->free_session_id_v_.empty()) { + /* generate a new session id */ + uint32_t id = this->session_v_.size(); + + vhd->next_session_id_ = id; + } else { + /* recycle a previously-used session id */ + uint32_t id = this->free_session_id_v_[this->free_session_id_v_.size() - 1]; + this->free_session_id_v_.pop_back(); + + vhd->next_session_id_ = id; + } + } /*notify_ws_session_open*/ + + void + WebserverImpl::notify_ws_session_close(OutputBuffer * output_buf, + per_vhost_data__minimal * /*vhd*/, + WsSafetyToken const & ws_safety_token) + { + scope log(XO_ENTER0(info)); + + log && log("enter", + xtag("this", (void*)this), + xtag("output_buf", (void*)output_buf)); + + assert(output_buf->session_id() < this->session_v_.size()); + + ws_safety_token.verify(); + + WebsocketSessionRecd * ws_session_recd + = this->session_v_[output_buf->session_id()].get(); + + if (ws_session_recd) { + ws_session_recd->unsubscribe_all(); + } + + this->free_session_id_v_.push_back(output_buf->session_id()); + } /*notify_ws_session_close*/ + + /* note: to access lws_protocols.user, + * would use lws_get_protocol(wsi)->user + */ + int + WebserverImpl::notify_dynamic_http(struct lws * wsi, + lws_callback_reasons reason, + void * user_data, + void * incoming_data, + size_t len) + { + lws_context * lws_cx = lws_get_context(wsi); + void * cx_user_data = lws_context_user(lws_cx); + WebserverImpl * websrv = reinterpret_cast(cx_user_data); + + struct per_session_data__http * http_pss + = reinterpret_cast(user_data); + + lwsl_user("notify_dynamic_http: enter: reason %d (%s): lws_cx %p websrv %p\n", + reason, + WebsockUtil::ws_callback_reason_descr(reason), + lws_cx, + websrv); + + /* scratch space for http header + * (probably only need LWS_PRE here, I think the +256 debris + * from o.g. example) + */ + uint8_t buf[LWS_PRE + 256]; + uint8_t * start = &buf[LWS_PRE]; + uint8_t * p = start; + uint8_t * end = &buf[sizeof(buf) - 1]; + + switch (reason) { + case LWS_CALLBACK_HTTP: + { + /* incoming_uri contains the uri suffix following our mountpoint [/dyn] + * (see WebserverImpl.init_mount_dynamic()). + * + * looks like this gets spuriously invoked for non-dynamic mountpoints + * given that we serve both filesystem tree + * (in url-space at /, from dir ./mount-origin) and dynamic http (in url-space at /dyn); + * + * however output from the spurious invocation seems to be discarded + */ + char const * incoming_uri + = reinterpret_cast(incoming_data); + + assert(http_pss->output_str == nullptr); + + if (http_pss->output_str == nullptr) { + http_pss->output_str = new std::string; + } + + lwsl_user("allocate output_str [%p] in http_pss [%p]", + http_pss->output_str, http_pss); + + std::stringstream response_ss; + + assert(websrv); + + websrv->dynamic_http_response(incoming_uri, + &response_ss); + + *(http_pss->output_str) = response_ss.str(); + + lwsl_user("LWS_CALLBACK_HTTP: got response [%s]", + http_pss->output_str->c_str()); + + /* choose mime type */ + constexpr char const * c_mime_type = "application/json"; + + /* prepare and write http headers + * (do these precede &p ??) + */ + if (lws_add_http_common_headers(wsi, + HTTP_STATUS_OK, + c_mime_type, + http_pss->output_str->length(), + &p, end)) + return 1; + + if (lws_finalize_write_http_header(wsi, start, &p, end)) + return 1; + + /* write the body separately */ + lws_callback_on_writable(wsi); + + return 0; + } + + case LWS_CALLBACK_HTTP_WRITEABLE: + { + if (!http_pss || !http_pss->output_str || (http_pss->output_str->length() == 0)) + break; + + /* + * Use LWS_WRITE_HTTP (instead of LWS_WRITE_HTTP_FINAL) for intermediate writes, + * on http/2 lws uses this to understand to end the stream with this + * frame. + * + * TODO: if output is large, write it in smaller chunks. + * expecting mtu like 1500 bytes, so maybe 128k + * chunks will work well? + */ + if (lws_write(wsi, + (uint8_t *)(http_pss->output_str->c_str()), + http_pss->output_str->length(), + LWS_WRITE_HTTP_FINAL) + != static_cast(http_pss->output_str->length())) + { + return 1; + } + + /* + * HTTP/1.0 no keepalive: close network connection + * HTTP/1.1 or HTTP1.0 + KA: wait / process next transaction + * HTTP/2: stream ended, parent connection remains up + */ + if (lws_http_transaction_completed(wsi)) + return -1; + + return 0; + } + + case LWS_CALLBACK_HTTP_BIND_PROTOCOL: + { + /* from libwebsocket docs: + * By default, all HTTP handling is done in protocols[0]. + * However you can bind different protocols (by name) to different parts of the URL space using callback mounts. + * This callback occurs in the new protocol when a wsi is bound to that protocol. + * Any protocol allocation related to the http transaction processing should be created then. + * These specific callbacks are necessary because with HTTP/1.1, + * a single connection may perform a series of different transactions at different URLs, + * thus the lifetime of the protocol bind is just for one transaction, not connection. + */ + if (!http_pss) + break; + + /* although we could allocate http_pss->output_ss here, + * instead delay until LWS_CALLBACK_HTTP. + * this reduces new/delete churn, since BIND/DROP callbacks + * will get invoked on every incoming request, not just for + * dynamic http requests. + */ + http_pss->output_str = nullptr; + + lwsl_user("initialize http_pss->output_str to null in http_pss [%p]", + http_pss); + } + break; + + case LWS_CALLBACK_HTTP_DROP_PROTOCOL: + /* from libwebsocket docs: + * This is called when a transaction is unbound from a protocol. + * It indicates the connection completed its transaction and may do something different now. + * Any protocol allocation related to the http transaction processing should be destroyed. + */ + if (!http_pss) + break; + + if (http_pss->output_str) { + lwsl_user("destroy string [%p] in http_pss [%p]", + http_pss->output_str, http_pss); + + delete http_pss->output_str; + + http_pss->output_str = nullptr; /*hygiene*/ + } + + default: + break; + } + + return lws_callback_http_dummy(wsi, reason, user_data, incoming_data, len); + + } /*notify_dynamic_http*/ + + void + WebserverImpl::start_webserver() + { + switch(state_) { + case Runstate::stopped: + { + std::unique_lock lock(this->mutex_); + + this->thread_ptr_.reset(new std::thread(&WebserverImpl::run, this)); + this->state_ = Runstate::running; + } + break; + case Runstate::stop_requested: + throw std::runtime_error("webserver in stop-requested state"); + /* could invent a "restart-requested" state, I suppose */ + break; + case Runstate::running: + throw std::runtime_error("webserver already running"); + break; + } + } /*start_webserver*/ + + namespace { + DynamicEndpoint * + lookup_stem(std::string const & stem, + EndpointMap const & ep_map) + { + scope log(XO_DEBUG(true /*debug_flag*/), + xtag("stem", stem)); + + auto ix = ep_map.find(stem); + + if (ix != ep_map.end()) + return ix->second.get(); + else + return nullptr; + } /*lookup_stem*/ + + DynamicEndpoint * + lookup_pattern(std::string const & incoming_uri, + EndpointMap const & ep_map) + { + if (incoming_uri.empty()) + return nullptr; + + /* find longest prefix of incoming_uri that appears in .stem_map. + * + * 1. try the whole uri + * 2. try successively shorter prefixes of uri that end in '/' + * 3. try successively shorter prefixes of uri that do not end in '/' + */ + + /* 1. try the whole uri */ + DynamicEndpoint * endpoint = nullptr; + + endpoint = lookup_stem(incoming_uri, ep_map); + + if (!endpoint) { + /* 2. try successively shorter prefixes of uri that end in '/'. + * we already checked for the whole uri, so look for a match + * at or before the 2nd-last character + */ + if (incoming_uri.size() >= 2) { + std::string::size_type p = incoming_uri.size() - 1; + + while (!endpoint) { + p = incoming_uri.find_last_of('/', p-1); + + if (p == std::string::npos) + break; + + endpoint + = lookup_stem(incoming_uri.substr(0, p+1), ep_map); + + if (p == 0) + break; + } + } + } + + if (!endpoint) { + /* 3. try successively shorter prefixes of uri that don't end in '/'. + */ + if (incoming_uri.size() >= 2) { + std::string::size_type p = incoming_uri.size() - 2; + + while (!endpoint) { + if (incoming_uri[p] == '/') { + /* all stems ending in '/' have already been excluded */ + ; + } else { + endpoint + = lookup_stem(incoming_uri.substr(0, p+1), ep_map); + } + + if (p == 0) + break; + + --p; + } + } + } + + return endpoint; + } /*lookup_pattern*/ + } /*namespace*/ + + void + WebserverImpl::dynamic_http_response(std::string const & incoming_uri, + std::ostream * p_os) + { + DynamicEndpoint * endpoint = lookup_pattern(incoming_uri, + this->stem_map_); + + if (endpoint) { + endpoint->http_response(incoming_uri, p_os); + return; + } else { + /* if control here, no match */ + + /* or replace pss->str, pss->len with whatever dynamic content you like */ + time_t t0 = ::time(nullptr); + + *p_os << ("" + "" + "
no dynamic content for uri [") + << incoming_uri + << ("]" + " from mountpoint." + "
time: ") + << ctime(&t0) + << ""; + } + } /*dynamic_http_response*/ + + void + WebserverImpl::perform_ws_cmd(uint32_t session_id, + std::string_view incoming_cmd) + { + /* expecting input like: + * {"command": "subscribe", + * "stream": "usl"} + */ + + scope log(XO_ENTER0(info), + xtag("incoming_cmd", incoming_cmd)); + + Json::Value root; + + JSONCPP_STRING err; + bool ok = this->readjson_->parse(incoming_cmd.data(), + incoming_cmd.data() + incoming_cmd.size(), + &root, + &err); + + if (!ok) { + log && log("error: parsing failed", + xtag("incoming_cmd", incoming_cmd)); + } + + //std::cout << "WebserverImpl::perform_ws_cmd :root [" << root << "]" << std::endl; + + std::string cmd = root["cmd"].asString(); + + log && log("ws command", xtag("cmd", cmd)); + //std::cout << "WebserverImpl::perform_ws_cmd :cmd [" << cmd << "]" << std::endl; + + if (cmd == "subscribe") { + std::string stream_name = root["stream"].asString(); + + log && log("subscribe stream", xtag("stream", stream_name)); + + DynamicEndpoint * endpoint = lookup_pattern(stream_name, + this->stream_map_); + + if (endpoint) { + log && log("endpoint found"); + + /* sink to receive outbound events bound for session_id, + * for stream_name + */ + rp ws_sink + = WebsocketSink::make(this, + this->pjson_, + session_id, + stream_name); + + log && log("sink created"); + + assert(ws_sink->allow_polymorphic_source()); + assert(ws_sink->allow_volatile_source()); + + WebsocketSessionRecd * ws_recd = this->session_v_[session_id].get(); + + assert(ws_recd); + + ws_recd->subscribe_endpoint(std::string(incoming_cmd), + endpoint, + ws_sink); + } else { + log && log("endpoint not found"); + } + } + } /*perform_ws_cmd*/ + + void + WebserverImpl::interrupt_stop_webserver() + { + /* NOTE: this is threadsafe - ::lws_cancel_service() + * writes to a pipe to interrupt polling loop + */ + { + this->interrupt_flag_ = true; + + if (this->lws_cx_) { + ::lws_cancel_service(this->lws_cx_); + } + } + + std::unique_lock lock(this->mutex_); + + this->state_ = Runstate::stop_requested; + } /*interrupt_stop_webserver*/ + + void + WebserverImpl::stop_webserver() + { + std::unique_lock lock(this->mutex_); + + if(this->state_ == Runstate::running) { + this->interrupt_stop_webserver(); + } + } /*stop_webserver*/ + + void + WebserverImpl::join_webserver() { + while(true) { + std::unique_lock lock(this->mutex_); + + if (this->state_ == Runstate::stopped) + break; + + this->cond_.wait(lock); + } + + if (this->thread_ptr_) { + this->thread_ptr_->join(); + this->thread_ptr_ = nullptr; + } + } /*join_webserver*/ + + void + WebserverImpl::send_text(uint32_t session_id, + std::string text) + { + scope log(XO_ENTER0(info)); + log && log(xtag("session_id", session_id), + xtag(".session_v.size", this->session_v_.size())); + + if (session_id < this->session_v_.size()) { + WebsocketSessionRecd * p_session_recd = this->session_v_[session_id].get(); + + if (p_session_recd) + p_session_recd->send_text(text); + + //per_session_data__minimal * ws_pss = p_session_recd->ws_pss(); + + //lscope.log(xtag("ws_pss", ws_pss), + // xtag("ws_pss.wsi", ws_pss->wsi)); + + } else { + assert(false); + } + } /*send_text*/ + + void + WebserverImpl::lws_write_pending_traffic(WsSafetyToken const & ws_safety_token) + { + scope log(XO_ENTER0(info)); + + ws_safety_token.verify(); + + for (auto & session_ptr : this->session_v_) { + if (session_ptr) + session_ptr->lws_write_pending(ws_safety_token); + } + } /*lws_write_pending_traffic*/ + + /* sequester .ws_safety_token: + * it may only be used by dedicated websocket library thread + * (the unique thread that calls ::lws_service()) + */ + class WebserverImplWsThread : public WebserverImpl { + public: + WebserverImplWsThread(WebserverConfig const & ws_config, + rp const & pjson) + : WebserverImpl(ws_config, pjson) + { + scope log(XO_DEBUG(true /*debug_flag*/), + xtag("self", (void*)this)); + + this->set_lws_log_level(); +#if defined(LWS_WITH_PLUGINS) + this->init_pvo(&(this->pvo_)); +#endif + this->init_protocols(&(this->protocol_v_)); + this->init_mount_dynamic(&(this->mount_dynamic_)); + this->init_mount_static(&(this->mount_dynamic_), + &(this->mount_static_)); + this->init_cx_config(&(this->cx_config_)); + } /*ctor*/ + + /* create instance */ + static rp make(WebserverConfig const & ws_config, + rp const & pjson); + + // ----- Inherited from WebserverImpl ----- + + /* init helper */ + virtual void init_protocols(std::vector * p_v) override; + + /* run webserver. borrows calling thread, doesn't return + * until webserver stopped. + */ + virtual void run() override; + + private: + /* callback for lws-minimal protocol (websocket) */ + static int notify_minimal(struct lws * wsi, + lws_callback_reasons reason, + void * user_data, + void * incoming_uri, + size_t len); + + WsSafetyToken const & ws_safety_token() const { return ws_safety_token_; } + + private: + /* a function taking .ws_safety_token as an argument, + * announces that it is being called from the libws thread, + * i.e. reentrantly from ::lws_service() + */ + WsSafetyToken ws_safety_token_; + }; /*WebserverImplWsThread*/ + + /* 1. anything after the host:port prefix will get handled by callback_dynamic_http + * 2. host::port alone will upgrade to "lws-minimal" for websocket demo + * + * in practice p_v will be &WebserverImpl::protocol_v_ + */ + void + WebserverImplWsThread::init_protocols(std::vector * p_v) + { + /* lws_protocols: + * .name + * .callback + * .per_session_data_size + * .rx_buffer_size + * .id advertised as accessible from callback, but don't see how to use this + * .user advertised as accessible from callback, but don't see how to make this work. + * looks like libwebsocket allocates its own struct, even if .user is nonempty, + * and whether or not .per_session_data_size is 0. + * .tx_packet_size + */ + p_v->push_back({ + "http", + &WebserverImpl::notify_dynamic_http, + sizeof(struct per_session_data__http), + 0, + 0, + NULL, + 0 + }); + p_v->push_back({ + "lws-minimal", + &WebserverImplWsThread::notify_minimal, + sizeof(struct per_session_data__minimal), + 128, + 0, + NULL, + 0}); + /* mandatory end-of-array sentinel, requires by lws */ + p_v->push_back(LWS_PROTOCOL_LIST_TERM); + } /*init_protocols*/ + + /* called reentrantly from ::lws_service(), + * to do work on behalf of the websocket protocol "lws-minimal" + */ + int + WebserverImplWsThread::notify_minimal(struct lws * wsi, + lws_callback_reasons reason, + void * user_data, + void * input, + size_t input_z) + { + scope log(XO_ENTER0(info), + xtag("wsi", (void*)wsi)); + + lwsl_user("WebserverImpl::notify_minimal: enter" + ": reason %d (%s)", + reason, + WebsockUtil::ws_callback_reason_descr(reason)); + + assert(wsi); + + lws_context * lws_cx = lws_get_context(wsi); + + assert(lws_cx); + void * cx_user_data = lws_context_user(lws_cx); + + WebserverImplWsThread * websrv = reinterpret_cast(cx_user_data); + assert(websrv); + + WsSafetyToken const & ws_token = websrv->ws_safety_token(); + + struct per_session_data__minimal * ws_pss + = ((struct per_session_data__minimal *)user_data); + + lwsl_user("WebserverImpl::notify_minimal: enter" + ": reason %d (%s): wsi [%p], ws_pss [%p], lws_cx [%p] websrv [%p]\n", + reason, + WebsockUtil::ws_callback_reason_descr(reason), + wsi, + ws_pss, + lws_cx, + websrv); + + struct per_vhost_data__minimal * vhd + = ((struct per_vhost_data__minimal *) + lws_protocol_vh_priv_get(lws_get_vhost(wsi), + lws_get_protocol(wsi))); + int m; + + switch (reason) { + case LWS_CALLBACK_PROTOCOL_INIT: + { + vhd = (reinterpret_cast + (lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), + lws_get_protocol(wsi), + sizeof(struct per_vhost_data__minimal)))); + vhd->context = lws_get_context(wsi); + vhd->vhost = lws_get_vhost(wsi); + vhd->protocol = lws_get_protocol(wsi); + vhd->pss_list = nullptr; + vhd->next_session_id_ = 1; + //vhd->current = 0; + + lwsl_user("WebserverImpl::notify_minimal: vhost=%p, protocols=%p protocol.name=%s\n", + vhd->vhost, vhd->protocol, vhd->protocol->name); + } + break; + + case LWS_CALLBACK_WS_SERVER_BIND_PROTOCOL: + case LWS_CALLBACK_HTTP_BIND_PROTOCOL: + { + /* looks like control comes here with + * LWS_CALLBACK_HTTP_BIND_PROTOCOL, + * although based on docs would seem to expect + * LWS_CALLBACK_WS_SERVER_BIND_PROTOCOL + * + * In any case, control here when new websocket session created + */ + + if (!ws_pss) + break; + + assert(vhd); + + ws_pss->output_buf_ = new OutputBuffer(vhd->next_session_id_++); + + lwsl_user("establish pss->output_buf [%p] in ws_pss [%p]", + ws_pss->output_buf_, + ws_pss); + } + break; + case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: + { + if (!ws_pss) + break; + + lwsl_user("destroy pss->output_msg [%p] in ws_pss [%p]", + ws_pss->output_buf_, ws_pss); + + /* don't do this here. need to access ws_pss->output_buf + * from LWS_CALLBACK_CLOSED + */ +#ifdef BROKEN + if (ws_pss->output_buf_) { + delete ws_pss->output_buf_; + ws_pss->output_buf_ = nullptr; + } +#endif + } + break; + case LWS_CALLBACK_ESTABLISHED: + { + /* control comes here when a websocket session is opened + * (after protocol negotiated) + */ + + OutputBuffer * output_buf = ws_pss->output_buf_; + + output_buf->establish_wsi(wsi); + + websrv->notify_ws_session_open(output_buf, vhd, ws_token); + } + break; + + case LWS_CALLBACK_CLOSED: + { + /* control comes here when a websocket session is closed */ + assert(websrv); + assert(ws_pss); + assert(ws_pss->output_buf_); + assert(vhd); + + websrv->notify_ws_session_close(ws_pss->output_buf_, vhd, ws_token); + + if (ws_pss->output_buf_) { + delete ws_pss->output_buf_; + ws_pss->output_buf_ = nullptr; + } + + lwsl_user("LWS_CALLBACK_CLOSED: done"); + } + break; + + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + { + if (websrv) + websrv->lws_write_pending_traffic(ws_token); + } + break; + + case LWS_CALLBACK_SERVER_WRITEABLE: + { + /* control here when: + * 1. application wants to send data + * (app uses lws_cancel_service() to trigger + * LWS_CALLBACK_EVENT_WAIT_CANCELLED) + * 2. websocket session that was previously blocked + * is now ready to receive data + * (see LWS_CALLBACK_SERVER_WRITEABLE above) + */ + +#ifdef NOT_USING + if (!vhd->amsg.payload) + break; +#endif + + if (!vhd) { + lwsl_user("client entry: vhd not yet established, return"); + break; + } + + if (!ws_pss) { + lwsl_user("client entry: ws_pss not yet established, return"); + break; + } + + if (!(ws_pss->output_buf_)) { + lwsl_user("client entry: output_msg buffer not established, return"); + /* output message container hasn't been established, + * probably bc nothing to send + */ + break; + } + + lwsl_user("notify_minimal: unblock writing, output_msg=[%p]", + ws_pss->output_buf_); + + if (ws_pss->output_buf_->is_writeable(ws_token)) { + //assert(false); + } else { + /* a previous call to lws_write() reported a partial write; + * that write has now completed + */ + ws_pss->output_buf_->lws_write_completion(ws_token); + break; + } + + if (ws_pss->output_buf_->is_idle()) { + lwsl_user("client entry: output_msg buffer up-to-date, return"); + /* already up-to-date, nothing new to send */ + break; + } + +#ifdef NOT_USING + if (ws_pss->last == vhd->current) { + /* already up-to-date */ + break; + } + + if (!pss->output_msg_.payload) { + pss->output_msg_.payload = ::malloc(LWS_PRE + output_z); + pss->output_msg_.len = output_z; + } + + ::memcpy((char *)pss->output_msg_.payload + LWS_PRE, output_cstr, output_z); + + lwsl_user("allocate pss->output_msg [%p] in pss [%p]", + pss->output_msg_.payload, pss); + m = lws_write(wsi, + ((unsigned char *)pss->output_msg_.payload) + LWS_PRE, + pss->output_msg_.len, + LWS_WRITE_TEXT); +#endif + + //XO_SCOPE(lscope); + + /* pss->output_msg_ was populated from WebserverImpl.send_text(), q.v. */ + + m = ws_pss->output_buf_->lws_write_aux(ws_token); + +#ifdef NOT_USING + m = lws_write(wsi, ((unsigned char *)vhd->amsg.payload) + + LWS_PRE, vhd->amsg.len, LWS_WRITE_TEXT); + if (m < (int)vhd->amsg.len) { .. } +#endif + if (m == -1) { + lwsl_err("WebserverImplWsThread::notify_minimal: return -1 from callback"); + return -1; + } + + //pss->last = vhd->current; + } + break; + + case LWS_CALLBACK_RECEIVE: + { + char const * incoming_cmd + = reinterpret_cast(input); + + std::string_view incoming_svw(incoming_cmd, input_z); + + //lwsl_user("receive: [%s], z [%d]", incoming_cmd, (int)input_z); + + assert(ws_pss); + assert(websrv); + + uint32_t session_id = ws_pss->output_buf_->session_id(); + + websrv->perform_ws_cmd(session_id, + incoming_svw); + +#ifdef OBSOLETE + if (vhd->amsg.payload) + minimal_destroy_message(&(vhd->amsg)); + + vhd->amsg.len = input_z; //output_z; + /* notice we over-allocate by LWS_PRE */ + vhd->amsg.payload = ::malloc(LWS_PRE + input_z); + if (!vhd->amsg.payload) { + lwsl_user("OOM: dropping\n"); + break; + } + + ::memcpy((char *)vhd->amsg.payload + LWS_PRE, input, input_z); + //vhd->current++; +#endif + +#ifdef OBSOLETE + /* + * let everybody know we want to write something on them + * as soon as they are ready + */ + lws_start_foreach_llp(struct per_session_data__minimal **, + ppss, vhd->pss_list) { + lws_callback_on_writable((*ppss)->wsi); + } lws_end_foreach_llp(ppss, pss_list); +#endif + } + break; + + default: + break; + } + + log.end_scope(); + + return 0; + } /*notify_minimal*/ + + void + WebserverImplWsThread::run() + { + scope log(XO_DEBUG(false /*debug_flag*/)); + + lwsl_user("LWS minimal http server dynamic" + " | visit http://localhost:%d\n", this->ws_config_.port()); +#if defined(LWS_WITH_PLUGINS) + lwsl_user("LWS_WITH_PLUGINS present"); + lwsl_user("LWS_WITH_TLS present"); +#endif + + /* exit when .state is stop_requested, setting state to .stopped */ + + this->lws_cx_ = lws_create_context(&(this->cx_config_)); + + if (!(this->lws_cx_)) { + lwsl_err("lws init failed\n"); + return; + } + + std::int32_t n_event = 0; + while ((n_event >= 0) && !(this->interrupt_flag_)) { + n_event = ::lws_service(this->lws_cx_, + 0 /*ignored (used to be timeout)*/); + } + + log && log("webserver runner returned - service loop exited", + xtag("n_event", n_event), + xtag("interrupted", this->interrupt_flag_.load())); + + lws_context_destroy(this->lws_cx_); + this->lws_cx_ = nullptr; + + { + std::unique_lock lock(this->mutex_); + + this->state_ = Runstate::stopped; + this->cond_.notify_all(); + } + + log && log("exit"); + } /*run*/ + + rp + WebserverImplWsThread::make(WebserverConfig const & ws_config, + rp const & pjson) + { + return new WebserverImplWsThread(ws_config, pjson); + } /*make*/ + + // ----- Webserver ----- + + rp + Webserver::make(WebserverConfig const & ws_config, + rp const & pjson) { + return WebserverImplWsThread::make(ws_config, pjson); + } /*make*/ + + void + Webserver::display(std::ostream & os) const { + os << "state()) + << ">"; + } /*display*/ + } /*namespace web*/ +} /*namespace xo*/ + +/* end Webserver.cpp */ diff --git a/src/websock/WebsockUtil.cpp b/src/websock/WebsockUtil.cpp new file mode 100644 index 00000000..7c7057fb --- /dev/null +++ b/src/websock/WebsockUtil.cpp @@ -0,0 +1,141 @@ +/* @file WebsockUtil.cpp */ + +#include "WebsockUtil.hpp" + +#define STRINGIFY(x) #x + +namespace xo { + namespace web { + char const * + WebsockUtil::ws_callback_reason_descr(lws_callback_reasons x) { + +#define CASE(x) case x: return STRINGIFY(x) + + switch (x) { + CASE(LWS_CALLBACK_PROTOCOL_INIT); + CASE(LWS_CALLBACK_PROTOCOL_DESTROY); + CASE(LWS_CALLBACK_WSI_CREATE); + CASE(LWS_CALLBACK_WSI_DESTROY); + CASE(LWS_CALLBACK_WSI_TX_CREDIT_GET); + CASE(LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS); + CASE(LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS); + CASE(LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION); + CASE(LWS_CALLBACK_OPENSSL_CONTEXT_REQUIRES_PRIVATE_KEY); + CASE(LWS_CALLBACK_SSL_INFO); + CASE(LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION); + CASE(LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED); + CASE(LWS_CALLBACK_HTTP); + CASE(LWS_CALLBACK_HTTP_BODY); + CASE(LWS_CALLBACK_HTTP_BODY_COMPLETION); + CASE(LWS_CALLBACK_HTTP_FILE_COMPLETION); + CASE(LWS_CALLBACK_HTTP_WRITEABLE); + CASE(LWS_CALLBACK_CLOSED_HTTP); + CASE(LWS_CALLBACK_FILTER_HTTP_CONNECTION); + CASE(LWS_CALLBACK_ADD_HEADERS); + CASE(LWS_CALLBACK_VERIFY_BASIC_AUTHORIZATION); + CASE(LWS_CALLBACK_CHECK_ACCESS_RIGHTS); + CASE(LWS_CALLBACK_PROCESS_HTML); + CASE(LWS_CALLBACK_HTTP_BIND_PROTOCOL); + CASE(LWS_CALLBACK_HTTP_DROP_PROTOCOL); + CASE(LWS_CALLBACK_HTTP_CONFIRM_UPGRADE); + CASE(LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP); + CASE(LWS_CALLBACK_CLOSED_CLIENT_HTTP); + CASE(LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ); + CASE(LWS_CALLBACK_RECEIVE_CLIENT_HTTP); + CASE(LWS_CALLBACK_COMPLETED_CLIENT_HTTP); + CASE(LWS_CALLBACK_CLIENT_HTTP_WRITEABLE); + CASE(LWS_CALLBACK_CLIENT_HTTP_REDIRECT); + CASE(LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL); + CASE(LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL); + CASE(LWS_CALLBACK_ESTABLISHED); + CASE(LWS_CALLBACK_CLOSED); + CASE(LWS_CALLBACK_SERVER_WRITEABLE); + CASE(LWS_CALLBACK_RECEIVE); + CASE(LWS_CALLBACK_RECEIVE_PONG); + CASE(LWS_CALLBACK_WS_PEER_INITIATED_CLOSE); + CASE(LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION); + CASE(LWS_CALLBACK_CONFIRM_EXTENSION_OKAY); + CASE(LWS_CALLBACK_WS_SERVER_BIND_PROTOCOL); + CASE(LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL); + CASE(LWS_CALLBACK_CLIENT_CONNECTION_ERROR); + CASE(LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH); + CASE(LWS_CALLBACK_CLIENT_ESTABLISHED); + CASE(LWS_CALLBACK_CLIENT_CLOSED); + CASE(LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER); + CASE(LWS_CALLBACK_CLIENT_RECEIVE); + CASE(LWS_CALLBACK_CLIENT_RECEIVE_PONG); + CASE(LWS_CALLBACK_CLIENT_WRITEABLE); + CASE(LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED); + CASE(LWS_CALLBACK_WS_EXT_DEFAULTS); + CASE(LWS_CALLBACK_FILTER_NETWORK_CONNECTION); + CASE(LWS_CALLBACK_WS_CLIENT_BIND_PROTOCOL); + CASE(LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL); + CASE(LWS_CALLBACK_GET_THREAD_ID); + CASE(LWS_CALLBACK_ADD_POLL_FD); + CASE(LWS_CALLBACK_DEL_POLL_FD); + CASE(LWS_CALLBACK_CHANGE_MODE_POLL_FD); + CASE(LWS_CALLBACK_LOCK_POLL); + CASE(LWS_CALLBACK_UNLOCK_POLL); + CASE(LWS_CALLBACK_CGI); + CASE(LWS_CALLBACK_CGI_TERMINATED); + CASE(LWS_CALLBACK_CGI_STDIN_DATA); + CASE(LWS_CALLBACK_CGI_STDIN_COMPLETED); + CASE(LWS_CALLBACK_CGI_PROCESS_ATTACH); + CASE(LWS_CALLBACK_SESSION_INFO); + CASE(LWS_CALLBACK_GS_EVENT); + CASE(LWS_CALLBACK_HTTP_PMO); + CASE(LWS_CALLBACK_RAW_PROXY_CLI_RX); + CASE(LWS_CALLBACK_RAW_PROXY_SRV_RX); + CASE(LWS_CALLBACK_RAW_PROXY_CLI_CLOSE); + CASE(LWS_CALLBACK_RAW_PROXY_SRV_CLOSE); + CASE(LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE); + CASE(LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE); + CASE(LWS_CALLBACK_RAW_PROXY_CLI_ADOPT); + CASE(LWS_CALLBACK_RAW_PROXY_SRV_ADOPT); + CASE(LWS_CALLBACK_RAW_PROXY_CLI_BIND_PROTOCOL); + CASE(LWS_CALLBACK_RAW_PROXY_SRV_BIND_PROTOCOL); + CASE(LWS_CALLBACK_RAW_PROXY_CLI_DROP_PROTOCOL); + CASE(LWS_CALLBACK_RAW_PROXY_SRV_DROP_PROTOCOL); + CASE(LWS_CALLBACK_RAW_RX); + CASE(LWS_CALLBACK_RAW_CLOSE); + CASE(LWS_CALLBACK_RAW_WRITEABLE); + CASE(LWS_CALLBACK_RAW_ADOPT); + CASE(LWS_CALLBACK_RAW_CONNECTED); + CASE(LWS_CALLBACK_RAW_SKT_BIND_PROTOCOL); + CASE(LWS_CALLBACK_RAW_SKT_DROP_PROTOCOL); + CASE(LWS_CALLBACK_RAW_ADOPT_FILE); + CASE(LWS_CALLBACK_RAW_RX_FILE); + CASE(LWS_CALLBACK_RAW_WRITEABLE_FILE); + CASE(LWS_CALLBACK_RAW_CLOSE_FILE); + CASE(LWS_CALLBACK_RAW_FILE_BIND_PROTOCOL); + CASE(LWS_CALLBACK_RAW_FILE_DROP_PROTOCOL); + CASE(LWS_CALLBACK_TIMER); + CASE(LWS_CALLBACK_EVENT_WAIT_CANCELLED); + CASE(LWS_CALLBACK_CHILD_CLOSING); + CASE(LWS_CALLBACK_CONNECTING); + CASE(LWS_CALLBACK_VHOST_CERT_AGING); + CASE(LWS_CALLBACK_VHOST_CERT_UPDATE); + CASE(LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED); + CASE(LWS_CALLBACK_MQTT_IDLE); + CASE(LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED); + CASE(LWS_CALLBACK_MQTT_SUBSCRIBED); + CASE(LWS_CALLBACK_MQTT_CLIENT_WRITEABLE); + CASE(LWS_CALLBACK_MQTT_CLIENT_RX); + CASE(LWS_CALLBACK_MQTT_UNSUBSCRIBED); + CASE(LWS_CALLBACK_MQTT_DROP_PROTOCOL); + CASE(LWS_CALLBACK_MQTT_CLIENT_CLOSED); + CASE(LWS_CALLBACK_MQTT_ACK); + CASE(LWS_CALLBACK_MQTT_RESEND); + CASE(LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT); + CASE(LWS_CALLBACK_MQTT_SHADOW_TIMEOUT); + CASE(LWS_CALLBACK_USER); + } + +#undef CASE + + return "???"; + } /*ws_callback_reason_descr*/ + } /*namespace web*/ +} /*namespace xo*/ + +/* end WebsockUtil.cpp */ diff --git a/src/websock/WebsocketSink.cpp b/src/websock/WebsocketSink.cpp new file mode 100644 index 00000000..1c406432 --- /dev/null +++ b/src/websock/WebsocketSink.cpp @@ -0,0 +1,148 @@ +/* file WebsocketSink.cpp + * + * author: Roland Conybeare, Sep 2022 + */ + +#include "WebsocketSink.hpp" +#include "Webserver.hpp" +#include "xo/printjson/PrintJson.hpp" +#include "xo/reflect/Reflect.hpp" +#include "xo/reflect/TaggedPtr.hpp" +#include "xo/indentlog/scope.hpp" + +namespace xo { + using xo::reactor::AbstractSource; + using xo::json::PrintJson; + using xo::reflect::Reflect; + using xo::reflect::TaggedPtr; + using xo::reflect::TypeDescr; + using xo::ref::rp; + using xo::ref::brw; + using xo::print::quoted; + using xo::print::qcstr; + using xo::scope; + using xo::xtag; + + namespace web { + /* a sink that publishes to a websocket. + * The websocket api creates a WebsocketSink instance + * on behalf of an incoming subscription request. + * application code will hold onto the sink somewhere + * and publish events to it, to send them via websocket. + */ + class WebsocketSinkImpl : public WebsocketSink { + public: + using PrintJson = xo::json::PrintJson; + using AbstractSource = reactor::AbstractSource; + + public: + WebsocketSinkImpl(ref::rp const & websrv, + ref::rp const & pjson, + uint32_t session_id, + std::string stream_name) + : websrv_{std::move(websrv)}, + pjson_{std::move(pjson)}, + session_id_{session_id}, + stream_name_{std::move(stream_name)} + {} + + virtual std::string const & name() const override { return name_; } + virtual void set_name(std::string const & x) override { this->name_ = x; } + /* 0 consumers for websocket sink, since it's not a source */ + virtual void visit_direct_consumers(std::function)> const &) override {} + virtual void display(std::ostream & os) const override; + + virtual bool allow_polymorphic_source() const override { return true; } + virtual TypeDescr sink_ev_type() const override; + virtual bool allow_volatile_source() const override { return true; } + virtual uint32_t n_in_ev() const override { return n_in_ev_; } + virtual void attach_source(ref::rp const & src) override; + virtual void notify_ev_tp(TaggedPtr const & ev_tp) override; + + private: + /* (ideally unique) user-controlled name for this sink + * in practice not likely to be accessible, + * so probably want to generate a unique-y default + */ + std::string name_; + /* webserver implementation */ + ref::rp websrv_; + /* print arbitrary reflected stuff as json */ + ref::rp pjson_; + /* websocket session id# - events arriving at this sink + * will be sent only to the session identified by .session_id + */ + uint32_t session_id_; + /* name for stream. + * this will be the vale of the "stream" tag in + * initiating subscription message + * {"cmd": "subscribe", "stream", "/this/stream/name"} + * e.g. in python: + * web.register_stream_endpoint(kf.stream_endpoint_descr("/this/stream/name")) + */ + std::string stream_name_; + /* count #of events received */ + uint32_t n_in_ev_ = 0; + }; /*WebsocketSinkImpl*/ + + TypeDescr + WebsocketSinkImpl::sink_ev_type() const + { + return Reflect::require(); + } /*sink_ev_type*/ + + void + WebsocketSinkImpl::attach_source(rp const & src) { + src->attach_sink(this); + } /*attach_source*/ + + void + WebsocketSinkImpl::notify_ev_tp(TaggedPtr const & ev_tp) + { + scope log(XO_DEBUG(true /*debug_flag*/)); + + std::stringstream ss; + + /* format message envelope */ + ss << "{" << qcstr("stream") << ": " << quoted(this->stream_name_) + << ", " << qcstr("event") << ": "; + + /* format event as json */ + this->pjson_->print_tp(ev_tp, &ss); + + ss << "}"; + + log && log("sending", xtag("msg", ss.str())); + + ++(this->n_in_ev_); + + /* send event via associated websocket */ + this->websrv_->send_text(this->session_id_, ss.str()); + + } /*notify_ev_tp*/ + + void + WebsocketSinkImpl::display(std::ostream & os) const + { + os << ""; + } /*display*/ + + // ----- WebsocketSink ----- + + rp + WebsocketSink::make(rp const & websrv, + rp const & pjson, + uint32_t session_id, + std::string const & stream_name) + { + return new WebsocketSinkImpl(websrv, pjson, session_id, stream_name); + } /*make*/ + } /*namespace web*/ +} /*namespace xo*/ + +/* end WebsocketSink.cpp */ diff --git a/utest/CMakeLists.txt b/utest/CMakeLists.txt new file mode 100644 index 00000000..c8fcdacc --- /dev/null +++ b/utest/CMakeLists.txt @@ -0,0 +1,47 @@ +# build unittest websock/utest + +set(SELF_EXE utest.websock) +set(SELF_SRCS websock_utest_main.cpp) + +add_executable(${SELF_EXE} ${SELF_SRCS}) +xo_include_options2(${SELF_EXE}) + +## note: can't add this yet, because test not automated. +## requires manual interaction from browser +## +#add_test(NAME ${SELF_EXE} COMMAND ${SELF_EXE}) +#target_code_coverage(${SELF_EXE} AUTO ALL) + +# copy static {.html, .js, .svg} files to build directory +file(COPY "${CMAKE_CURRENT_SOURCE_DIR}/mount-origin/" + DESTINATION "${CMAKE_CURRENT_BINARY_DIR}/mount-origin") + +# ---------------------------------------------------------------- +# internal dependency (on this codebase) + +xo_self_dependency(${SELF_EXE} websock) + +# ---------------------------------------------------------------- +# external dependencies + +target_link_libraries(${SELF_EXE} PUBLIC websock) +# Need to port option, volfit before we can build this test here +target_link_libraries(${SELF_EXE} PUBLIC option) +target_link_libraries(${SELF_EXE} PUBLIC volfit) +#target_link_libraries(utest.option PUBLIC logutil) + + +# should be getting this via xo_include_options2() + +## ---------------------------------------------------------------- +## make standard directories for std:: includes explicit +## so that +## (1) they appear in compile_commands.json. +## (2) clangd (run from emacs lsp-mode) can find them +## +#if(CMAKE_EXPORT_COMPILE_COMMANDS) +# set(CMAKE_CXX_STANDARD_INCLUDE_DIRECTORIES +# ${CMAKE_CXX_IMPLICIT_INCLUDE_DIRECTORIES}) +#endif() + +# end CMakeLists.txt diff --git a/utest/README b/utest/README new file mode 100644 index 00000000..8ed18777 --- /dev/null +++ b/utest/README @@ -0,0 +1,13 @@ +To run this unit test: + + $ cd path/to/kalman/build/src/websock/utest + $ ./utest.websock # listens for http requests on port 7682 + +point browser to + + localhost:7682/ex_websock.html + +static files served from + + path/to/kalman/build/src/websock/utest/mount-origin + \ No newline at end of file diff --git a/utest/mount-origin/bluecircle.svg b/utest/mount-origin/bluecircle.svg new file mode 100644 index 00000000..8c7b3296 --- /dev/null +++ b/utest/mount-origin/bluecircle.svg @@ -0,0 +1,7 @@ + + + + + + + diff --git a/utest/mount-origin/d3ex/d3ex.ch5.ex1.html b/utest/mount-origin/d3ex/d3ex.ch5.ex1.html new file mode 100644 index 00000000..7b3c62d4 --- /dev/null +++ b/utest/mount-origin/d3ex/d3ex.ch5.ex1.html @@ -0,0 +1,15 @@ + + + + + + + simple d3 example + + + diff --git a/utest/mount-origin/ex_websock.html b/utest/mount-origin/ex_websock.html new file mode 100644 index 00000000..53963e20 --- /dev/null +++ b/utest/mount-origin/ex_websock.html @@ -0,0 +1,13 @@ + + + + pywebsock example page + + + +

pywebsock example page

+ +
+
+ + diff --git a/utest/mount-origin/ex_websock.js b/utest/mount-origin/ex_websock.js new file mode 100644 index 00000000..c26406ba --- /dev/null +++ b/utest/mount-origin/ex_websock.js @@ -0,0 +1,842 @@ +/* webpage to display kalman filter output + * coordinates with ex_websock.py in this directory + */ + +import * as d3 from "https://cdn.skypack.dev/d3@7"; +/* json5 accepts ieee floatingpoint special values; + * regular json excludes them (!?#) + */ +import JSON5 from "https://unpkg.com/json5@2/dist/index.min.mjs"; + +/* NOTE: put "export" in front of a variable/function + * that we want to make accessible outside this module + */ + +/* for use for browser's javascript console */ +globalThis.d3 = d3; +//globalThis.jparse = JSON5.parse; + +/* u: document.URL */ +function choose_ws_url(suffix_url) +{ + var pcol; + var u = document.URL; + + /* + * We open the websocket encrypted if this page came on an + * https:// url itself, otherwise unencrypted + */ + + if (u.substring(0, 5) === "https") { + pcol = "wss://"; + u = u.substr(8); + } else { + pcol = "ws://"; + if (u.substring(0, 4) === "http") + u = u.substr(7); + } + + u = u.split("/"); + + /* + "/xxx" bit is for IE10 workaround */ + + return pcol + u[0] + "/" + suffix_url; +} /*choose_ws_url*/ + +class Datatype { + #typename = null; + #nominal = null; + /* .from_json(x) convert a value received in json format + * to native representation. + */ + #from_json = null; + /* .make_scale(range) builds d3 scale object */ + #make_scale = null; + + constructor(typename, nominal, from_json, make_scale) { + this.#typename = typename; + this.#nominal = nominal; + this.#from_json = from_json; + this.#make_scale = make_scale; + } + + typename() { return this.#typename; } + nominal() { return this.#nominal; } + from_json(x) { return this.#from_json(x); } + make_scale(domain) { return this.#make_scale(domain); } +}; /*Datatype*/ + +class DatatypeFactory { + static dtype_map = DatatypeFactory.make_dtype_map(); + + static make_float_dtype() { + return new Datatype("float" /*typename*/, + 0.0 /*nominal*/, + (x) => { return x; } /*from_json*/, + (dom) => { return d3.scaleLinear().domain(dom); } /*make_scale*/ + ); } + + static make_datetime_dtype() { + return new Datatype("datetime" /*typename*/, + new Date() /*nominal*/, + (x) => { return new Date(x); } /*from_json*/, + (dom) => { return d3.scaleTime().domain(dom); } /*make_scale*/ + ); } + + static make_dtype_map() { + let retval = new Map(); + + retval.set("float", DatatypeFactory.make_float_dtype()); + retval.set("datetime", DatatypeFactory.make_datetime_dtype()); + + return retval; + } + + static lookup(typename) { + if (DatatypeFactory.dtype_map.has(typename)) { + return DatatypeFactory.dtype_map.get(typename); + } else { + throw new Error("DatatypeFactory: typename [" + + typename + + "] found where float|datetime expected"); + } + } +}; /*DatatypeFactory*/ + +/* class to extract event values for charting. + * 'traits' because applies to separately-represented event objects + */ +class DataTraits { + /* .x_slotlookup(ev) => x-value */ + #x_slotlookup = null; + /* .y_slotlookup(ev) => y-value */ + #y_slotlookup = null; + #x_datatype = null; //DatatypeFactory.lookup("datetime"); + #y_datatype = null; //DatatypeFactory.lookup("float"); + + /* x_nt, y_nt: each should be a pair [slotlookup, typename] + * - slotname is a function :: event -> jsonvalue, + * that extracts an attribute from incoming event in json format + * - typename is float|datetime + */ + constructor(x_nt, y_nt) { + this.#x_slotlookup = x_nt[0]; + this.#x_datatype = DatatypeFactory.lookup(x_nt[1]); + this.#y_slotlookup = y_nt[0]; + this.#y_datatype = DatatypeFactory.lookup(y_nt[1]); + } + + x_datatype() { return this.#x_datatype; } + y_datatype() { return this.#y_datatype; } + + x_nominal() { return this.#x_datatype.nominal(); } + y_nominal() { return this.#y_datatype.nominal(); } + + mapkey(data_ev) { return this.#x_slotlookup(data_ev); } + x_value(data_ev) { return this.#x_datatype.from_json(this.#x_slotlookup(data_ev)); } + y_value(data_ev) { return this.#y_datatype.from_json(this.#y_slotlookup(data_ev)); } + + make_x_scale(domain) { return this.#x_datatype.make_scale(domain); } + make_y_scale(domain) { return this.#y_datatype.make_scale(domain); } +}; /*DataTraits*/ + +function range_outer(lh, rh) { + return [Math.min(lh[0], rh[0]), + Math.max(lh[1], rh[1])]; +} /*range_outer*/ + +/* a dataset driving a chart. + * + * PLAN: multiple lines in the same chart + * - makeitso dataset can contain multiple data series + * - give each series within a dataset its own index# + * - each series computes its own min/max x/y values + * - take union across series to get chart x/y range + * - new class Dataset + */ +class Dataseries { + /* normalizing transformation for event objects. + * use to produce events {.x_value, .y_value} + key suitable for Map + */ + #data_traits = null; //new DataTraits(); + /* .dataset_map :: string -> {key value pair} + * must use string as keys, since Map uses object identity if key is Object + */ + #dataset_map = new Map(); + /* vector of key-value pairs, in increasing x-axis order */ + #dataset_v = []; + /* min,max value of dataset[i].x_value */ + #dset_min_x = null; + #dset_max_x = null; + /* min,max value of dataset[i].y_value */ + #dset_min_y = null; + #dset_max_y = null; + + #max_key = 0; + + constructor(data_traits) { + this.#data_traits = data_traits; + this.recalc_minmax(); + } + + data_traits() { return this.#data_traits; } + dataset_v() { return this.#dataset_v; } + x_range() { return [this.#dset_min_x, this.#dset_max_x]; } + y_range() { return [this.#dset_min_y, this.#dset_max_y]; } + + /* data_ev must have attributes consistent with what .#data_traits expects */ + update_dataset(data_ev) { + //console.log("Dataseries.update_dataset: data_ev=", data_ev); + + let x = this.#data_traits.x_value(data_ev); + let y = this.#data_traits.y_value(data_ev); + /* using this key to recognize + suppress duplicate points + * (e.g. if browser winds up sending multiple snapshot requests + * for the same dataset) + */ + let mapkey = this.#data_traits.mapkey(data_ev); + + //console.log("Dataseries.update_dataset: x=", x, ", y=", y, ", mapkey=", mapkey); + + /* in map must use time strings (not Dates) as keys */ + if (this.#dataset_map.has(mapkey)) { + /*skip -- assuming that source is immutable */; + } else { + /* kv.key is ordinal number identifying a datum. + * not related to mapkey, except in so far as both work as datum ids + */ + let kv = {key: this.#max_key, + x_value: x, + y_value: y}; + + /* (reminder: js map keys need to be strings) */ + this.#dataset_map.set(mapkey, kv); + this.#dataset_v.push(kv); + this.#max_key = this.#max_key+1; + } + } /*update_dataset*/ + + recalc_minmax() { + if (this.#dataset_v.length == 0) { + /* min,max value of dataset[i].x_value */ + this.#dset_min_x = this.#data_traits.x_nominal(); + this.#dset_max_x = this.#data_traits.x_nominal(); + /* min,max value of dataset[i].y_value */ + this.#dset_min_y = this.#data_traits.y_nominal(); + this.#dset_max_y = this.#data_traits.y_nominal(); + } else { + /* min,max value of dataset[i].x_value */ + this.#dset_min_x = d3.min(this.#dataset_v, (d) => { return d.x_value; }); + this.#dset_max_x = d3.max(this.#dataset_v, (d) => { return d.x_value; }); + /* min,max value of dataset[i].y_value */ + this.#dset_min_y = d3.min(this.#dataset_v, (d) => { return d.y_value; }); + this.#dset_max_y = d3.max(this.#dataset_v, (d) => { return d.y_value; }); + } + } /*recalc_minmax*/ + + /* note: caller should invoke .range() before using for drawing */ + make_x_scale(xrange) { + return this.#data_traits.make_x_scale(xrange /*domain*/); + } /*make_x_scale*/ + + /* note: caller should invoke .range() before using for drawing */ + make_y_scale(yrange) { + return this.#data_traits.make_y_scale(yrange /*domain*/); + } /*make_y_scale*/ +}; /*Dataseries*/ + +/* bundle multiple dataseries for charting + * for now: can have multiple series, but they need to be driven + * from the same native row storage + */ +class Dataset { + #dataseries_v = []; + /* min/max x-values across all members of .dataseries_v */ + #outer_x_range = null; + /* min/max y-values across all members of .dataseries_v */ + #outer_y_range = null; + + constructor(data_traits_v) { + for (let i=0, n=data_traits_v.length; i tag above in DOM sketch)*/ + #chart_svg = null; + + constructor(w, h, pad) { + this.#chart_w = w; + this.#chart_h = h; + this.#chart_pad = pad; + } + + x_range() { return [this.#chart_pad, + this.#chart_w - this.#chart_pad]; } + /* note: inverting bc svg y-values increase towards bottom of screen; + * we want y-values to increase towards top of screen + */ + y_range() { return [this.#chart_h - this.#chart_pad, + this.#chart_pad]; } + + require_gui(parent_d3sel, dataset) { + this.#require_x_scale(dataset); + this.#require_y_scale(dataset); + this.#require_linegen(); /*will use .chart_x_scale, .chart_y_scale */ + this.#require_x_axis(); /*will use .chart_x_scale*/ + this.#require_y_axis(); /*will use .chart_y_scale*/ + this.#require_svg(parent_d3sel, dataset); + } + + /* dom element id to use for the i'th dataseries in this chart */ + series_html_id(i_dataseries) { + return "pts-" + i_dataseries; + } + + /* update chart for new dataset contents + * + * Require: + * - .require_gui(_, dataset) has been called + * - #of dataseries has not changed since last call to .require_svg() + */ + update_chart(dataset) { + /* update d3 scales + * (shared across all series bundled into this dataset + */ + this.#rescale_chart(dataset); + + for (let i=0, n=dataset.n_dataseries(); i { return this.#chart_x_scale(d.x_value); }) + .y((d) => { return this.#chart_y_scale(d.y_value); })); + } + } /*require_linegen*/ + + #require_x_axis() { + if (!this.#chart_x_axis) { + this.#chart_x_axis = (d3 + .axisBottom() + .scale(this.#chart_x_scale) + .ticks(10)); + } + } + + #require_y_axis() { + if (!this.#chart_y_axis) { + this.#chart_y_axis = (d3 + .axisLeft() + .scale(this.#chart_y_scale) + .ticks(10)); + } + } + + #require_svg(parent_d3sel, dataset) { + if (!this.#chart_svg) { + this.#chart_svg = (parent_d3sel // .select("#uls") + .append("svg") + .attr("width", this.#chart_w) + .attr("height", this.#chart_h)); + + /* svg group comprising x-axis */ + this.#chart_svg.append("g") + .attr("class", "xaxis") + .attr("id", "x_axis") + .attr("transform", this.#x_axis_translate_str()) + .call(this.#chart_x_axis); + + /* svg group comprising y-axis */ + this.#chart_svg.append("g") + .attr("class", "yaxis") + .attr("id", "y_axis") + .attr("transform", this.#y_axis_translate_str()) + .call(this.#chart_y_axis); + + for (let i=0, n=dataset.n_dataseries(); i (will attach svg element here) + * +- + * +- (d3 will draw x-axis inside, .chart_x_axis() draws) + * +- (d3 will draw y-axis inside, .chart_y_axis() draws) + * +- + * +- (.chart_line_gen draws) + */ +class TimeseriesCtl extends Controller { + #dataset_uri = ''; + #dataset = null; + + #chart = new LineChart(500 /*w*/, + 250 /*h*/, + 50 /*pad*/); + + constructor(dataset_uri, data_traits_v) { + super(); + this.#dataset_uri = dataset_uri; + this.#dataset = new Dataset(data_traits_v); + } + + static rescale_dataset(dataset) { + dataset.recalc_minmax(); + } /*rescale_dataset*/ + + /* request dataseries snapshot from webserver; + * update+draw graph when snapshot arrives + * + * NOTE: + * 1. typical web docs (e.g. MDN) will advise using response.json(): + * fetch(uri) + * .then((response) => response.json()) + * .then((data) => dostuffwith(data)) + * + * however, this has a flaw: standard json is missing special floating-point values (!!); + * in particular it has no representation for nan/+inf/-inf + * 2. we want to use the extended json standard 'json5'; + * however need care since JSON5.parse() fails spuriously (at least JSON5/chrome asof 24sep2022) + * if given a promise + */ + request() { + fetch(this.#dataset_uri) + .then((response) => response.text()) + .then((text) => this.on_snapshot_text(text)); + +// .then((text) => JSON5.parse() +// .then((data) => this.on_snapshot(data)); + } /*request*/ + + /* update from snapshot json text */ + on_snapshot_text(text) { + const data = JSON5.parse(text); + + this.on_snapshot(data); + } /*on_snapshot_text*/ + + /* update from snapshot + * + * .on_snapshot() => .#dataset => .on_dataset() + */ + on_snapshot(data) { + //console.log("on_snapshot: data=", data); + + data.forEach((x, i) => { + // REFACTORME + + if (x._name_ == "UpxEvent") { + this.on_update(x); + } else if (x._name_ == "KalmanFilterStateExt") { + this.on_update(x); + } else { + console.log("unexpected json record x=", x); + } + }); + + this.on_dataset(this.#dataset); + } /*on_snapshot*/ + + /* update from websocket + * + * .on_update() => .#dataset => .on_dataset() + */ + on_update(data_ev) { + this.#dataset.update_dataset(data_ev); + + this.on_dataset(this.#dataset); + } /*on_update*/ + + /* call after modifying .#dataset + * + * .on_dataset() =|=> .rescale_dataset() =|========> .chart_x_axis ===\ + * | |========> .chart_x_axis =\ | + * | | | + * |=> .chart_svg.#x_axis <==========================/ | + * |=> .chart_svg.#y_axis <============================/ + */ + on_dataset(dataset) { + //console.log("on_dataset: dataset=", dataset); + + // update x-scale, y-scale + TimeseriesCtl.rescale_dataset(dataset); + + this.#chart.update_chart(dataset); + } /*on_dataset*/ + + /* e.g. + * ctl.require_gui(d3.select("#uls")) + * to build chart gui under DOM element with id="uls" + */ + require_gui(parent_d3sel) { + this.#chart.require_gui(parent_d3sel, this.#dataset); + } /*require_gui*/ + +}; /*TimeseriesCtl*/ + +/* controller for timeseries graph, from uri [/dyn/uls/snap] + [/ws/uls] */ +var uls_ctl = false; +var uls_ctl_enabled = true; + +if (uls_ctl_enabled) { + uls_ctl = new TimeseriesCtl('/dyn/uls/snap', + [new DataTraits([(ev) => ev.tm, "datetime"], + [(ev) => ev.upx, "float"])]); + + uls_ctl.require_gui(d3.select("#uls")); + uls_ctl.request(); +} + +/* controller for timeseries graph, from uri [/dyn/kfs/snap] + [/ws/kfs] */ +var kfs_ctl = false; +var kfs_ctl_enabled = true; + +if (kfs_ctl_enabled) { + kfs_ctl = new TimeseriesCtl('/dyn/kfs/snap', + [new DataTraits([(ev) => ev.tk, + "datetime"], + [(ev) => { return ev.x[0]; }, + "float"]), + /* 2.sigma below estimate */ + new DataTraits([(ev) => ev.tk, + "datetime"], + [(ev) => { return Math.max(0.0, ev.x[0] - 2.0 * Math.sqrt(ev.P[0][0])); }, + "float"]), + /* 2.sigma above estimate */ + new DataTraits([(ev) => ev.tk, + "datetime"], + [(ev) => { return Math.min(1.0, ev.x[0] + 2.0 * Math.sqrt(ev.P[0][0])); }, + "float"]) + ]); + + kfs_ctl.require_gui(d3.select("#kfs")); + kfs_ctl.request(); +} + +let key_fn = ((d) => { return d.key; }); + +/* controller for volsurface graph (strike -> volatility), + * from uri [/dyn/kf/snap] + */ + +d3.select("#refresh") + .on("click", + function() { + console.log("button[#refresh] clicked"); + + uls_ctl.request(); + }); + +var srv_ws = null; + +function content_loaded_fn() +{ + console.log("Hi Roly, DOM loaded"); + + /* use this url to create websocket to the server that delivered current webpage */ + let ws_url = choose_ws_url("" /*url_suffix*/); + console.log("ws_url: [", ws_url, "]"); + + srv_ws = new WebSocket(ws_url, "lws-minimal"); + + try { + // srv_ws.onopen = () => { ... }; + // srv_ws.onclose = () => { ... }; + srv_ws.onmessage = (msg) => { + /* msg has dozens of attributes, too many to list here + * actual application message appears in the .data attribute + * (as nested js string) + */ + //console.log("incoming ws msg: [", msg, "]"); + + let msgdata = JSON5.parse(msg.data); + + //console.log("msgdata: [", msgdata, "]"); + + let stream_name = msgdata.stream; + let event = msgdata.event; + + if (stream_name == "/ws/uls") { + if (uls_ctl) { + uls_ctl.on_update(event); + } + } else if (stream_name == "/ws/kfs") { + if (kfs_ctl) { + kfs_ctl.on_update(event); + } + } else { + console.log("unknown stream name [", stream_name, "]"); + } + }; + } catch(excetpion) { + asert("

Error: " + exception); + } + + console.log("srv_ws state [", srv_ws.readyState, "]"); + + srv_ws.addEventListener('open', + (event) => { + console.log("srv_ws state [", srv_ws.readyState, "]"); + if (uls_ctl) { + srv_ws.send("{\"cmd\": \"subscribe\", \"stream\": \"/ws/uls\"} "); + } + if (kfs_ctl) { + srv_ws.send("{\"cmd\": \"subscribe\", \"stream\": \"/ws/kfs\"} "); + } + //socket.send('Hello Server!'); + }); + +} /*content_loaded_fn*/ + +document.addEventListener("DOMContentLoaded", + content_loaded_fn, + false); + +/* end ex_websock.js */ diff --git a/utest/mount-origin/example.js b/utest/mount-origin/example.js new file mode 100644 index 00000000..4df6c234 --- /dev/null +++ b/utest/mount-origin/example.js @@ -0,0 +1,64 @@ +function get_appropriate_ws_url(extra_url) +{ + var pcol; + var u = document.URL; + + /* + * We open the websocket encrypted if this page came on an + * https:// url itself, otherwise unencrypted + */ + + if (u.substring(0, 5) === "https") { + pcol = "wss://"; + u = u.substr(8); + } else { + pcol = "ws://"; + if (u.substring(0, 4) === "http") + u = u.substr(7); + } + + u = u.split("/"); + + /* + "/xxx" bit is for IE10 workaround */ + + return pcol + u[0] + "/" + extra_url; +} + +function new_ws(urlpath, protocol) +{ + return new WebSocket(urlpath, protocol); +} + +document.addEventListener("DOMContentLoaded", function() { + + var ws = new_ws(get_appropriate_ws_url(""), "lws-minimal"); + try { + ws.onopen = function() { + document.getElementById("m").disabled = 0; + document.getElementById("b").disabled = 0; + }; + + ws.onmessage =function got_packet(msg) { + document.getElementById("r").value = + document.getElementById("r").value + msg.data + "\n"; + document.getElementById("r").scrollTop = + document.getElementById("r").scrollHeight; + }; + + ws.onclose = function(){ + document.getElementById("m").disabled = 1; + document.getElementById("b").disabled = 1; + }; + } catch(exception) { + alert("

Error " + exception); + } + + function sendmsg() + { + ws.send(document.getElementById("m").value); + document.getElementById("m").value = ""; + } + + document.getElementById("b").addEventListener("click", sendmsg); + +}, false); diff --git a/utest/mount-origin/index.html b/utest/mount-origin/index.html new file mode 100644 index 00000000..49738af1 --- /dev/null +++ b/utest/mount-origin/index.html @@ -0,0 +1,19 @@ + + + + + + + +
+ + LWS chat minimal ws server example.
+ Chat is sent to all browsers open on this page. +
+
+
+ + + + + diff --git a/utest/mount-origin/libwebsockets.org-logo.svg b/utest/mount-origin/libwebsockets.org-logo.svg new file mode 100644 index 00000000..ef241b37 --- /dev/null +++ b/utest/mount-origin/libwebsockets.org-logo.svg @@ -0,0 +1,66 @@ + + + + + +image/svg+xml + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/utest/mount-origin/script-csp.svg b/utest/mount-origin/script-csp.svg new file mode 100644 index 00000000..cd128f1d --- /dev/null +++ b/utest/mount-origin/script-csp.svg @@ -0,0 +1,53 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/utest/websock_utest_main.cpp b/utest/websock_utest_main.cpp new file mode 100644 index 00000000..fd2fd9aa --- /dev/null +++ b/utest/websock_utest_main.cpp @@ -0,0 +1,282 @@ +/* @file websock_utest_main.cpp */ + +#include "websock/Webserver.hpp" +#include "volfit/init_volfit.hpp" +#include "volfit/Volfit.hpp" +#include "volfit/VolfitInputCapture.hpp" +#include "filter/init_filter.hpp" +#include "filter/KalmanFilterSvc.hpp" +#include "option/StrikeSetOmd.hpp" +#include "option/StrikeSetMarketModel.hpp" +#include "option/UlMarketModel.hpp" +#include "option/PricingContext.hpp" +#include "option/OptionStrikeSet.hpp" +#include "process/init_process.hpp" +#include "process/RealizationSource.hpp" +#include "process/RealizationTracer.hpp" +#include "process/UpxEvent.hpp" +#include "process/ExpProcess.hpp" +#include "process/BrownianMotion.hpp" +#include "simulator/init_simulator.hpp" +#include "simulator/Simulator.hpp" +#include "reactor/EventStore.hpp" +#include "randomgen/random_seed.hpp" +#include "randomgen/xoshiro256.hpp" +#include "time/Time.hpp" +#include "printjson/PrintJson.hpp" +#include "indentlog/print/tag.hpp" +#include + +/* webserver instance */ +static xo::ref::rp g_ws; + +void sigint_handler(int /*sig*/) { + std::cerr << "main thread interrupt_handler\n"; + + if (g_ws) + g_ws->interrupt_stop_webserver(); +} + +int +main(int argc, char **argv) { + using xo::web::Webserver; + using xo::web::WebserverConfig; + using xo::json::PrintJsonSingleton; + using xo::vf::Volfit; + using xo::vf::VolfitInputCaptureSvc; + using xo::kalman::KalmanFilterStateExt; + using xo::kalman::KalmanFilterSvc; + using xo::kalman::KalmanFilterSpec; + using xo::option::FlatVolsfc; + using xo::option::BboTick; + using xo::option::StrikeSetOmd; + using xo::option::StrikeSetMarketModel; + using xo::option::UlMarketModel; + using xo::option::PricingContext; + using xo::option::OptionStrikeSet; + using xo::option::Secid; + using xo::option::Pxtick; + using xo::process::UpxEvent; + using xo::process::RealizationSource; + using xo::process::RealizationTracer; + using xo::process::StochasticProcess; + using xo::process::ExpProcess; + using xo::process::BrownianMotion; + using xo::sim::Simulator; + using xo::reactor::PtrEventStore; + using xo::reactor::StructEventStore; + using xo::reactor::ReactorSource; + using xo::rng::Seed; + using xo::rng::xoshiro256ss; + using xo::ref::rp; + using xo::time::utc_nanos; + using xo::time::timeutil; + using xo::json::PrintJsonSingleton; + using xo::json::PrintJson; + using xo::Subsystem; + using xo::scope; + using xo::xtag; + + try { +#ifdef NOT_USING + InitSubsys::require(); + InitSubsys::require(); + InitSubsys::require(); +#endif + + XO_SUBSYSTEM_REQUIRE(volfit); + XO_SUBSYSTEM_REQUIRE(filter); + XO_SUBSYSTEM_REQUIRE(simulator); + XO_SUBSYSTEM_REQUIRE(process); + + Subsystem::initialize_all(); + + signal(SIGINT, sigint_handler); + + scope log(XO_ENTER0(always)); + + rp pjson = PrintJsonSingleton::instance(); + + /* RC Sep 2022 - adding c++ translation of kalman/src/pywebsock/ex_websock.py; + * intending to debug server segfault without complication of running + * from python interpreter + */ + + Secid secid0(0, 0); + Secid ul0 = Secid::ul(0); + + utc_nanos t0 = timeutil::ymd_hms_usec(20220926 /*ymd*/, + 93000 /*hms*/, + 0 /*usec*/); + utc_nanos t1 = t0 + std::chrono::hours(30 * 24); + + /* sim = pysimulator.Simulator.make() */ + rp sim = Simulator::make(t0); + + /* ss = pyoption.make_option_strike_set(3, secid0, 10, 1, t1, Pxtick.penny_nickel) */ + rp ss + = OptionStrikeSet::regular(3 /*n*/, + secid0 /*start_id*/, + 10.0 /*lo_strike*/, + 1.0 /*d_strike*/, + t1 /*expiry*/, + Pxtick::penny_nickel); + + /* cx = pyoption.make_pricing_context(t0, 11.11, .5, .06) */ + rp cx + = PricingContext::make(t0, 11.11 /*ref_spot*/, + FlatVolsfc::make(0.5) /*volatility*/, + 0.06 /*rate*/); + + /* TODO: replace with constant to get deterministic behavior */ + Seed seed; + + /* ebm = pyprocess.make_exponential_brownian_motion(t0, 11.0, 0.5) */ + rp ebm + = ExpProcess::make(11.0, + BrownianMotion::make(t0, + 0.5 /*volatility*/, + seed)); + + /* src = pyprocess.make_realization_source(ebm, dt.timedelta(seconds=1)) */ + rp src + = RealizationSource::make(RealizationTracer::make(ebm), + std::chrono::seconds(1)); + src->set_name("src"); + + /* (A) + * ulm = pyoption.make_ul_market_model(ul0, cx) + */ + rp ulm = UlMarketModel::make(ul0, cx); + ulm->set_name("ulm"); + + /* (B) + * ssm = pyoption.make_strikeset_market_model(ss, cx) + */ + rp ssm = StrikeSetMarketModel::make(ss, cx); + ssm->set_name("ssm"); + + /* (C) + * ssmd = pyoption.make_strikeset_omd(ss) + */ + rp ssmd = StrikeSetOmd::make(ss); + ssmd->set_name("ssmd"); + + /* (D) + * bbos = pyoption.BboTickStore.make() + */ + using BboTickStore = StructEventStore; + rp bbos = StructEventStore::make(); + bbos->set_name("bbos"); + + /* (E) + * vfin = pyvolfit.make_volfit_input_capture(cx) + */ + rp vfin = VolfitInputCaptureSvc::make(cx); + vfin->set_name("vfin"); + + /* (F) + * kfspec = pyvolfit.make_kf_spec_m1(t0, s0=0.3, p0=1.0, q=5) + */ + KalmanFilterSpec kfspec = Volfit::kf_spec_m1(t0, 0.3, 1.0, 5.0); + /* kf = pyfilter.make_kalman_filter(spec=kfspec) */ + rp kf = KalmanFilterSvc::make(kfspec); + kf->set_debug_sim_flag(true); + kf->set_name("kf"); + + using KalmanFilterStateEventStore + = PtrEventStore>; + + /* (G) + * kfs = pyfilter.KalmanFilterStateEventStore.make() + */ + rp kfs + = KalmanFilterStateEventStore::make(); + + /* sim.add_source(src) */ + sim->add_source(src); + + /* uls = pyprocess.UpxEventStore.make() */ + using UpxEventStore = StructEventStore; + rp uls = UpxEventStore::make(); + src->attach_sink(uls); + + /* (A) */ + sim->add_source(ulm); + src->attach_sink(ulm); + + /* (B) */ + sim->add_source(ssm); + src->attach_sink(ssm); + + /* (C) */ + sim->add_source(ssmd); + ulm->attach_sink(ssmd); + ssm->attach_sink(ssmd); + + /* (D) */ + ulm->attach_sink(bbos); + ssm->attach_sink(bbos); + + /* (E) */ + // sim->add_source(vfin) ? + ssmd->attach_sink(vfin); + + /* (F) */ + // sim->add_source(kf) ? + vfin->attach_sink(kf); + + /* (G) */ + kf->attach_sink(kfs); + + /* wconfig=pywebsock.WebserverConfig(7682, False, False, False) + * web=pywebsock.Webserver.make(wconfig) + */ + g_ws = Webserver::make(WebserverConfig(7682 /*port*/, + false /*!tls_flag*/, + false /*!strict_host_check_flag*/, + false /*!use_retry_flag*/), + PrintJsonSingleton::instance()); + + /* web.register_http_endpoint(uls.http_endpoint_descr("/uls")) # /dyn/uls/snap + * web.register_http_endpoint(kfs.http_endpoint_descr("/kfs")) # /dyn/kfs/snap + */ + g_ws->register_http_endpoint(uls->http_endpoint_descr(pjson, "/uls")); + g_ws->register_http_endpoint(kfs->http_endpoint_descr(pjson, "/kfs")); + + /* web.register_stream_endpoint(src.stream_endpoint_descr("/ws/uls")) + * web.register_stream_endpoint(kf.stream_endpoint_descr("/ws/kfs")) + */ + g_ws->register_stream_endpoint(src->stream_endpoint_descr("/ws/uls")); + g_ws->register_stream_endpoint(kf->stream_endpoint_descr("/ws/kfs")); + + log("starting webserver.."); + + g_ws->start_webserver(); + + log("..webserver started"); + + /* attempt simulation. + * throttle so that we have time to connect browser, give url + * http://localhost:7682/ex_websock.html + * pywebsock/ex_websock.py implements a governed simulation loop + * in python, so that it's interruptible. + * here, we can rely on simulator instead + */ + sim->run_throttled_until(t0 /*t1 - ignored if <= sim.t0()*/, + 50 /*n_max*/, + 2.5 /*replay_factor*/); + + + log("joining webserver.."); + + g_ws->join_webserver(); + + log("..joined webserver"); + } catch (std::exception & ex) { + std::cerr << "caught exception" << xtag("ex", ex.what()) << std::endl; + } + +} /*main*/ + +/* end websock_utest_main.cpp */