xo-tokenizer2/src/websock/Webserver.cpp

1947 lines
76 KiB
C++

/* @file Webserver.cpp
*
* Webserver + websocket container/scaffold.
* Originally adapted from libwebsocket example
*
* 19aug2022
* This version probably overly rigid.
* Initial goal is to adpat example application so that:
* a. it works as a library
* b. doesn't hijack main thread
*
* Subsequently, want to wrap to demonstrate library working from within python
*
* In the meantime, adopting example code as-is without looking closely
* at globals/singletons
*
*/
#include "Webserver.hpp"
#include "WebsocketSink.hpp"
#include "WebsockUtil.hpp"
#include "WsSafetyToken.hpp"
#include "DynamicEndpoint.hpp"
#include "xo/printjson/PrintJson.hpp"
#include <json/json.h> // for Json::Reader, to parse json input
#include <condition_variable>
#include <unordered_map>
#include <regex>
#include <deque>
#include <vector>
namespace xo {
using xo::web::Alist;
using xo::reactor::AbstractSink;
using xo::json::PrintJson;
using xo::fn::CallbackId;
using xo::ref::rp;
using xo::scope;
using xo::xtag;
namespace web {
char const *
RunstateUtil::runstate_descr(Runstate x)
{
# define CASE(x) case Runstate::x: return #x
switch(x) {
CASE(stopped);
CASE(stop_requested);
CASE(running);
}
# undef CASE
return "???";
} /*runstate_descr*/
/* both websocket and appl thread can obtain this token.
* see WebsocketSessionRecd. Posession of this token is evidence
* caller holds WebsocketSessionRecd.mutex
*/
class WsSessionSafetyToken : public SafetyToken<class WsSessionSafetyToken_tag> {
private:
friend class WebsocketSessionRecd;
private:
/* only WebsocketSessionRecd should construct this
* mutex argument present just to alert reader
*/
WsSessionSafetyToken(std::unique_lock<std::mutex> const &) {}
}; /*WsSessionSafetyToken*/
namespace {
/* one of these is created for each client connecting to us */
struct OutputBuffer;
/* editor bait:
* WebserverImpl::send_text()
* ws_pss
*
* NOTE:
* 1. per_session_data__http instances are created by libwebsocket library.
* since that's implemented in C, ctor/dtors won't be invoked for this class
*/
struct per_session_data__minimal {
public:
/* output state; allocated as bona fide c++ object */
OutputBuffer * output_buf_;
}; /*per_session_data__minimal*/
/* one of these created for each message */
/* output destined for a particular websocket.
* 'struct msg' that folllows is a POD C struct inherited from
* libwebsocket example code; intend to retire that.
*
* An OutputMsg instance sends bytes [lo..hi),
* using as many trips as necessary
*
* +---...---+---...--------+
* | LWS_PRE | text payload |
* +---...---+---...--------+
* ^ ^ ^
* .buf .text() text() + text.size()
*
* |<---A--->|<------B------->|
* |<------------C----------->|
*
* A: (LWS_PRE bytes) populated by ::lws_write(), in particular encodes .buf_z
* B: (.text_z bytes) WebserverImpl passes this range to ::lws_write()
* C: (LWS_PRE + .text_z bytes) ::lws_write() actually sends this range
*
* Note trailing null isn't required, since length is explicitly sent
*/
struct OutputBuffer {
public:
OutputBuffer(uint32_t session_id) : session_id_{session_id} {}
~OutputBuffer() = default;
uint32_t session_id() const { return session_id_; }
struct lws * wsi() const { return wsi_; }
/* non-const access required.
* lws_write() will prepend headers in .buf_v[0..LWS_PRE-1]
*/
unsigned char * text() { return &(buf_v_[LWS_PRE]); }
unsigned char const * text() const { return &(buf_v_[LWS_PRE]); }
size_t text_z() const { return text_z_; }
std::string_view text_view() const {
return std::string_view((char const *)(this->text()),
this->text_z());
}
bool is_busy() const { return this->sent_seq_ < this->stored_seq_; }
bool is_idle() const { return this->sent_seq_ == this->stored_seq_; }
void establish_wsi(struct lws * wsi) { this->wsi_ = wsi; }
bool is_writeable(WsSafetyToken const &) const { return is_writeable_; }
void set_is_writeable(bool x, WsSafetyToken const &) { is_writeable_ = x; }
/* caller must hold WebsocketSessionRecd.mutex;
* evidenced by wsession_token
*/
void store_message(uint32_t msg_seq,
std::string const & text,
WsSessionSafetyToken const & wsession_token) {
scope log(XO_ENTER0(info));
wsession_token.verify();
if (sent_seq_ != stored_seq_) {
log && log("store_message: attempt storing new msg_seq but sent_seq!=stored_seq",
xtag("sent_seq", sent_seq_),
xtag("stored_seq", stored_seq_),
xtag("msg_seq", msg_seq));
assert(false);
}
size_t req_z = LWS_PRE + text.size();
if (this->buf_v_.size() < req_z)
this->buf_v_.resize(req_z);
this->text_z_ = text.size();
::memcpy(&(this->buf_v_[LWS_PRE]), text.c_str(), this->text_z_);
log && log(xtag("buf", (void*)&(this->buf_v_[0])),
xtag("msg_seq", msg_seq),
xtag("text", text),
xtag("text.size", text.size()),
xtag("req_z", req_z));
this->stored_seq_ = msg_seq;
} /*store_message*/
int lws_write_aux(WsSafetyToken const & ws_safety_token)
{
scope log(XO_ENTER0(info));
ws_safety_token.verify();
this->set_is_writeable(false, ws_safety_token);
log && log("write to websocket",
xtag("wsi", (void*)wsi_),
xtag("text_z", this->text_z()));
log && log(xtag("text", this->text_view()));
/* 1. notice we allowed for LWS_PRE in the payload already;
* this is mandatory for LWS_WRITE_TEXT.
* 2. LWS_WRITE_TEXT requires valid utf-8 payload
* 3. lws_write() writes entire contents, using
* multiple network writes if necessary.
* Application side can ignore the possibility of partial writes.
*/
int m = ::lws_write(this->wsi_,
this->text(),
this->text_z(),
LWS_WRITE_TEXT);
if (m < (int)this->text_z()) {
/* note: first time we observed this, browser console
* showed that entire message was eventually received,
* (though not if we exit() before returning)
*/
lwsl_user("lws_write_aux: PARTIAL WRITE: session=[%u], m=lws_write(z) with m<z m=[%d] z=[%lu]\n",
this->session_id_,
m,
this->text_z());
/* 23sep2022: consistent with observed behavior:
* - lws will write remainder of message
* - lws will call appl via LWS_CALLBACK_SERVER_WRITEABLE
* once write has been completed
* according to docs lws buffers message -- if true,
* probably pay for message to be copied
*/
return 0;
}
this->lws_write_completion(ws_safety_token);
return m;
} /*lws_write_aux*/
/* call this after successfully sending a message */
void lws_write_completion(WsSafetyToken const & ws_safety_token) {
/* session now writeable again. either:
* - lws_write(z) successful
* - lws_write(z) incomplete, followed by lws callback
* with reason = LWS_CALLBACK_SERVER_WRITEABLE
*/
this->set_is_writeable(true, ws_safety_token);
/* message completely written */
this->sent_seq_ = this->stored_seq_;
} /*lws_write_completion*/
private:
/* identifies websocket session associated with this buffer
* established permanently in ctor
*/
uint32_t session_id_;
/* opaque pointer; owned by libwebsocket + identifies this session.
* established once (per websocket session) from LWS_CALLBACK_ESTABLISHED
*/
struct lws * wsi_ = nullptr;
/* ::lws_write() takes responsibility for writing and buffering full message;
* IIU docs that means it doesn't return until full write has completed;
* this suggests it may also make reentrant callbacks for other sessions,
* while an incomplete call to lws_write() is on the stack.
*
* set .is_writeable to false during lws_write() calls,
* so that application threads can refrain from attempting nested
* lws_write() calls for the same session.
*/
bool is_writeable_ = false;
/* seq# of last message sent using this buffer; .sent_seq chases .stored_seq */
uint32_t sent_seq_ = 0;
/* seq# of last message stored using this buffer */
uint32_t stored_seq_ = 0;
/* buffer for outbound text.
* using the first LWS_PRE + .text_z bytes.
* 1st LWS_PRE bytes owned by lws library, must not touch these
*/
std::vector<unsigned char> buf_v_;
size_t text_z_ = 0;
}; /*OutputBuffer*/
/*
* Unlike ws, http is a stateless protocol. This pss only exists for the
* duration of a single http transaction. With http/1.1 keep-alive and
* http/2, that is unrelated to (shorter than) the lifetime of the network
* connection.
*
* NOTE
* 1. per_session_data__http instances are created by libwebsocket library.
* since that's implemented in C, we need to arrange for manual initialization
* 2. since libwebsocket implemented in C, we don't expect auto-initialization
* or constructors to be invoked.
* 3. libwebsocket gives us several alternatives for organizing resource
* allocation. We use these callback reasons:
* LWS_CALLBACK_HTTP_BIND_PROTOCOL for setup, allocate .output_ss
* LWS_CALLBACK_HTTP_DROP_PROTOCOL for teardown free .output_ss
*/
struct per_session_data__http {
int test;
/* store http reply in .output_str */
std::string * output_str;
};
/* one of these is created for each vhost our protocol is used with
*
* NOTE
* 1. per_vhost_data__minimal instances are created by libwebsocket library.
* since that's implemented in C, ctors/dtors aren't used here
*
* editor bait: vhd
*/
struct per_vhost_data__minimal {
struct lws_context * context;
struct lws_vhost * vhost;
const struct lws_protocols * protocol;
struct per_session_data__minimal * pss_list; /* linked-list of live pss*/
uint32_t next_session_id_;
//struct msg amsg; /* the one pending message... */
//int current; /* the current message number we are caching */
}; /*per_vhost_data__minimal*/
} /*namespace*/
/* bookkeeping record for a websocket subscription. */
class WebsocketSubscriptionRecd {
public:
WebsocketSubscriptionRecd(std::string const & incoming_uri,
DynamicEndpoint * endpoint,
rp<AbstractSink> const & ws_sink)
: incoming_uri_{incoming_uri},
endpoint_{endpoint},
ws_sink_{ws_sink}
{}
void subscribe() {
this->callback_id_ = this->endpoint_->subscribe(this->incoming_uri_,
this->ws_sink_);
} /*subscribe*/
void unsubscribe() {
this->endpoint_->unsubscribe(this->callback_id_);
} /*unsubscribe*/
private:
/* original subscription url */
std::string incoming_uri_;
/* endpoint that matched .subscribe_cmd
* (see WebserverImpl.stream_map)
*/
DynamicEndpoint * endpoint_ = nullptr;
/* id created when subscription established
* (see CallbackSetImpl.add_callback())
*/
CallbackId callback_id_;
/* sink established to receive (& forward) events on behalf
* of this subscription. application code writes to this sink.
*/
rp<AbstractSink> ws_sink_;
}; /*WebsocketSubscriptionRecd*/
/* bookkeeping record for a websocket session.
* WebserverImpl (below) keeps exactly one of these
* for each active websocket session
*/
class WebsocketSessionRecd {
public:
WebsocketSessionRecd(OutputBuffer * output_buf) : output_buf_{output_buf} {
assert(this->output_buf_);
}
bool is_output_busy() const {
return (this->output_buf_
&& this->output_buf_->is_busy());
}
bool outbound_q_empty() const { return this->outbound_q_.empty(); }
void subscribe_endpoint(std::string const & incoming_cmd,
DynamicEndpoint * endpoint,
rp<AbstractSink> const & ws_sink) {
scope log(XO_ENTER0(info),
xtag("incoming_cmd", incoming_cmd));
std::unique_ptr<WebsocketSubscriptionRecd> sub_recd_uptr
(new WebsocketSubscriptionRecd(incoming_cmd,
endpoint,
ws_sink));
WebsocketSubscriptionRecd * sub_recd_addr = sub_recd_uptr.get();
{
std::lock_guard<std::mutex> lock(this->mutex_);
this->active_subscription_v_.push_back(std::move(sub_recd_uptr));
}
/* note: need to call with lock dropped,
* since subscribe may in principle call WebserverImpl.send_text()
*/
if (sub_recd_addr)
sub_recd_addr->subscribe();
} /*subscribe_endpoint*/
void send_text(std::string text) {
scope log(XO_ENTER0(info));
std::unique_lock<std::mutex> lock(this->mutex_);
if (!(this->output_buf_)) {
log && log("ws_pss.output_buf not present -> exit");
} else if (this->is_output_busy()) {
log && log("ws_pss.output_msg busy, enqueue");
/* previous message already in progress, enq or drop */
this->enqueue_text(std::move(text),
WsSessionSafetyToken(lock));
/* this message will eventually get sent via
* .lws_write_pending_traffic()
*/
} else {
/* send message now! */
log && log("output_msg idle, send now");
this->prepare_outbound_message(std::move(text),
WsSessionSafetyToken(lock));
/* can release lock, won't be using for remainder of this function */
lock.unlock();
lws_context * lws_cx = ::lws_get_context(this->output_buf_->wsi());
/* interrupt libwebsocket event loop.
* will send 'wait cancelled' event to all sockets.
*
* Actually, after testing -- looks like this sends to one wsi per protocol:
* basically to the "listening" wsi, not to websocket "session" wsi
*/
::lws_cancel_service(lws_cx);
/* NOTE: web documentation seems to suggest using lws_callback_on_writable():
*
* trigger call from websocket thread to send data.
* will cause reentry via websocket thread into
* WebserverImpl::notify_minimal()
* with reason=LWS_CALLBACK_SERVER_WRITEABLE
*
* ^^^ Hmm, doesn't seem to work this way.
* Suspect this would only work if socket currently
* in non-writeable state.
*/
//lws_callback_on_writable(ws_pss->wsi);
}
} /*send_text*/
/* write some pending traffic from lws event loop
*
* Require:
* - MUST be invoked from lws event loop, for threadsafety;
* ws_safety_token provides evidence of this
*/
void lws_write_pending(WsSafetyToken const & ws_safety_token) {
scope log(XO_ENTER0(info));
log && log(xtag("output_buf", (void*)this->output_buf_));
#ifdef OBSOLETE
per_session_data__minimal * ws_pss = this->ws_pss_;
if (!ws_pss) {
lscope.log("null ws_pss, exit");
return;
}
#endif
if (!(this->output_buf_)) {
/* output message buffer not established,
* implies nothing sent yet
*/
log && log("output_msg either not established or destroyed, exit");
return;
}
/* loop until no queued messages for this session */
for (;;) {
if (!(this->output_buf_->is_writeable(ws_safety_token))) {
/* call to lws_write() already in progress */
log && log("output_buf not writeable (bc lws_write in progress)");
return;
}
if (this->output_buf_->is_idle()) {
/* already up-to-date for this session */
log && log("output idle (up-to-date)");
return;
}
this->output_buf_->lws_write_aux(ws_safety_token);
/* if there are any appl messages queued, prepare to send another one */
if (this->outbound_q_.empty()) {
/* all caught up, nothing left to send */
log && log("up-to-date after write");
} else {
std::unique_lock<std::mutex> lock(this->mutex_);
std::string text(this->dequeue_text(WsSessionSafetyToken(lock)));
this->prepare_outbound_message(std::move(text),
WsSessionSafetyToken(lock));
}
}
} /*lws_write_pending*/
/* threadsafe */
void unsubscribe_all() {
std::lock_guard<std::mutex> lock(this->mutex_);
/* also drop .output_buf,
* to short-circuit any subsequent attempts to use .lws_write_pending()
* (which will happen in response to LWS_CALLBACK_EVENT_WAIT_CANCELLED
* on any session)
*/
for (auto & sub_ptr : this->active_subscription_v_)
sub_ptr->unsubscribe();
this->output_buf_ = nullptr;
this->active_subscription_v_.clear();
} /*unsubscribe_all*/
private:
uint32_t generate_msg_seq() { return ++(this->last_msg_seq_); }
/* enqueue application-level message.
* use this when .ws_pss.outbound_buf is busy
*/
void enqueue_text(std::string text,
WsSessionSafetyToken const & /*wss_token*/) {
this->outbound_q_.push_back(std::move(text));
} /*enqueue_text*/
/* remove a deferred message from .outbound_q,
* and return it. This can happen if output becomes
* available after being write-blocked
*/
std::string dequeue_text(WsSessionSafetyToken const & /*wss_token*/) {
assert(!this->outbound_q_.empty());
std::string retval = std::move(this->outbound_q_.front());
this->outbound_q_.pop_front();
return retval;
} /*dequeue_text*/
/* prepare outbound message for sending in contiguous memory;
* in particular prepends header.
*
* this can be called from either websocket or appl thread,
* so needs to be threadsafe.
*/
void prepare_outbound_message(std::string text,
WsSessionSafetyToken const & wss_token) {
scope log(XO_ENTER0(info));
/* sequence# for this outbound message */
uint32_t msg_seq = this->generate_msg_seq();
this->output_buf_->store_message(msg_seq,
std::move(text),
wss_token);
/* now ws_pss->output_msg_->is_busy() */
log && log("staged next write",
xtag("wsi", (void*)this->output_buf_->wsi()),
xtag("text_z", this->output_buf_->text_z()));
} /*prepare_outbound_message*/
private:
/* output destined for this session
* libws (via per_session_data__minimal) also points to
* .output_buf
*/
OutputBuffer * output_buf_ = nullptr;
/* protects .active_subscription_v, .last_msg_seq, .outbound_q */
std::mutex mutex_;
/* active subscriptions established by this session */
std::vector<std::unique_ptr<WebsocketSubscriptionRecd>> active_subscription_v_;
/* generate seq#'s for outgoing messages */
uint32_t last_msg_seq_ = 0;
/* when new outgoing message appears:
* 1. if .pss->output_msg empty, allocate it and store message there;
* invoke lws_callback_on_writeable(.pss->wsi) to get message sent asap
* 2. otherwise sending a previous message is in-progress;
* put outgoing message to the back of .outbound_q
*/
std::deque<std::string> outbound_q_;
}; /*WebsocketSessionRecd*/
using EndpointMap = std::unordered_map<std::string,
std::unique_ptr<DynamicEndpoint>>;
/* defined in this translation unit, after WebserverImpl */
class WebserverImplWsThread;
class WebserverImpl : public Webserver {
public:
WebserverImpl(WebserverConfig const & ws_config,
rp<PrintJson> const & pjson)
: ws_config_{ws_config},
pjson_{pjson},
readjson_{Json::CharReaderBuilder().newCharReader()},
interrupt_flag_{false},
state_{Runstate::stopped}
{
} /*ctor*/
virtual ~WebserverImpl() {
/* if webserver is running, initiate shutdown.
* webserver thread will eventually exit
*/
this->stop_webserver();
/* wait for shutdown to complete */
this->join_webserver();
} /*dtor*/
virtual void run() = 0;
// ----- Inherited from Webserver -----
virtual Runstate state() const override { return state_; }
virtual void register_http_endpoint(HttpEndpointDescr const & endpoint) override;
virtual void register_stream_endpoint(StreamEndpointDescr const & endpoint) override;
virtual void start_webserver() override;
virtual void interrupt_stop_webserver() override;
virtual void stop_webserver() override;
virtual void join_webserver() override;
protected:
void set_lws_log_level() {
lws_set_log_level(LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
/* for LLL_ verbosity above NOTICE to be built into
* lws, lws must have been configured and built with
* -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */
/* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
/* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
/* | LLL_DEBUG */,
NULL);
} /*set_lws_log_level*/
#if defined(LWS_WITH_PLUGINS)
void init_pvo(lws_protocol_vhost_options * p_pvo) {
/* {next, options, name, value} */
*p_pvo = {NULL, NULL, "lws-minimal", ""};
} /*init_pvo*/
#endif
/* called once during webserver initialization;
* identifies protocols (channels) that libws is expected to support
*/
virtual void init_protocols(std::vector<lws_protocols> * p_v) = 0;
void init_mount_dynamic(lws_http_mount * p_mount) {
/* see lws-context-vhost.h for lws_http_mount */
*p_mount = {
/* .mount_next */ NULL, /* linked-list "next" */
/* .mountpoint */ "/dyn", /* mountpoint URL */
/* .origin */ NULL, /* protocol */
/* .def */ NULL,
/* .protocol */ "http",
/* .cgienv */ NULL,
/* .extra_mimetypes */ NULL,
/* .interpret */ NULL,
/* .cgi_timeout */ 0,
/* .cache_max_age */ 0,
/* .auth_mask */ 0,
/* .cache_reusable */ 0,
/* .cache_revalidate */ 0,
/* .cache_intermediaries */ 0,
/* .origin_protocol */ LWSMPRO_CALLBACK, /* dynamic */
/* .mountpoint_len */ 4, /* char count */
/* .basic_auth_login_file */ NULL,
# if ((LWS_LIBRARY_VERSION_MAJOR < 4) || ((LWS_LIBRARY_VERSION_MAJOR == 4) && (LWS_LIBRARY_VERSION_MINOR < 3)))
/* ._unused[] */ { nullptr, nullptr },
# endif
};
} /*init_mount_dynamic*/
void init_mount_static(lws_http_mount const * dynamic,
lws_http_mount * p_mount) {
/* default mount serves the URL space from ./mount-origin */
*p_mount = {
/* .mount_next */ dynamic, /* linked-list "next" */
/* .mountpoint */ "/", /* mountpoint URL */
/* .origin */ "./mount-origin", /* serve from dir */
/* .def */ "index.html", /* default filename */
/* .protocol */ NULL,
/* .cgienv */ NULL,
/* .extra_mimetypes */ NULL,
/* .interpret */ NULL,
/* .cgi_timeout */ 0,
/* .cache_max_age */ 0,
/* .auth_mask */ 0,
/* .cache_reusable */ 0,
/* .cache_revalidate */ 0,
/* .cache_intermediaries */ 0,
/* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */
/* .mountpoint_len */ 1, /* char count */
/* .basic_auth_login_file */ NULL,
# if ((LWS_LIBRARY_VERSION_MAJOR < 4) || ((LWS_LIBRARY_VERSION_MAJOR == 4) && (LWS_LIBRARY_VERSION_MINOR < 3)))
/* ._unused[] */ { nullptr, nullptr },
# endif
};
} /*init_mount_static*/
void init_retry(lws_retry_bo_t * p_retry) {
p_retry->secs_since_valid_ping = 3;
p_retry->secs_since_valid_hangup = 10;
} /*init_retry*/
/* requires:
* - .pvo initialized, see .init_pvo()
* - .protocol_v[] initialized, see .init_protocols()
* - .mount_dynamic initialized, see .init_mount_dynamic()
* - .mount_static initialized, see .init_mount_static()
* - .retry initialized, see .init_retry()
*/
void init_cx_config(lws_context_creation_info * p_cx_config) {
::memset(p_cx_config, 0, sizeof(*p_cx_config));
p_cx_config->port = this->ws_config_.port();
p_cx_config->vhost_name = "localhost";
p_cx_config->pvo = &(this->pvo_);
p_cx_config->protocols = this->protocol_v_.data();
p_cx_config->mounts = &(this->mount_static_);
/* userdata -- accessible from context with lws_context_user() */
p_cx_config->user = (void*)this;
#if defined(LWS_WITH_TLS)
if (this->ws_config_.tls_flag()) {
lwsl_user("Server using TLS\n");
p_cx_config->options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
p_cx_config->ssl_cert_filepath = "localhost-100y.cert";
p_cx_config->ssl_private_key_filepath = "localhost=100y.key";
}
#endif
if (this->ws_config_.host_check_flag()) {
p_cx_config->options |= LWS_SERVER_OPTION_VHOST_UPG_STRICT_HOST_CHECK;
}
if (this->ws_config_.use_retry_flag()) {
p_cx_config->retry_and_idle_policy = &(this->retry_);
}
} /*init_cx_config*/
/* check for a DynamicEndpoint stored under stem;
* if found, invoke it on incoming_uri to respond
*
* return. true iff stem matched a dynamic endpoint;
*/
DynamicEndpoint * lookup_dynamic_http_stem(std::string const & stem);
/* write dynamic http response for incoming_uri, on *p_os
* incoming_uri will be suffix of original uri from browser,
* following dynamic mount point [/dyn].
* see .init_mount_dynamic()
*/
void dynamic_http_response(std::string const & incoming_uri,
std::ostream * p_os);
/* act on incoming websocket command
* expecting json like
* {"command": "subscribe", "stream": "uls"}
*/
void perform_ws_cmd(uint32_t session_id,
std::string_view incoming_svw);
#ifdef DEFINED_BUT_NOT_USED
/* called from libwebsocket thread when session manager
* (aka "virtual host") is created for the websocket protocol
*/
void notify_vhd(per_vhost_data__minimal * vhd);
#endif
/* called from libwebsocket thread when creating a new websocket session */
void notify_ws_session_open(OutputBuffer * output_buf,
per_vhost_data__minimal * vhd,
WsSafetyToken const & ws_safety_token);
/* called from libwebsocket thread whenever a websocket session is closed */
void notify_ws_session_close(OutputBuffer * output_buf,
per_vhost_data__minimal * vhd,
WsSafetyToken const & ws_safety_token);
/* send text to the websocket session identified by session_id */
void send_text(uint32_t session_id,
std::string text) override;
/* from lws event loop, write any pending outbound traffic
* see .pending_session_q
*/
void lws_write_pending_traffic(WsSafetyToken const & ws_safety_token);
protected:
/* callback for http protocol */
static int notify_dynamic_http(struct lws * wsi,
lws_callback_reasons reason,
void * user_data,
void * incoming_uri,
size_t len);
protected:
/* see WebserverImplWsThread below, for methods
* that are exclusive to libws thread
*/
/* initial configuration for embedded webserver */
WebserverConfig ws_config_;
/* json printer (w/ plugins for reflected types) */
rp<PrintJson> pjson_;
/* json reader */
std::unique_ptr<Json::CharReader> readjson_;
/* --- 1. LWS configuration stuff (set once) ---*/
#if defined(LWS_WITH_PLUGINS)
/* protocols listed here will "bind to vhost".
* (I don't know for sure what this means -- it's a magic spell for now)
*
*/
lws_protocol_vhost_options pvo_;
#endif
/* protocols to accept for this webserver */
std::vector<lws_protocols> protocol_v_;
/* mount point for dynamic urls
* (these will be served by executing c++ code,
* instead of serving static disk files)
*/
lws_http_mount mount_dynamic_;
/* mount point for static urls
* (serve static files from file system)
*/
lws_http_mount mount_static_;
/* retry settings
* (not sure how these are used)
*/
lws_retry_bo_t retry_;
/* configuration record for lws context
* AFAIK require lifetime >= lws_context
*/
lws_context_creation_info cx_config_;
/* runtime state owned by LWS library
* can get application-determined user data from a lws_context by
* lws_context_user(.lws_cx)
*/
lws_context * lws_cx_ = nullptr;
/* --- 2. startup/shutdown control --- */
/* set this to true to prevent further service loop iteration */
std::atomic<bool> interrupt_flag_;
/* protects .state */
std::mutex mutex_;
std::condition_variable cond_;
/* valid states
*
* .state .thread_ptr
* -----------------------------------------------
* running thread in WebserverImpl::run()
* stop_requested thread in WebserverImpl::run()
* stopped nullptr
*/
Runstate state_;
std::unique_ptr<std::thread> thread_ptr_;
/* --- 3. plugin state (writable while server runs) --- */
/* map :: stem->http_fn,
* where
* stem = "longest non-variable URI prefix"
*
* use .register_http_endpoint() to insert a new URI into this map
*
* this map used for http endpoints
*/
EndpointMap stem_map_;
/* map :: stem->subscribe_fn
* where
* stem = "longest non-variable URI prefix"
*
* use .register_stream_endpoint() to insert a new URI into this map
*
* this map used for stream endpoints
*/
EndpointMap stream_map_;
/* --- 4. libwebsocket session manager --- */
/* websocket-associated libwebsocket data.
* created by libwebsocket; our appl code informed via
* LWS_CALLBACK_PROTOCOL_INIT
*
* list of all active sessions is in .ws_vhd->pss_list
* (see LWS_CALLBACK_PROTOCOL_INIT, LWS_CALLBACK_ESTABLISHED, LWS_CALLBACK_CLOSED)
*
* can visit sessions with macros:
* lws_start_foreach_llp(struct per_session_data__minimal **, ppss, vhd->pss_list) {
* ..do stuff with (*ppss)->wsi for example..
* } lws_end_foreach_llp(ppss, pss_list);
*/
per_vhost_data__minimal * ws_vhd_ = nullptr;
/* indexed by session id# (see per_session_data__minimal.session_id)
* .session_v.size() = {max #of simultaneously-open websocket sessions}.
* may contain empty slots. if .session_v[i] is empty,
* then i appears in .free_session_id_v[], i..e .free_session_id_v[j]=i for some j
*/
std::vector<std::unique_ptr<WebsocketSessionRecd>> session_v_;
/* When a session closes, its session id becomes available.
* track such session ids here, so they can be recycled.
* want to recycle because they're indexes into .session_v[],
* and we don't want that to grow without bound
*/
std::vector<uint32_t> free_session_id_v_;
}; /*WebserverImpl*/
void
WebserverImpl::register_http_endpoint(HttpEndpointDescr const & endpoint_descr)
{
auto endpoint = DynamicEndpoint::make_http(endpoint_descr.uri_pattern(),
endpoint_descr.endpoint_fn());
this->stem_map_[endpoint->stem()] = std::move(endpoint);
} /*register_http_endpoint*/
void
WebserverImpl::register_stream_endpoint(StreamEndpointDescr const & endpoint_descr)
{
auto endpoint = DynamicEndpoint::make_stream(endpoint_descr.uri_pattern(),
endpoint_descr.subscribe_fn(),
endpoint_descr.unsubscribe_fn());
this->stream_map_[endpoint->stem()] = std::move(endpoint);
} /*register_stream_endpoint*/
#ifdef DEFINED_BUT_NOT_USED
void
WebserverImpl::notify_vhd(per_vhost_data__minimal * vhd)
{
this->ws_vhd_ = vhd;
} /*notify_vhd*/
#endif
void
WebserverImpl::notify_ws_session_open(OutputBuffer * output_buf,
per_vhost_data__minimal * vhd,
WsSafetyToken const & ws_safety_token)
{
ws_safety_token.verify();
uint32_t new_id = output_buf->session_id();
if (this->session_v_.size() <= new_id)
this->session_v_.resize(new_id + 1);
this->session_v_[new_id].reset(new WebsocketSessionRecd(output_buf));
/* control comes here when a new websocket session is created,
* after LWS_CALLBACK_HTTP_BIND_PROTOCOL
*/
output_buf->set_is_writeable(true, ws_safety_token);
/* compute next available session id + store in vhost struct */
if (this->free_session_id_v_.empty()) {
/* generate a new session id */
uint32_t id = this->session_v_.size();
vhd->next_session_id_ = id;
} else {
/* recycle a previously-used session id */
uint32_t id = this->free_session_id_v_[this->free_session_id_v_.size() - 1];
this->free_session_id_v_.pop_back();
vhd->next_session_id_ = id;
}
} /*notify_ws_session_open*/
void
WebserverImpl::notify_ws_session_close(OutputBuffer * output_buf,
per_vhost_data__minimal * /*vhd*/,
WsSafetyToken const & ws_safety_token)
{
scope log(XO_ENTER0(info));
log && log("enter",
xtag("this", (void*)this),
xtag("output_buf", (void*)output_buf));
assert(output_buf->session_id() < this->session_v_.size());
ws_safety_token.verify();
WebsocketSessionRecd * ws_session_recd
= this->session_v_[output_buf->session_id()].get();
if (ws_session_recd) {
ws_session_recd->unsubscribe_all();
}
this->free_session_id_v_.push_back(output_buf->session_id());
} /*notify_ws_session_close*/
/* note: to access lws_protocols.user,
* would use lws_get_protocol(wsi)->user
*/
int
WebserverImpl::notify_dynamic_http(struct lws * wsi,
lws_callback_reasons reason,
void * user_data,
void * incoming_data,
size_t len)
{
lws_context * lws_cx = lws_get_context(wsi);
void * cx_user_data = lws_context_user(lws_cx);
WebserverImpl * websrv = reinterpret_cast<WebserverImpl *>(cx_user_data);
struct per_session_data__http * http_pss
= reinterpret_cast<struct per_session_data__http *>(user_data);
lwsl_user("notify_dynamic_http: enter: reason %d (%s): lws_cx %p websrv %p\n",
reason,
WebsockUtil::ws_callback_reason_descr(reason),
lws_cx,
websrv);
/* scratch space for http header
* (probably only need LWS_PRE here, I think the +256 debris
* from o.g. example)
*/
uint8_t buf[LWS_PRE + 256];
uint8_t * start = &buf[LWS_PRE];
uint8_t * p = start;
uint8_t * end = &buf[sizeof(buf) - 1];
switch (reason) {
case LWS_CALLBACK_HTTP:
{
/* incoming_uri contains the uri suffix following our mountpoint [/dyn]
* (see WebserverImpl.init_mount_dynamic()).
*
* looks like this gets spuriously invoked for non-dynamic mountpoints
* given that we serve both filesystem tree
* (in url-space at /, from dir ./mount-origin) and dynamic http (in url-space at /dyn);
*
* however output from the spurious invocation seems to be discarded
*/
char const * incoming_uri
= reinterpret_cast<char const *>(incoming_data);
assert(http_pss->output_str == nullptr);
if (http_pss->output_str == nullptr) {
http_pss->output_str = new std::string;
}
lwsl_user("allocate output_str [%p] in http_pss [%p]",
http_pss->output_str, http_pss);
std::stringstream response_ss;
assert(websrv);
websrv->dynamic_http_response(incoming_uri,
&response_ss);
*(http_pss->output_str) = response_ss.str();
lwsl_user("LWS_CALLBACK_HTTP: got response [%s]",
http_pss->output_str->c_str());
/* choose mime type */
constexpr char const * c_mime_type = "application/json";
/* prepare and write http headers
* (do these precede &p ??)
*/
if (lws_add_http_common_headers(wsi,
HTTP_STATUS_OK,
c_mime_type,
http_pss->output_str->length(),
&p, end))
return 1;
if (lws_finalize_write_http_header(wsi, start, &p, end))
return 1;
/* write the body separately */
lws_callback_on_writable(wsi);
return 0;
}
case LWS_CALLBACK_HTTP_WRITEABLE:
{
if (!http_pss || !http_pss->output_str || (http_pss->output_str->length() == 0))
break;
/*
* Use LWS_WRITE_HTTP (instead of LWS_WRITE_HTTP_FINAL) for intermediate writes,
* on http/2 lws uses this to understand to end the stream with this
* frame.
*
* TODO: if output is large, write it in smaller chunks.
* expecting mtu like 1500 bytes, so maybe 128k
* chunks will work well?
*/
if (lws_write(wsi,
(uint8_t *)(http_pss->output_str->c_str()),
http_pss->output_str->length(),
LWS_WRITE_HTTP_FINAL)
!= static_cast<int>(http_pss->output_str->length()))
{
return 1;
}
/*
* HTTP/1.0 no keepalive: close network connection
* HTTP/1.1 or HTTP1.0 + KA: wait / process next transaction
* HTTP/2: stream ended, parent connection remains up
*/
if (lws_http_transaction_completed(wsi))
return -1;
return 0;
}
case LWS_CALLBACK_HTTP_BIND_PROTOCOL:
{
/* from libwebsocket docs:
* By default, all HTTP handling is done in protocols[0].
* However you can bind different protocols (by name) to different parts of the URL space using callback mounts.
* This callback occurs in the new protocol when a wsi is bound to that protocol.
* Any protocol allocation related to the http transaction processing should be created then.
* These specific callbacks are necessary because with HTTP/1.1,
* a single connection may perform a series of different transactions at different URLs,
* thus the lifetime of the protocol bind is just for one transaction, not connection.
*/
if (!http_pss)
break;
/* although we could allocate http_pss->output_ss here,
* instead delay until LWS_CALLBACK_HTTP.
* this reduces new/delete churn, since BIND/DROP callbacks
* will get invoked on every incoming request, not just for
* dynamic http requests.
*/
http_pss->output_str = nullptr;
lwsl_user("initialize http_pss->output_str to null in http_pss [%p]",
http_pss);
}
break;
case LWS_CALLBACK_HTTP_DROP_PROTOCOL:
/* from libwebsocket docs:
* This is called when a transaction is unbound from a protocol.
* It indicates the connection completed its transaction and may do something different now.
* Any protocol allocation related to the http transaction processing should be destroyed.
*/
if (!http_pss)
break;
if (http_pss->output_str) {
lwsl_user("destroy string [%p] in http_pss [%p]",
http_pss->output_str, http_pss);
delete http_pss->output_str;
http_pss->output_str = nullptr; /*hygiene*/
}
default:
break;
}
return lws_callback_http_dummy(wsi, reason, user_data, incoming_data, len);
} /*notify_dynamic_http*/
void
WebserverImpl::start_webserver()
{
switch(state_) {
case Runstate::stopped:
{
std::unique_lock<std::mutex> lock(this->mutex_);
this->thread_ptr_.reset(new std::thread(&WebserverImpl::run, this));
this->state_ = Runstate::running;
}
break;
case Runstate::stop_requested:
throw std::runtime_error("webserver in stop-requested state");
/* could invent a "restart-requested" state, I suppose */
break;
case Runstate::running:
throw std::runtime_error("webserver already running");
break;
}
} /*start_webserver*/
namespace {
DynamicEndpoint *
lookup_stem(std::string const & stem,
EndpointMap const & ep_map)
{
scope log(XO_DEBUG(true /*debug_flag*/),
xtag("stem", stem));
auto ix = ep_map.find(stem);
if (ix != ep_map.end())
return ix->second.get();
else
return nullptr;
} /*lookup_stem*/
DynamicEndpoint *
lookup_pattern(std::string const & incoming_uri,
EndpointMap const & ep_map)
{
if (incoming_uri.empty())
return nullptr;
/* find longest prefix of incoming_uri that appears in .stem_map.
*
* 1. try the whole uri
* 2. try successively shorter prefixes of uri that end in '/'
* 3. try successively shorter prefixes of uri that do not end in '/'
*/
/* 1. try the whole uri */
DynamicEndpoint * endpoint = nullptr;
endpoint = lookup_stem(incoming_uri, ep_map);
if (!endpoint) {
/* 2. try successively shorter prefixes of uri that end in '/'.
* we already checked for the whole uri, so look for a match
* at or before the 2nd-last character
*/
if (incoming_uri.size() >= 2) {
std::string::size_type p = incoming_uri.size() - 1;
while (!endpoint) {
p = incoming_uri.find_last_of('/', p-1);
if (p == std::string::npos)
break;
endpoint
= lookup_stem(incoming_uri.substr(0, p+1), ep_map);
if (p == 0)
break;
}
}
}
if (!endpoint) {
/* 3. try successively shorter prefixes of uri that don't end in '/'.
*/
if (incoming_uri.size() >= 2) {
std::string::size_type p = incoming_uri.size() - 2;
while (!endpoint) {
if (incoming_uri[p] == '/') {
/* all stems ending in '/' have already been excluded */
;
} else {
endpoint
= lookup_stem(incoming_uri.substr(0, p+1), ep_map);
}
if (p == 0)
break;
--p;
}
}
}
return endpoint;
} /*lookup_pattern*/
} /*namespace*/
void
WebserverImpl::dynamic_http_response(std::string const & incoming_uri,
std::ostream * p_os)
{
DynamicEndpoint * endpoint = lookup_pattern(incoming_uri,
this->stem_map_);
if (endpoint) {
endpoint->http_response(incoming_uri, p_os);
return;
} else {
/* if control here, no match */
/* or replace pss->str, pss->len with whatever dynamic content you like */
time_t t0 = ::time(nullptr);
*p_os << ("<html>"
"<img src=\"/libwebsockets.org-logo.svg\">"
"<br>no dynamic content for uri [")
<< incoming_uri
<< ("]"
" from mountpoint."
"<br>time: ")
<< ctime(&t0)
<< "</html>";
}
} /*dynamic_http_response*/
void
WebserverImpl::perform_ws_cmd(uint32_t session_id,
std::string_view incoming_cmd)
{
/* expecting input like:
* {"command": "subscribe",
* "stream": "usl"}
*/
scope log(XO_ENTER0(info),
xtag("incoming_cmd", incoming_cmd));
Json::Value root;
JSONCPP_STRING err;
bool ok = this->readjson_->parse(incoming_cmd.data(),
incoming_cmd.data() + incoming_cmd.size(),
&root,
&err);
if (!ok) {
log && log("error: parsing failed",
xtag("incoming_cmd", incoming_cmd));
}
//std::cout << "WebserverImpl::perform_ws_cmd :root [" << root << "]" << std::endl;
std::string cmd = root["cmd"].asString();
log && log("ws command", xtag("cmd", cmd));
//std::cout << "WebserverImpl::perform_ws_cmd :cmd [" << cmd << "]" << std::endl;
if (cmd == "subscribe") {
std::string stream_name = root["stream"].asString();
log && log("subscribe stream", xtag("stream", stream_name));
DynamicEndpoint * endpoint = lookup_pattern(stream_name,
this->stream_map_);
if (endpoint) {
log && log("endpoint found");
/* sink to receive outbound events bound for session_id,
* for stream_name
*/
rp<AbstractSink> ws_sink
= WebsocketSink::make(this,
this->pjson_,
session_id,
stream_name);
log && log("sink created");
assert(ws_sink->allow_polymorphic_source());
assert(ws_sink->allow_volatile_source());
WebsocketSessionRecd * ws_recd = this->session_v_[session_id].get();
assert(ws_recd);
ws_recd->subscribe_endpoint(std::string(incoming_cmd),
endpoint,
ws_sink);
} else {
log && log("endpoint not found");
}
}
} /*perform_ws_cmd*/
void
WebserverImpl::interrupt_stop_webserver()
{
/* NOTE: this is threadsafe - ::lws_cancel_service()
* writes to a pipe to interrupt polling loop
*/
{
this->interrupt_flag_ = true;
if (this->lws_cx_) {
::lws_cancel_service(this->lws_cx_);
}
}
std::unique_lock<std::mutex> lock(this->mutex_);
this->state_ = Runstate::stop_requested;
} /*interrupt_stop_webserver*/
void
WebserverImpl::stop_webserver()
{
std::unique_lock<std::mutex> lock(this->mutex_);
if(this->state_ == Runstate::running) {
this->interrupt_stop_webserver();
}
} /*stop_webserver*/
void
WebserverImpl::join_webserver() {
while(true) {
std::unique_lock<std::mutex> lock(this->mutex_);
if (this->state_ == Runstate::stopped)
break;
this->cond_.wait(lock);
}
if (this->thread_ptr_) {
this->thread_ptr_->join();
this->thread_ptr_ = nullptr;
}
} /*join_webserver*/
void
WebserverImpl::send_text(uint32_t session_id,
std::string text)
{
scope log(XO_ENTER0(info));
log && log(xtag("session_id", session_id),
xtag(".session_v.size", this->session_v_.size()));
if (session_id < this->session_v_.size()) {
WebsocketSessionRecd * p_session_recd = this->session_v_[session_id].get();
if (p_session_recd)
p_session_recd->send_text(text);
//per_session_data__minimal * ws_pss = p_session_recd->ws_pss();
//lscope.log(xtag("ws_pss", ws_pss),
// xtag("ws_pss.wsi", ws_pss->wsi));
} else {
assert(false);
}
} /*send_text*/
void
WebserverImpl::lws_write_pending_traffic(WsSafetyToken const & ws_safety_token)
{
scope log(XO_ENTER0(info));
ws_safety_token.verify();
for (auto & session_ptr : this->session_v_) {
if (session_ptr)
session_ptr->lws_write_pending(ws_safety_token);
}
} /*lws_write_pending_traffic*/
/* sequester .ws_safety_token:
* it may only be used by dedicated websocket library thread
* (the unique thread that calls ::lws_service())
*/
class WebserverImplWsThread : public WebserverImpl {
public:
WebserverImplWsThread(WebserverConfig const & ws_config,
rp<PrintJson> const & pjson)
: WebserverImpl(ws_config, pjson)
{
scope log(XO_DEBUG(true /*debug_flag*/),
xtag("self", (void*)this));
this->set_lws_log_level();
#if defined(LWS_WITH_PLUGINS)
this->init_pvo(&(this->pvo_));
#endif
this->init_protocols(&(this->protocol_v_));
this->init_mount_dynamic(&(this->mount_dynamic_));
this->init_mount_static(&(this->mount_dynamic_),
&(this->mount_static_));
this->init_cx_config(&(this->cx_config_));
} /*ctor*/
/* create instance */
static rp<WebserverImpl> make(WebserverConfig const & ws_config,
rp<PrintJson> const & pjson);
// ----- Inherited from WebserverImpl -----
/* init helper */
virtual void init_protocols(std::vector<lws_protocols> * p_v) override;
/* run webserver. borrows calling thread, doesn't return
* until webserver stopped.
*/
virtual void run() override;
private:
/* callback for lws-minimal protocol (websocket) */
static int notify_minimal(struct lws * wsi,
lws_callback_reasons reason,
void * user_data,
void * incoming_uri,
size_t len);
WsSafetyToken const & ws_safety_token() const { return ws_safety_token_; }
private:
/* a function taking .ws_safety_token as an argument,
* announces that it is being called from the libws thread,
* i.e. reentrantly from ::lws_service()
*/
WsSafetyToken ws_safety_token_;
}; /*WebserverImplWsThread*/
/* 1. anything after the host:port prefix will get handled by callback_dynamic_http
* 2. host::port alone will upgrade to "lws-minimal" for websocket demo
*
* in practice p_v will be &WebserverImpl::protocol_v_
*/
void
WebserverImplWsThread::init_protocols(std::vector<lws_protocols> * p_v)
{
/* lws_protocols:
* .name
* .callback
* .per_session_data_size
* .rx_buffer_size
* .id advertised as accessible from callback, but don't see how to use this
* .user advertised as accessible from callback, but don't see how to make this work.
* looks like libwebsocket allocates its own struct, even if .user is nonempty,
* and whether or not .per_session_data_size is 0.
* .tx_packet_size
*/
p_v->push_back({
"http",
&WebserverImpl::notify_dynamic_http,
sizeof(struct per_session_data__http),
0,
0,
NULL,
0
});
p_v->push_back({
"lws-minimal",
&WebserverImplWsThread::notify_minimal,
sizeof(struct per_session_data__minimal),
128,
0,
NULL,
0});
/* mandatory end-of-array sentinel, requires by lws */
p_v->push_back(LWS_PROTOCOL_LIST_TERM);
} /*init_protocols*/
/* called reentrantly from ::lws_service(),
* to do work on behalf of the websocket protocol "lws-minimal"
*/
int
WebserverImplWsThread::notify_minimal(struct lws * wsi,
lws_callback_reasons reason,
void * user_data,
void * input,
size_t input_z)
{
scope log(XO_ENTER0(info),
xtag("wsi", (void*)wsi));
lwsl_user("WebserverImpl::notify_minimal: enter"
": reason %d (%s)",
reason,
WebsockUtil::ws_callback_reason_descr(reason));
assert(wsi);
lws_context * lws_cx = lws_get_context(wsi);
assert(lws_cx);
void * cx_user_data = lws_context_user(lws_cx);
WebserverImplWsThread * websrv = reinterpret_cast<WebserverImplWsThread *>(cx_user_data);
assert(websrv);
WsSafetyToken const & ws_token = websrv->ws_safety_token();
struct per_session_data__minimal * ws_pss
= ((struct per_session_data__minimal *)user_data);
lwsl_user("WebserverImpl::notify_minimal: enter"
": reason %d (%s): wsi [%p], ws_pss [%p], lws_cx [%p] websrv [%p]\n",
reason,
WebsockUtil::ws_callback_reason_descr(reason),
wsi,
ws_pss,
lws_cx,
websrv);
struct per_vhost_data__minimal * vhd
= ((struct per_vhost_data__minimal *)
lws_protocol_vh_priv_get(lws_get_vhost(wsi),
lws_get_protocol(wsi)));
int m;
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
{
vhd = (reinterpret_cast<per_vhost_data__minimal *>
(lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
lws_get_protocol(wsi),
sizeof(struct per_vhost_data__minimal))));
vhd->context = lws_get_context(wsi);
vhd->vhost = lws_get_vhost(wsi);
vhd->protocol = lws_get_protocol(wsi);
vhd->pss_list = nullptr;
vhd->next_session_id_ = 1;
//vhd->current = 0;
lwsl_user("WebserverImpl::notify_minimal: vhost=%p, protocols=%p protocol.name=%s\n",
vhd->vhost, vhd->protocol, vhd->protocol->name);
}
break;
case LWS_CALLBACK_WS_SERVER_BIND_PROTOCOL:
case LWS_CALLBACK_HTTP_BIND_PROTOCOL:
{
/* looks like control comes here with
* LWS_CALLBACK_HTTP_BIND_PROTOCOL,
* although based on docs would seem to expect
* LWS_CALLBACK_WS_SERVER_BIND_PROTOCOL
*
* In any case, control here when new websocket session created
*/
if (!ws_pss)
break;
assert(vhd);
ws_pss->output_buf_ = new OutputBuffer(vhd->next_session_id_++);
lwsl_user("establish pss->output_buf [%p] in ws_pss [%p]",
ws_pss->output_buf_,
ws_pss);
}
break;
case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL:
{
if (!ws_pss)
break;
lwsl_user("destroy pss->output_msg [%p] in ws_pss [%p]",
ws_pss->output_buf_, ws_pss);
/* don't do this here. need to access ws_pss->output_buf
* from LWS_CALLBACK_CLOSED
*/
#ifdef BROKEN
if (ws_pss->output_buf_) {
delete ws_pss->output_buf_;
ws_pss->output_buf_ = nullptr;
}
#endif
}
break;
case LWS_CALLBACK_ESTABLISHED:
{
/* control comes here when a websocket session is opened
* (after protocol negotiated)
*/
OutputBuffer * output_buf = ws_pss->output_buf_;
output_buf->establish_wsi(wsi);
websrv->notify_ws_session_open(output_buf, vhd, ws_token);
}
break;
case LWS_CALLBACK_CLOSED:
{
/* control comes here when a websocket session is closed */
assert(websrv);
assert(ws_pss);
assert(ws_pss->output_buf_);
assert(vhd);
websrv->notify_ws_session_close(ws_pss->output_buf_, vhd, ws_token);
if (ws_pss->output_buf_) {
delete ws_pss->output_buf_;
ws_pss->output_buf_ = nullptr;
}
lwsl_user("LWS_CALLBACK_CLOSED: done");
}
break;
case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
{
if (websrv)
websrv->lws_write_pending_traffic(ws_token);
}
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
{
/* control here when:
* 1. application wants to send data
* (app uses lws_cancel_service() to trigger
* LWS_CALLBACK_EVENT_WAIT_CANCELLED)
* 2. websocket session that was previously blocked
* is now ready to receive data
* (see LWS_CALLBACK_SERVER_WRITEABLE above)
*/
#ifdef NOT_USING
if (!vhd->amsg.payload)
break;
#endif
if (!vhd) {
lwsl_user("client entry: vhd not yet established, return");
break;
}
if (!ws_pss) {
lwsl_user("client entry: ws_pss not yet established, return");
break;
}
if (!(ws_pss->output_buf_)) {
lwsl_user("client entry: output_msg buffer not established, return");
/* output message container hasn't been established,
* probably bc nothing to send
*/
break;
}
lwsl_user("notify_minimal: unblock writing, output_msg=[%p]",
ws_pss->output_buf_);
if (ws_pss->output_buf_->is_writeable(ws_token)) {
//assert(false);
} else {
/* a previous call to lws_write() reported a partial write;
* that write has now completed
*/
ws_pss->output_buf_->lws_write_completion(ws_token);
break;
}
if (ws_pss->output_buf_->is_idle()) {
lwsl_user("client entry: output_msg buffer up-to-date, return");
/* already up-to-date, nothing new to send */
break;
}
#ifdef NOT_USING
if (ws_pss->last == vhd->current) {
/* already up-to-date */
break;
}
if (!pss->output_msg_.payload) {
pss->output_msg_.payload = ::malloc(LWS_PRE + output_z);
pss->output_msg_.len = output_z;
}
::memcpy((char *)pss->output_msg_.payload + LWS_PRE, output_cstr, output_z);
lwsl_user("allocate pss->output_msg [%p] in pss [%p]",
pss->output_msg_.payload, pss);
m = lws_write(wsi,
((unsigned char *)pss->output_msg_.payload) + LWS_PRE,
pss->output_msg_.len,
LWS_WRITE_TEXT);
#endif
//XO_SCOPE(lscope);
/* pss->output_msg_ was populated from WebserverImpl.send_text(), q.v. */
m = ws_pss->output_buf_->lws_write_aux(ws_token);
#ifdef NOT_USING
m = lws_write(wsi, ((unsigned char *)vhd->amsg.payload) +
LWS_PRE, vhd->amsg.len, LWS_WRITE_TEXT);
if (m < (int)vhd->amsg.len) { .. }
#endif
if (m == -1) {
lwsl_err("WebserverImplWsThread::notify_minimal: return -1 from callback");
return -1;
}
//pss->last = vhd->current;
}
break;
case LWS_CALLBACK_RECEIVE:
{
char const * incoming_cmd
= reinterpret_cast<char const *>(input);
std::string_view incoming_svw(incoming_cmd, input_z);
//lwsl_user("receive: [%s], z [%d]", incoming_cmd, (int)input_z);
assert(ws_pss);
assert(websrv);
uint32_t session_id = ws_pss->output_buf_->session_id();
websrv->perform_ws_cmd(session_id,
incoming_svw);
#ifdef OBSOLETE
if (vhd->amsg.payload)
minimal_destroy_message(&(vhd->amsg));
vhd->amsg.len = input_z; //output_z;
/* notice we over-allocate by LWS_PRE */
vhd->amsg.payload = ::malloc(LWS_PRE + input_z);
if (!vhd->amsg.payload) {
lwsl_user("OOM: dropping\n");
break;
}
::memcpy((char *)vhd->amsg.payload + LWS_PRE, input, input_z);
//vhd->current++;
#endif
#ifdef OBSOLETE
/*
* let everybody know we want to write something on them
* as soon as they are ready
*/
lws_start_foreach_llp(struct per_session_data__minimal **,
ppss, vhd->pss_list) {
lws_callback_on_writable((*ppss)->wsi);
} lws_end_foreach_llp(ppss, pss_list);
#endif
}
break;
default:
break;
}
log.end_scope();
return 0;
} /*notify_minimal*/
void
WebserverImplWsThread::run()
{
scope log(XO_DEBUG(false /*debug_flag*/));
lwsl_user("LWS minimal http server dynamic"
" | visit http://localhost:%d\n", this->ws_config_.port());
#if defined(LWS_WITH_PLUGINS)
lwsl_user("LWS_WITH_PLUGINS present");
lwsl_user("LWS_WITH_TLS present");
#endif
/* exit when .state is stop_requested, setting state to .stopped */
this->lws_cx_ = lws_create_context(&(this->cx_config_));
if (!(this->lws_cx_)) {
lwsl_err("lws init failed\n");
return;
}
std::int32_t n_event = 0;
while ((n_event >= 0) && !(this->interrupt_flag_)) {
n_event = ::lws_service(this->lws_cx_,
0 /*ignored (used to be timeout)*/);
}
log && log("webserver runner returned - service loop exited",
xtag("n_event", n_event),
xtag("interrupted", this->interrupt_flag_.load()));
lws_context_destroy(this->lws_cx_);
this->lws_cx_ = nullptr;
{
std::unique_lock<std::mutex> lock(this->mutex_);
this->state_ = Runstate::stopped;
this->cond_.notify_all();
}
log && log("exit");
} /*run*/
rp<WebserverImpl>
WebserverImplWsThread::make(WebserverConfig const & ws_config,
rp<PrintJson> const & pjson)
{
return new WebserverImplWsThread(ws_config, pjson);
} /*make*/
// ----- Webserver -----
rp<Webserver>
Webserver::make(WebserverConfig const & ws_config,
rp<PrintJson> const & pjson) {
return WebserverImplWsThread::make(ws_config, pjson);
} /*make*/
void
Webserver::display(std::ostream & os) const {
os << "<Webserver"
<< xtag("state", this->state())
<< ">";
} /*display*/
} /*namespace web*/
} /*namespace xo*/
/* end Webserver.cpp */