diff --git a/include/xo/arena/CircularBufferConfig.hpp b/include/xo/arena/CircularBufferConfig.hpp new file mode 100644 index 0000000..710812a --- /dev/null +++ b/include/xo/arena/CircularBufferConfig.hpp @@ -0,0 +1,62 @@ +/** @file CircularBufferConfig.hpp +* + * @author Roland Conybeare, Jan 2026 + **/ + +#pragma once + +#include + +namespace xo { + namespace mm { + /** @class CircularBufferConfig + * + * @brief configuration for a @ref DCircularBuffer instance + **/ + struct CircularBufferConfig { + /** @defgroup mm-circularbufferconfig-instance-vars CircularBufferConfig members **/ + ///@{ + + /** optional name, for diagnostics **/ + std::string name_; + /** hard maximum buffer size = reserved virtual memory. + * Buffer will generally map much less than this amount of memory + **/ + std::size_t max_capacity_ = 0; + /** hugepage size -- using huge pages relieves some TLB pressure, + * at expense of inefficient memory consumption for (up to two) + * partially used superpages. + **/ + std::size_t hugepage_z_ = 2 * 1024 * 1024; + /** Threshold 'move efficeincy' = (move_distance / move_qty) + * applies to moving unread input to the beginning of mapped range, + * when not prevented by pinned ranges. + * + * Higher numbers reduce cpu consumption but increase memory consumption + * Reciprocal loose ceiling on relative effort that may be spent on + * moving fractional input + **/ + float threshold_move_efficiency_ = 50.0; + /** lower bound for hard maximum number of capture spans. + * + * Expected use case is to track spans that are currently referenced + * (rather than copied) from outside a DCircularBuffer instance. + * Circular buffer will not unmap or overwrite memory for such spans. + * + * Expect to generally release captured spans in the same order they + * were captured. Out of order release is supported, but cost + * of out-of-order release grows + * like O(r) for r remembered spans. + * + * A typical parser will need spans to remember one line of input + **/ + std::size_t max_captured_span_ = 0; + /** true to enable debug logging **/ + bool debug_flag_ = false; + + ///@} + }; + } /*namespace mm*/ +} /*namespace xo*/ + +/* end CircularBufferConfig.hpp */ diff --git a/include/xo/arena/DCircularBuffer.hpp b/include/xo/arena/DCircularBuffer.hpp new file mode 100644 index 0000000..17bd6b7 --- /dev/null +++ b/include/xo/arena/DCircularBuffer.hpp @@ -0,0 +1,254 @@ +/** @file DCircularBuffer.hpp +* + * @author Roland Conybeare, Jan 2026 + **/ + +#include "CircularBufferConfig.hpp" +#include "DArenaVector.hpp" +#include "hashmap/verify_policy.hpp" +#include "span.hpp" +#include + +namespace xo { + namespace mm { + /** @class DCircularBuffer + * + * @brief high performance vm-aware circular buffer + * + * Circular buffer implementation with parsing-friendly performance features. + * - generalization of DArena. + * Like DArena, maps superpages as needed. + * Unlike DArena memory at the beginning of reserved range can be unmapped. + * - allows address range >> physical range + * - admits "Cheney on the MTA" strategy. + * May be feasible to reserve a lifetime address range (say 1TB) + * as long as buffer only every maps a subrange that fits in physical memory. + * - zero copy support for parsing / protocol trnaslation: + * provides capture/release semantics for a fixed number + * of remembered spans. Will never unmap memory for a remembered span, + * until that span is released. + * - automatically resets to beginning of reserved range + * whenever occupied range is empty + **/ + struct DCircularBuffer { + public: + /** @defgroup mm-circularbuffer-types CircularBuffer type traits **/ + ///@{ + + /** an amount of memory **/ + using size_type = std::size_t; + using byte = std::byte; + /** a contiguous addres range **/ + using span_type = span; + using const_span_type = span; + + ///@} + + public: + /** @defgroup mm-cicrularbuffer-ctors CircularBuffer constructors **/ + ///@{ + + /** contruct instance + * @p config circular buffer configuration + * @p page_z o/s page size (via getpagesize()) + * @p buffer_align_z alignment for buffer memory + * @p reserved_range reserved virtual address range + **/ + DCircularBuffer(const CircularBufferConfig & config, + size_type page_z, + size_type buffer_align_z, + span_type reserved_range); + /** constructor */ + DCircularBuffer(const CircularBufferConfig & config); + /** non-copyable **/ + DCircularBuffer(const DCircularBuffer & other) = delete; + /** move ctor **/ + DCircularBuffer(DCircularBuffer && other); + + /** + * allocate virtual memory address (uncommitted!) for circular buffer + * with configuration @p config. + **/ + static DCircularBuffer map(const CircularBufferConfig & config); + + ///@} + + /** @defgroup mm-circularbuffer-const-methods CircularBuffer const methods **/ + ///@{ + + const_span_type reserved_range() const noexcept { return reserved_range_; } + const_span_type mapped_range() const noexcept { return mapped_range_; } + const_span_type occupied_range() const noexcept { return occupied_range_; } + + /** verify DCircularBuffer invariants. + * Act on failure according to policy @p p + * (combination of throw|log bits) + * + * verify invariants: + * CB1: mapped_range_ is subrange of reserved_range_ + * CB2: occupied_range_ is subrange of mapped_range_ + * CB3: each remembered_spans_[i] is subrange of occupied_range_ + * CB4: buffer_align_z_ > 0 when buffer is mapped + * CB5: reserved_range_.lo() aligned on buffer_align_z_ boundary + **/ + bool verify_ok(verify_policy p = verify_policy::throw_only()) const; + + ///@} + + /** @defgroup mm-circularbuffer-nonconst-methods CircularBuffer non-const methods **/ + ///@{ + + /** copy memory in span @p r into buffer starting at the end of + * @ref occupied_range_. Map new physical memory as needed. + * On success returns empty suffix of @p r. + * If buffer memory exhausted, may copy a prefix of @p r. + * In that case returns the remaining suffix of @p r. + **/ + span_type append(span_type r); + + /** DMA version of @ref append_span : get mapped span A at which + * buffer will receive new content. Upstream may write into + * A. It must then coordinate with buffer by calling + * @ref report_append(P) for some prefix P of A + * + * Example: + * @code + * CircularBuffer buf = ...; + * constexpr size_type z = 64*1024; + * auto span = buf.get_append_span(z); + * ssize_t nr = read(FD, span.lo(), span.size()); + * if (nr > 0) + * buf.report_append(span.prefix(nr)); + * @endcode + **/ + span_type get_append_span(size_type desired_z); + + /** update bookkeeping as if caller had invoked append(r); + * however caller has already written to mapped memory + * after using get_append_span(); so omit copy + **/ + void report_append(span_type r); + + /** expand hi end of mapped memory range to at least @p hi. + * + * Require: @p hi < @ref reserved_range_.hi + **/ + bool expand_to(byte * hi); + + /** consume span (or prefix thereof) previously obtained from @ref occupied_range() + * Caller represents that it won't need to read this memory again + * unless overlaps with a pinned span. + **/ + void consume(span_type r); + + /** pin memory range @p r. circular buffer will not touch + * addresses that appear in any pinned range. + * use to + **/ + void pin_range(span_type r); + + /** unwind a previous pin_range call on range @p r. + * both start and end or @p r should exactly match a pinned range. + **/ + void unpin_range(span_type r); + + ///@} + + private: + + /** @defgroup mm-circularbuffer-private-methods CircularBuffer non-const methods **/ + ///@{ + + /** shrink occupied rnage to the smallest contiguous range that contains both: + * all of .input_range_, and all pinned ranges in .pinned_spans_ + **/ + void _shrink_occupied_to_fit(); + + /** check for edge condition in which there are no pinned ranges. **/ + void _check_reset_map_start(); + + ///@} + + private: + /** @defgroup mm-circularbuffer-instance-vars CircularBuffer member variables **/ + ///@{ + + /* memory layout + * + * reserved_range_ : entire address range owned by buffer (may be huge, e.g., 1TB) + * mapped_range_ : subrange backed by physical memory (fits in RAM) + * occupied_range_ : subrange currently containing data + * input_range_ : subrange containing unread input + * pinned_spans_ : pinned subranges within occupied (prevents alteration or unmap) + * + * <------------------- .reserved_range ---------------------> + * . <------------- .mapped_range -------------> . + * . . <----- .occupied_range -----> . . + * . . . <- .input_range -----> . . + * . . . . . . . + * ........------XXXXXXXIIIIIIIIIIIIIIIIIIIIII--------........ + * pp ppp pp + * Legend: + * [.] reserved : uncommitted memory. may be huge (e.g. 1TB) + * [-] mapped : range backed by physical memory + * [X] consumed : preserved until last overlapping pin removed + * [I] input : unread content, waiting to be read + * [p] pinned : pinned memory will not be altered (let alone unmapped) + * + * Invariants: + * - .input_range <= .occupied_range <= .mapped_range <= .reserved_range + * - mapped_range_ cannot shrink to exclude any portion of a pinned span + */ + + /** buffer configuration **/ + CircularBufferConfig config_; + + /** size of a VM page (obtained automatically via getpagesize()). 4k on ubuntu. 16k on osx **/ + size_type page_z_; + + /** alignment for buffer address range. + * In practice will be either page_z_ or config_.hugepage_z_ + **/ + size_type buffer_align_z_; + + /** Circular buffer owns address range defined by this span. + * Aligned on @ref buffer_align_z_. + * Always a whole number of @ref page_z_ or @ref config_.hugepage_z_ + **/ + span_type reserved_range_; + + /** buffer owns memory defined by this span. + * Always a subrange of reserved_range + * These addresses backed by physical memory. + * Always a whole number of @ref page_z_ or @ref config_.hugepage_z_ + **/ + span_type mapped_range_; + + /** currently occupied buffer memory. + * Always a subrange of @ref mapped_range_ + **/ + span_type occupied_range_; + + /** portion of occupied buffer memory waiting to be read. + * Always represents a subspan of @ref occupied_range_, with the same + * hi endpoint. + * conversely @ref consume shrinks @ref input_range_ by increasing its lo endpoint. + **/ + span_type input_range_; + + /** remembered spans. For anticipated use cases expect one vm page sufficient. + * Spans in this vector always represent subranges of @ref occupied_range_ + * + * @ref pinned_spans_ is confined to @ref occupied_range_. + * (In particular it's *not* confined to @ref input_range_) + * + * sorted on increasing span.lo() + **/ + DArenaVector pinned_spans_; + + ///@} + }; + } +} /*namespace xo*/ + +/* end DCircularBuffer.hpp */ diff --git a/src/arena/DCircularBuffer.cpp b/src/arena/DCircularBuffer.cpp new file mode 100644 index 0000000..e4d31f4 --- /dev/null +++ b/src/arena/DCircularBuffer.cpp @@ -0,0 +1,352 @@ +/** @file DCircularBuffer.cpp + * + * @author Roland Conybeare, Jan 2026 + **/ + +#include "DCircularBuffer.hpp" +#include "mmap_util.hpp" +#include +#include +#include + +namespace xo { + namespace mm { + + DCircularBuffer::DCircularBuffer(DCircularBuffer && other) + : config_{other.config_}, + page_z_{other.page_z_}, + buffer_align_z_{other.buffer_align_z_}, + reserved_range_{other.reserved_range_}, + mapped_range_{other.mapped_range_}, + occupied_range_{other.occupied_range_}, + pinned_spans_{std::move(other.pinned_spans_)} + { + other.reserved_range_ = span_type(); + other.mapped_range_ = span_type(); + other.occupied_range_ = span_type(); + } + + DCircularBuffer + DCircularBuffer::map(const CircularBufferConfig & config) + { + scope log(XO_DEBUG(config.debug_flag_)); + + /* vm page size. 4KB (probably if linux) or 16KB (probably if osx) */ + size_t page_z = getpagesize(); + + bool enable_hugepage_flag = (config.max_capacity_ >= config.hugepage_z_); + + /* Align start of arena memory on this boundary. + * Will use THP (transparent huge pages) if available + * and arena size is at least as large as hugepage size (2MB, probably) + */ + size_t align_z = (enable_hugepage_flag ? config.hugepage_z_ : page_z); + + log && log(xtag("page_z", page_z), + xtag("align_z", align_z)); + + auto span = mmap_util::map_aligned_range(config.max_capacity_, + align_z, + enable_hugepage_flag, + config.debug_flag_); + + if (!span.lo()) { + throw std::runtime_error(tostr("DCircularBuffer: reserve address range failed", + xtag("size", config.max_capacity_))); + } + + return DCircularBuffer(config, page_z, align_z, span); + } + + DCircularBuffer::DCircularBuffer(const CircularBufferConfig & config, + size_type page_z, + size_type buffer_align_z, + span_type reserved_range) + : config_{config}, + page_z_{page_z}, + buffer_align_z_{buffer_align_z}, + reserved_range_{reserved_range}, + mapped_range_{reserved_range_.prefix(0)}, + occupied_range_{mapped_range_.prefix(0)}, + input_range_{occupied_range_.prefix(0)}, + pinned_spans_{} + { + } + + bool + DCircularBuffer::verify_ok(verify_policy policy) const + { + using xo::xtag; + + constexpr const char * c_self = "DCircularBuffer::verify_ok"; + scope log(XO_DEBUG(false)); + + /* CB1: mapped_range_ is subrange of reserved_range_ */ + if ((mapped_range_.lo() < reserved_range_.lo()) + || (mapped_range_.hi() > reserved_range_.hi())) + { + return policy.report_error(log, + c_self, ": expect mapped_range subset of reserved_range", + xtag("mapped.lo", (void*)mapped_range_.lo()), + xtag("mapped.hi", (void*)mapped_range_.hi()), + xtag("reserved.lo", (void*)reserved_range_.lo()), + xtag("reserved.hi", (void*)reserved_range_.hi())); + } + + /* CB2: occupied_range_ is subrange of mapped_range_ */ + if ((occupied_range_.lo() < mapped_range_.lo()) + || (occupied_range_.hi() > mapped_range_.hi())) + { + return policy.report_error(log, + c_self, ": expect occupied_range subset of mapped_range", + xtag("occupied.lo", (void*)occupied_range_.lo()), + xtag("occupied.hi", (void*)occupied_range_.hi()), + xtag("mapped.lo", (void*)mapped_range_.lo()), + xtag("mapped.hi", (void*)mapped_range_.hi())); + } + + /* CB3: each remembered span is subrange of occupied_range_ */ + for (size_type i = 0, n = pinned_spans_.size(); i < n; ++i) { + const const_span_type & pin = pinned_spans_[i]; + + if ((pin.lo() < occupied_range_.lo()) + || (pin.hi() > occupied_range_.hi())) + { + return policy.report_error(log, + c_self, ": expect remembered_span subset of occupied_range", + xtag("i", i), + xtag("pin.lo", (void*)pin.lo()), + xtag("pin.hi", (void*)pin.hi()), + xtag("occupied.lo", (void*)occupied_range_.lo()), + xtag("occupied.hi", (void*)occupied_range_.hi())); + } + } + + /* CB4: buffer_align_z_ is non-zero (when buffer is mapped) */ + if (!reserved_range_.is_null() && (buffer_align_z_ == 0)) { + return policy.report_error(log, + c_self, ": expect buffer_align_z > 0 when buffer is mapped", + xtag("buffer_align_z", buffer_align_z_)); + } + + /* CB5: reserved_range_ aligned on buffer_align_z_ boundary */ + if (!reserved_range_.is_null() && (buffer_align_z_ > 0)) { + if (((size_type)(reserved_range_.lo()) % buffer_align_z_) != 0) { + return policy.report_error(log, + c_self, ": expect reserved_range.lo aligned on buffer_align_z", + xtag("reserved.lo", (void*)reserved_range_.lo()), + xtag("buffer_align_z", buffer_align_z_)); + } + } + + return true; + } + + auto + DCircularBuffer::append(span_type src) -> span_type + { + span_type dest = get_append_span(src.size()); + + size_t copy_z = std::min(src.size(), dest.size()); + + ::memcpy(occupied_range_.hi(), src.lo(), copy_z); + + this->occupied_range_ += span_type(dest.lo(), copy_z); + + return src.after_prefix(copy_z); + } + + auto + DCircularBuffer::get_append_span(size_type desired_z) -> span_type + { + span_type dest = span_type(occupied_range_.hi(), desired_z); + + if (dest.hi() > reserved_range_.hi()) { + /* under no circumstances go past the end of reserved range */ + dest = span_type(dest.lo(), reserved_range_.hi()); + } + + /* establish mapped range at least to dest.hi */ + this->expand_to(dest.hi()); + + /* report available memory */ + return span_type(occupied_range_.hi(), mapped_range_.hi()); + } + + void + DCircularBuffer::report_append(span_type r) + { + if (r.lo() != occupied_range_.hi()) { + // error! + + // this->capture_error(error::bad_append_report, r.size()) + assert(false); + + return; + } + + if (r.hi() > mapped_range_.hi()) { + // error! + + // this->capture_error(error::bad_append_report, r.size()) + assert(false); + + return; + } + + this->occupied_range_ += r; + } + + void + DCircularBuffer::consume(span_type r) + { + if (r.lo() != input_range_.lo()) { + assert(false); + + return; + } + + if (r.hi() > occupied_range_.hi()) { + assert(false); + + return; + } + + if (occupied_range_.lo() < input_range_.lo()) { + /* here: a pinned range prevents shrinking occupied_range */ + + this->input_range_ = input_range_.suffix_from(r.hi()); + } else { + /* here: input; recompute occupied boundary */ + + this->input_range_ = input_range_.suffix_from(r.hi()); + + this->_shrink_occupied_to_fit(); + } + + this->_check_reset_map_start(); + } + + bool + DCircularBuffer::expand_to(byte * hi) { + scope log(XO_DEBUG(config_.debug_flag_)); + + if (hi < mapped_range_.hi()) { + /* nothing todo */ + return true; + } + + size_t add_z = hi - mapped_range_.hi(); + size_t add_commit_z = padding::with_padding(add_z, buffer_align_z_); + byte * commit_start = mapped_range_.hi(); + + if (::mprotect(commit_start, + add_commit_z, + PROT_READ | PROT_WRITE) != 0) + { + if (log) { + log("commit failed"); + log(xtag("commit_start", commit_start), + xtag("add_z", add_z), + xtag("add_commit_z", add_commit_z)); + } + + // this->capture_error(error::commit_failed, add_commit_z); + return false; + } + + this->mapped_range_ += span(commit_start, add_commit_z); + return true; + } + + void + DCircularBuffer::pin_range(span_type r) + { + // loop optimized for case where r falls + // _after_ any existing pinned ranges + + size_type z = pinned_spans_.size(); + size_type ip1 = z; // ip1 = i + 1 + + for (; ip1 > 0; --ip1) { + if (r.lo() > pinned_spans_[ip1 - 1].lo()) + break; + + // insert at i to maintain sorted order + pinned_spans_.insert(ip1 - 1, r); + return; + } + + pinned_spans_.push_back(r); + } + + void + DCircularBuffer::unpin_range(span_type r) + { + // loop optimized for case where r + // is the first pinned range + + assert(pinned_spans_.size() > 0); + + if (r == pinned_spans_[0]) { + this->pinned_spans_.erase(0); + + /* removing pinned span means can perhaps shrink + * occupied range + */ + this->_shrink_occupied_to_fit(); + this->_check_reset_map_start(); + } else { + for (size_type i = 1; i < pinned_spans_.size(); ++i) { + if (r == pinned_spans_[i]) { + this->pinned_spans_.erase(i); + + /* since this isn't the first pinned span, + * won't be able to shrink occupied range. + */ + return; + } + } + } + } + + void + DCircularBuffer::_shrink_occupied_to_fit() + { + if (pinned_spans_.empty()) { + this->occupied_range_ = input_range_; + } else if (occupied_range_.lo() < pinned_spans_[0].lo()) { + this->occupied_range_ = occupied_range_.suffix_from(pinned_spans_[0].lo()); + } + } + + void + DCircularBuffer::_check_reset_map_start() + { + if (pinned_spans_.empty() + && (input_range_ == occupied_range_)) { + + // here: permissible to move input range to the beginning of mapped range. + // decide (heuristically) whether we think this is optimal + + std::size_t input_z = input_range_.size(); + + // 1st clause checks efficiency. + // 2nd clause (probably redundant) check non-overlapping + if ((input_range_.lo() > (mapped_range_.lo() + + std::max(page_z_, + static_cast(config_.threshold_move_efficiency_ * input_z)))) + && (mapped_range_.lo() + input_z < input_range_.lo())) { + + ::memmove(mapped_range_.lo(), input_range_.lo(), input_z); + + this->occupied_range_ = mapped_range_.prefix(input_z); + this->input_range_ = mapped_range_.prefix(input_z); + } + } + } + + } /*namespace mm*/ +} /*namespace xo*/ + +/* end DCircularBuffer.cpp */ diff --git a/src/arena/mmap_util.cpp b/src/arena/mmap_util.cpp new file mode 100644 index 0000000..37db714 --- /dev/null +++ b/src/arena/mmap_util.cpp @@ -0,0 +1,105 @@ +/** @file mmap_util.cpp +* + * @author Roland Conybeare, Jan 2026 + **/ + +#include "mmap_util.hpp" +#include "padding.hpp" +#include // for mmap + +namespace xo { + namespace mm { + auto + mmap_util::map_aligned_range(size_t req_z, + size_t align_z, + bool enable_hugepage_flag, + bool debug_flag) -> span_type + { + scope log(XO_DEBUG(debug_flag), + xtag("req_z", req_z), + xtag("align_z", align_z)); + + // 1. round up to multiple of align_z + size_t target_z = padding::with_padding(req_z, align_z); // 4. + + // 2. mmap() will give us page-aligned memory, + // but not hugepage-aligned. + // + // Over-request by align_z to ensure + // aligned subrange of size target_z + // + byte * base = (byte *)(::mmap(nullptr, + target_z + align_z, + PROT_NONE, + MAP_PRIVATE | MAP_ANONYMOUS, + -1, 0)); + + // on mmap success: upper limit of mapped address range + byte * hi = base + (target_z + align_z); + // lowest hugepage-aligned address in [base, hi) + byte * aligned_base = (byte *)(padding::with_padding((size_t)base, align_z)); + // end of hugeppage-aligned range starting at aligned_base + byte * aligned_hi = aligned_base + target_z; + + log && log("acquired memory [lo,hi) using mmap", + xtag("lo", base), + xtag("aligned_lo", aligned_base), + xtag("req_z", req_z), + xtag("target_z", target_z), + xtag("aligned_hi", aligned_hi), + xtag("hi", hi)); + + // 3. assess mmap success + { + if (base == MAP_FAILED) { + throw std::runtime_error(tostr("ArenaAlloc: uncommitted allocation failed", + xtag("size", req_z))); + } + + assert((size_t)aligned_base % align_z == 0); + assert(aligned_base >= base); + assert(aligned_base < base + align_z); + } + + // 4. release unaligned prefix + if (base < aligned_base) { + size_t ua_prefix = aligned_base - base; + + ::munmap(base, ua_prefix); + } + + // 5. release unaligned suffix + if (aligned_hi < hi) { + size_t suffix = hi - aligned_hi; + + ::munmap(aligned_hi, suffix); + } + + if (enable_hugepage_flag) { +#ifdef __linux__ + /** linux: + * opt-in to transparent huge pages (THP) + * provided OS configured to support them. + * otherwise fallback gracefully. + * + * Huge pages -> use fewer TLB entries + faster + * shorter path through page table. + * + * When we commit (i.e. obtain physical memory on page fault), + * typically expect to pay ~1us per superpage. + * Much better than ~500us to commit 512 4k VM pages. + * + * But wasted if we don't use the memory. + * + * Page table has a handful of levels + **/ + ::madvise(aligned_base, target_z, MADV_HUGEPAGE); // 8. +#endif + } + + return span_type(aligned_base, aligned_hi); + } + } /*namespace mm*/ +} /*namespace xo*/ + +/* end mmap_util.cpp */