implementation + compile as independent module

This commit is contained in:
Roland Conybeare 2023-10-17 16:07:33 -04:00
commit c634f33e67
26 changed files with 4297 additions and 0 deletions

View file

@ -0,0 +1,16 @@
# xo-websock/CMakeLists.txt
set(SELF_LIB websock)
set(SELF_SRCS EndpointUtil.cpp DynamicEndpoint.cpp WebsockUtil.cpp WebsocketSink.cpp Webserver.cpp)
xo_add_shared_library3(${SELF_LIB} ${PROJECT_NAME}Targets ${PROJECT_VERSION} 1 ${SELF_SRCS})
# ----------------------------------------------------------------
# external dependencies
xo_dependency(${SELF_LIB} reactor)
xo_dependency(${SELF_LIB} webutil)
# note: changes to xo_dependency() calls here
# must coordinate with find_dependency() calls in
# xo-websock/cmake/websockConfig.cmake.in

View file

@ -0,0 +1,146 @@
/* file DynamicEndpoint.cpp
*
* author: Roland Conybeare, Sep 2022
*/
#include "DynamicEndpoint.hpp"
namespace xo {
using xo::web::Alist;
using xo::fn::CallbackId;
using xo::ref::rp;
namespace web {
DynamicEndpoint::DynamicEndpoint(std::string uri_pattern,
HttpEndpointFn http_fn,
StreamSubscribeFn subscribe_fn,
StreamUnsubscribeFn unsubscribe_fn)
: uri_pattern_{std::move(uri_pattern)},
http_fn_{std::move(http_fn)},
subscribe_fn_{std::move(subscribe_fn)},
unsubscribe_fn_{std::move(unsubscribe_fn)}
{
std::string r_pat;
/* 1st pass -- construct pattern regex .uri_regex
* to identify urls that belong to this endpoint
*
* using regex like:
* \$\{[[:alnum:]]+\}
*/
{
std::regex var_rgx("\\$\\{[[:alnum:]]+\\}");
/* e.g. if .uri_pattern:
* /fixed/stem/${a}/more/fixed/stuff/${b}
* then want r_pat:
* /fixed/stem/[[:alnum:]]+/more/fixed/stuff/[[:alnum:]]+
* to find values pattern variables like ${a}, ${b}
*/
std::regex_replace(std::back_inserter(r_pat),
this->uri_pattern_.begin(),
this->uri_pattern_.end(),
var_rgx,
std::string("([[:alnum:]]+)"));
this->uri_regex_ = std::regex(r_pat);
}
/* 2nd pass -- identify pattern variables */
{
/* regex for:
* \$\{([[:alnum:]]+)\}
* use to match input like
* ${apple}
* and also extract the variable name
* apple
*/
std::regex var_rgx("\\$\\{([[:alnum:]]+)\\}");
std::smatch match;
std::string subject = this->uri_pattern_;
/* if subject like
* /fixed/stem/${a}/more/fixed/stuff/${b}
* extract
* ["a", "b"]
*
* for
* /fixed/stem/${a}/more/fixed/stuff/${b}/${a}
* also extract
* ["a", "b"]
* i.e. avoid extracting the same variable name twice
*/
while (std::regex_search(subject, match, var_rgx)) {
std::string v = match[1];
bool present_flag = false;
for (auto const & x : this->var_v_) {
if (x == v) {
present_flag = true;
break;
}
}
if (!present_flag)
this->var_v_.push_back(match[1]);
subject = match.suffix().str();
}
}
} /*ctor*/
void
DynamicEndpoint::http_response(std::string const & incoming_uri,
std::ostream * p_os) const
{
/* send this uri argument list callback.
* contains variables extracted from .uri_pattern
* (variables surrounded by ${...})
*/
Alist alist;
/* extract pattern variables in uri
* c.f. 2nd pass in DynamicEndpoint.ctor
*/
std::smatch match;
std::string subject = incoming_uri;
/* if subject like
* /fixed/stem/apple/more/fixed/stuff/beagle
* with .uri_pattern
* /fixed/stem/${a}/more/fixed/stuff/${b}
* then we have .uri_regex
* /fixed/stem/([[:alnum:]]+)/more/fixed/stuff/([[:alnum:]]+)
* use this to extract values for keys in .var_v,
* in the same order
*/
if (std::regex_match(subject, match, this->uri_regex_)) {
for (size_t i = 0, n = this->var_v_.size(); i<n; ++i) {
std::string i_name = this->var_v_[i];
std::string i_value = match[1+i];
alist.push_back(i_name, i_value);
}
}
this->http_fn_(incoming_uri, alist, p_os);
} /*http_response*/
CallbackId
DynamicEndpoint::subscribe(std::string const & /*incoming_uri*/,
rp<AbstractSink> const & ws_sink) const
{
return this->subscribe_fn_(ws_sink);
} /*subscribe*/
void
DynamicEndpoint::unsubscribe(CallbackId id) const
{
return this->unsubscribe_fn_(id);
} /*unsubscribe*/
} /*namespace web*/
} /*namespace xo*/
/* end DynamicEndpoint.cpp */

View file

@ -0,0 +1,38 @@
/* file EndpointUtil.cpp
*
* author: Roland Conybeare, Sep 2022
*/
#include "EndpointUtil.hpp"
namespace xo {
namespace web {
std::string
EndpointUtil::stem(std::string const & pattern)
{
std::size_t p = 0;
do {
p = pattern.find_first_of("$", p);
if ((p != std::string::npos) && (pattern[p+1] == '{')) {
/* fixed stem is chars [0 .. p-1], i.e. 1st p characters */
break;
}
if (p != std::string::npos) {
/* skip to next '$' */
++p;
}
} while (p != std::string::npos);
if (p == std::string::npos) {
/* pattern has no variable components */
return pattern;
} else {
return pattern.substr(0, p);
}
} /*stem*/
} /*namespace web*/
} /*namespace xo*/
/* end EndpointUtil.cpp */

1939
src/websock/Webserver.cpp Normal file

File diff suppressed because it is too large Load diff

141
src/websock/WebsockUtil.cpp Normal file
View file

@ -0,0 +1,141 @@
/* @file WebsockUtil.cpp */
#include "WebsockUtil.hpp"
#define STRINGIFY(x) #x
namespace xo {
namespace web {
char const *
WebsockUtil::ws_callback_reason_descr(lws_callback_reasons x) {
#define CASE(x) case x: return STRINGIFY(x)
switch (x) {
CASE(LWS_CALLBACK_PROTOCOL_INIT);
CASE(LWS_CALLBACK_PROTOCOL_DESTROY);
CASE(LWS_CALLBACK_WSI_CREATE);
CASE(LWS_CALLBACK_WSI_DESTROY);
CASE(LWS_CALLBACK_WSI_TX_CREDIT_GET);
CASE(LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS);
CASE(LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS);
CASE(LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION);
CASE(LWS_CALLBACK_OPENSSL_CONTEXT_REQUIRES_PRIVATE_KEY);
CASE(LWS_CALLBACK_SSL_INFO);
CASE(LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION);
CASE(LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED);
CASE(LWS_CALLBACK_HTTP);
CASE(LWS_CALLBACK_HTTP_BODY);
CASE(LWS_CALLBACK_HTTP_BODY_COMPLETION);
CASE(LWS_CALLBACK_HTTP_FILE_COMPLETION);
CASE(LWS_CALLBACK_HTTP_WRITEABLE);
CASE(LWS_CALLBACK_CLOSED_HTTP);
CASE(LWS_CALLBACK_FILTER_HTTP_CONNECTION);
CASE(LWS_CALLBACK_ADD_HEADERS);
CASE(LWS_CALLBACK_VERIFY_BASIC_AUTHORIZATION);
CASE(LWS_CALLBACK_CHECK_ACCESS_RIGHTS);
CASE(LWS_CALLBACK_PROCESS_HTML);
CASE(LWS_CALLBACK_HTTP_BIND_PROTOCOL);
CASE(LWS_CALLBACK_HTTP_DROP_PROTOCOL);
CASE(LWS_CALLBACK_HTTP_CONFIRM_UPGRADE);
CASE(LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP);
CASE(LWS_CALLBACK_CLOSED_CLIENT_HTTP);
CASE(LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ);
CASE(LWS_CALLBACK_RECEIVE_CLIENT_HTTP);
CASE(LWS_CALLBACK_COMPLETED_CLIENT_HTTP);
CASE(LWS_CALLBACK_CLIENT_HTTP_WRITEABLE);
CASE(LWS_CALLBACK_CLIENT_HTTP_REDIRECT);
CASE(LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL);
CASE(LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL);
CASE(LWS_CALLBACK_ESTABLISHED);
CASE(LWS_CALLBACK_CLOSED);
CASE(LWS_CALLBACK_SERVER_WRITEABLE);
CASE(LWS_CALLBACK_RECEIVE);
CASE(LWS_CALLBACK_RECEIVE_PONG);
CASE(LWS_CALLBACK_WS_PEER_INITIATED_CLOSE);
CASE(LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION);
CASE(LWS_CALLBACK_CONFIRM_EXTENSION_OKAY);
CASE(LWS_CALLBACK_WS_SERVER_BIND_PROTOCOL);
CASE(LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL);
CASE(LWS_CALLBACK_CLIENT_CONNECTION_ERROR);
CASE(LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH);
CASE(LWS_CALLBACK_CLIENT_ESTABLISHED);
CASE(LWS_CALLBACK_CLIENT_CLOSED);
CASE(LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER);
CASE(LWS_CALLBACK_CLIENT_RECEIVE);
CASE(LWS_CALLBACK_CLIENT_RECEIVE_PONG);
CASE(LWS_CALLBACK_CLIENT_WRITEABLE);
CASE(LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED);
CASE(LWS_CALLBACK_WS_EXT_DEFAULTS);
CASE(LWS_CALLBACK_FILTER_NETWORK_CONNECTION);
CASE(LWS_CALLBACK_WS_CLIENT_BIND_PROTOCOL);
CASE(LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL);
CASE(LWS_CALLBACK_GET_THREAD_ID);
CASE(LWS_CALLBACK_ADD_POLL_FD);
CASE(LWS_CALLBACK_DEL_POLL_FD);
CASE(LWS_CALLBACK_CHANGE_MODE_POLL_FD);
CASE(LWS_CALLBACK_LOCK_POLL);
CASE(LWS_CALLBACK_UNLOCK_POLL);
CASE(LWS_CALLBACK_CGI);
CASE(LWS_CALLBACK_CGI_TERMINATED);
CASE(LWS_CALLBACK_CGI_STDIN_DATA);
CASE(LWS_CALLBACK_CGI_STDIN_COMPLETED);
CASE(LWS_CALLBACK_CGI_PROCESS_ATTACH);
CASE(LWS_CALLBACK_SESSION_INFO);
CASE(LWS_CALLBACK_GS_EVENT);
CASE(LWS_CALLBACK_HTTP_PMO);
CASE(LWS_CALLBACK_RAW_PROXY_CLI_RX);
CASE(LWS_CALLBACK_RAW_PROXY_SRV_RX);
CASE(LWS_CALLBACK_RAW_PROXY_CLI_CLOSE);
CASE(LWS_CALLBACK_RAW_PROXY_SRV_CLOSE);
CASE(LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE);
CASE(LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE);
CASE(LWS_CALLBACK_RAW_PROXY_CLI_ADOPT);
CASE(LWS_CALLBACK_RAW_PROXY_SRV_ADOPT);
CASE(LWS_CALLBACK_RAW_PROXY_CLI_BIND_PROTOCOL);
CASE(LWS_CALLBACK_RAW_PROXY_SRV_BIND_PROTOCOL);
CASE(LWS_CALLBACK_RAW_PROXY_CLI_DROP_PROTOCOL);
CASE(LWS_CALLBACK_RAW_PROXY_SRV_DROP_PROTOCOL);
CASE(LWS_CALLBACK_RAW_RX);
CASE(LWS_CALLBACK_RAW_CLOSE);
CASE(LWS_CALLBACK_RAW_WRITEABLE);
CASE(LWS_CALLBACK_RAW_ADOPT);
CASE(LWS_CALLBACK_RAW_CONNECTED);
CASE(LWS_CALLBACK_RAW_SKT_BIND_PROTOCOL);
CASE(LWS_CALLBACK_RAW_SKT_DROP_PROTOCOL);
CASE(LWS_CALLBACK_RAW_ADOPT_FILE);
CASE(LWS_CALLBACK_RAW_RX_FILE);
CASE(LWS_CALLBACK_RAW_WRITEABLE_FILE);
CASE(LWS_CALLBACK_RAW_CLOSE_FILE);
CASE(LWS_CALLBACK_RAW_FILE_BIND_PROTOCOL);
CASE(LWS_CALLBACK_RAW_FILE_DROP_PROTOCOL);
CASE(LWS_CALLBACK_TIMER);
CASE(LWS_CALLBACK_EVENT_WAIT_CANCELLED);
CASE(LWS_CALLBACK_CHILD_CLOSING);
CASE(LWS_CALLBACK_CONNECTING);
CASE(LWS_CALLBACK_VHOST_CERT_AGING);
CASE(LWS_CALLBACK_VHOST_CERT_UPDATE);
CASE(LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED);
CASE(LWS_CALLBACK_MQTT_IDLE);
CASE(LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED);
CASE(LWS_CALLBACK_MQTT_SUBSCRIBED);
CASE(LWS_CALLBACK_MQTT_CLIENT_WRITEABLE);
CASE(LWS_CALLBACK_MQTT_CLIENT_RX);
CASE(LWS_CALLBACK_MQTT_UNSUBSCRIBED);
CASE(LWS_CALLBACK_MQTT_DROP_PROTOCOL);
CASE(LWS_CALLBACK_MQTT_CLIENT_CLOSED);
CASE(LWS_CALLBACK_MQTT_ACK);
CASE(LWS_CALLBACK_MQTT_RESEND);
CASE(LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT);
CASE(LWS_CALLBACK_MQTT_SHADOW_TIMEOUT);
CASE(LWS_CALLBACK_USER);
}
#undef CASE
return "???";
} /*ws_callback_reason_descr*/
} /*namespace web*/
} /*namespace xo*/
/* end WebsockUtil.cpp */

View file

@ -0,0 +1,148 @@
/* file WebsocketSink.cpp
*
* author: Roland Conybeare, Sep 2022
*/
#include "WebsocketSink.hpp"
#include "Webserver.hpp"
#include "xo/printjson/PrintJson.hpp"
#include "xo/reflect/Reflect.hpp"
#include "xo/reflect/TaggedPtr.hpp"
#include "xo/indentlog/scope.hpp"
namespace xo {
using xo::reactor::AbstractSource;
using xo::json::PrintJson;
using xo::reflect::Reflect;
using xo::reflect::TaggedPtr;
using xo::reflect::TypeDescr;
using xo::ref::rp;
using xo::ref::brw;
using xo::print::quoted;
using xo::print::qcstr;
using xo::scope;
using xo::xtag;
namespace web {
/* a sink that publishes to a websocket.
* The websocket api creates a WebsocketSink instance
* on behalf of an incoming subscription request.
* application code will hold onto the sink somewhere
* and publish events to it, to send them via websocket.
*/
class WebsocketSinkImpl : public WebsocketSink {
public:
using PrintJson = xo::json::PrintJson;
using AbstractSource = reactor::AbstractSource;
public:
WebsocketSinkImpl(ref::rp<Webserver> const & websrv,
ref::rp<PrintJson> const & pjson,
uint32_t session_id,
std::string stream_name)
: websrv_{std::move(websrv)},
pjson_{std::move(pjson)},
session_id_{session_id},
stream_name_{std::move(stream_name)}
{}
virtual std::string const & name() const override { return name_; }
virtual void set_name(std::string const & x) override { this->name_ = x; }
/* 0 consumers for websocket sink, since it's not a source */
virtual void visit_direct_consumers(std::function<void (brw<AbstractEventProcessor>)> const &) override {}
virtual void display(std::ostream & os) const override;
virtual bool allow_polymorphic_source() const override { return true; }
virtual TypeDescr sink_ev_type() const override;
virtual bool allow_volatile_source() const override { return true; }
virtual uint32_t n_in_ev() const override { return n_in_ev_; }
virtual void attach_source(ref::rp<AbstractSource> const & src) override;
virtual void notify_ev_tp(TaggedPtr const & ev_tp) override;
private:
/* (ideally unique) user-controlled name for this sink
* in practice not likely to be accessible,
* so probably want to generate a unique-y default
*/
std::string name_;
/* webserver implementation */
ref::rp<Webserver> websrv_;
/* print arbitrary reflected stuff as json */
ref::rp<PrintJson> pjson_;
/* websocket session id# - events arriving at this sink
* will be sent only to the session identified by .session_id
*/
uint32_t session_id_;
/* name for stream.
* this will be the vale of the "stream" tag in
* initiating subscription message
* {"cmd": "subscribe", "stream", "/this/stream/name"}
* e.g. in python:
* web.register_stream_endpoint(kf.stream_endpoint_descr("/this/stream/name"))
*/
std::string stream_name_;
/* count #of events received */
uint32_t n_in_ev_ = 0;
}; /*WebsocketSinkImpl*/
TypeDescr
WebsocketSinkImpl::sink_ev_type() const
{
return Reflect::require<void>();
} /*sink_ev_type*/
void
WebsocketSinkImpl::attach_source(rp<AbstractSource> const & src) {
src->attach_sink(this);
} /*attach_source*/
void
WebsocketSinkImpl::notify_ev_tp(TaggedPtr const & ev_tp)
{
scope log(XO_DEBUG(true /*debug_flag*/));
std::stringstream ss;
/* format message envelope */
ss << "{" << qcstr("stream") << ": " << quoted(this->stream_name_)
<< ", " << qcstr("event") << ": ";
/* format event as json */
this->pjson_->print_tp(ev_tp, &ss);
ss << "}";
log && log("sending", xtag("msg", ss.str()));
++(this->n_in_ev_);
/* send event via associated websocket */
this->websrv_->send_text(this->session_id_, ss.str());
} /*notify_ev_tp*/
void
WebsocketSinkImpl::display(std::ostream & os) const
{
os << "<WebsocketSinkImpl"
<< xtag("addr", (void*)this)
<< xtag("name", name_)
<< xtag("n_in_ev", n_in_ev_)
<< xtag("stream", stream_name_)
<< ">";
} /*display*/
// ----- WebsocketSink -----
rp<WebsocketSink>
WebsocketSink::make(rp<Webserver> const & websrv,
rp<PrintJson> const & pjson,
uint32_t session_id,
std::string const & stream_name)
{
return new WebsocketSinkImpl(websrv, pjson, session_id, stream_name);
} /*make*/
} /*namespace web*/
} /*namespace xo*/
/* end WebsocketSink.cpp */