xo-arena: + DCircularBuffer + CircularBufferConfig

This commit is contained in:
Roland Conybeare 2026-01-11 12:32:29 -05:00
commit 4a4a4afa1f
4 changed files with 773 additions and 0 deletions

View file

@ -0,0 +1,62 @@
/** @file CircularBufferConfig.hpp
*
* @author Roland Conybeare, Jan 2026
**/
#pragma once
#include <string>
namespace xo {
namespace mm {
/** @class CircularBufferConfig
*
* @brief configuration for a @ref DCircularBuffer instance
**/
struct CircularBufferConfig {
/** @defgroup mm-circularbufferconfig-instance-vars CircularBufferConfig members **/
///@{
/** optional name, for diagnostics **/
std::string name_;
/** hard maximum buffer size = reserved virtual memory.
* Buffer will generally map much less than this amount of memory
**/
std::size_t max_capacity_ = 0;
/** hugepage size -- using huge pages relieves some TLB pressure,
* at expense of inefficient memory consumption for (up to two)
* partially used superpages.
**/
std::size_t hugepage_z_ = 2 * 1024 * 1024;
/** Threshold 'move efficeincy' = (move_distance / move_qty)
* applies to moving unread input to the beginning of mapped range,
* when not prevented by pinned ranges.
*
* Higher numbers reduce cpu consumption but increase memory consumption
* Reciprocal loose ceiling on relative effort that may be spent on
* moving fractional input
**/
float threshold_move_efficiency_ = 50.0;
/** lower bound for hard maximum number of capture spans.
*
* Expected use case is to track spans that are currently referenced
* (rather than copied) from outside a DCircularBuffer instance.
* Circular buffer will not unmap or overwrite memory for such spans.
*
* Expect to generally release captured spans in the same order they
* were captured. Out of order release is supported, but cost
* of out-of-order release grows
* like O(r) for r remembered spans.
*
* A typical parser will need spans to remember one line of input
**/
std::size_t max_captured_span_ = 0;
/** true to enable debug logging **/
bool debug_flag_ = false;
///@}
};
} /*namespace mm*/
} /*namespace xo*/
/* end CircularBufferConfig.hpp */

View file

@ -0,0 +1,254 @@
/** @file DCircularBuffer.hpp
*
* @author Roland Conybeare, Jan 2026
**/
#include "CircularBufferConfig.hpp"
#include "DArenaVector.hpp"
#include "hashmap/verify_policy.hpp"
#include "span.hpp"
#include <cstdint>
namespace xo {
namespace mm {
/** @class DCircularBuffer
*
* @brief high performance vm-aware circular buffer
*
* Circular buffer implementation with parsing-friendly performance features.
* - generalization of DArena.
* Like DArena, maps superpages as needed.
* Unlike DArena memory at the beginning of reserved range can be unmapped.
* - allows address range >> physical range
* - admits "Cheney on the MTA" strategy.
* May be feasible to reserve a lifetime address range (say 1TB)
* as long as buffer only every maps a subrange that fits in physical memory.
* - zero copy support for parsing / protocol trnaslation:
* provides capture/release semantics for a fixed number
* of remembered spans. Will never unmap memory for a remembered span,
* until that span is released.
* - automatically resets to beginning of reserved range
* whenever occupied range is empty
**/
struct DCircularBuffer {
public:
/** @defgroup mm-circularbuffer-types CircularBuffer type traits **/
///@{
/** an amount of memory **/
using size_type = std::size_t;
using byte = std::byte;
/** a contiguous addres range **/
using span_type = span<byte>;
using const_span_type = span<const byte>;
///@}
public:
/** @defgroup mm-cicrularbuffer-ctors CircularBuffer constructors **/
///@{
/** contruct instance
* @p config circular buffer configuration
* @p page_z o/s page size (via getpagesize())
* @p buffer_align_z alignment for buffer memory
* @p reserved_range reserved virtual address range
**/
DCircularBuffer(const CircularBufferConfig & config,
size_type page_z,
size_type buffer_align_z,
span_type reserved_range);
/** constructor */
DCircularBuffer(const CircularBufferConfig & config);
/** non-copyable **/
DCircularBuffer(const DCircularBuffer & other) = delete;
/** move ctor **/
DCircularBuffer(DCircularBuffer && other);
/**
* allocate virtual memory address (uncommitted!) for circular buffer
* with configuration @p config.
**/
static DCircularBuffer map(const CircularBufferConfig & config);
///@}
/** @defgroup mm-circularbuffer-const-methods CircularBuffer const methods **/
///@{
const_span_type reserved_range() const noexcept { return reserved_range_; }
const_span_type mapped_range() const noexcept { return mapped_range_; }
const_span_type occupied_range() const noexcept { return occupied_range_; }
/** verify DCircularBuffer invariants.
* Act on failure according to policy @p p
* (combination of throw|log bits)
*
* verify invariants:
* CB1: mapped_range_ is subrange of reserved_range_
* CB2: occupied_range_ is subrange of mapped_range_
* CB3: each remembered_spans_[i] is subrange of occupied_range_
* CB4: buffer_align_z_ > 0 when buffer is mapped
* CB5: reserved_range_.lo() aligned on buffer_align_z_ boundary
**/
bool verify_ok(verify_policy p = verify_policy::throw_only()) const;
///@}
/** @defgroup mm-circularbuffer-nonconst-methods CircularBuffer non-const methods **/
///@{
/** copy memory in span @p r into buffer starting at the end of
* @ref occupied_range_. Map new physical memory as needed.
* On success returns empty suffix of @p r.
* If buffer memory exhausted, may copy a prefix of @p r.
* In that case returns the remaining suffix of @p r.
**/
span_type append(span_type r);
/** DMA version of @ref append_span : get mapped span A at which
* buffer will receive new content. Upstream may write into
* A. It must then coordinate with buffer by calling
* @ref report_append(P) for some prefix P of A
*
* Example:
* @code
* CircularBuffer buf = ...;
* constexpr size_type z = 64*1024;
* auto span = buf.get_append_span(z);
* ssize_t nr = read(FD, span.lo(), span.size());
* if (nr > 0)
* buf.report_append(span.prefix(nr));
* @endcode
**/
span_type get_append_span(size_type desired_z);
/** update bookkeeping as if caller had invoked append(r);
* however caller has already written to mapped memory
* after using get_append_span(); so omit copy
**/
void report_append(span_type r);
/** expand hi end of mapped memory range to at least @p hi.
*
* Require: @p hi < @ref reserved_range_.hi
**/
bool expand_to(byte * hi);
/** consume span (or prefix thereof) previously obtained from @ref occupied_range()
* Caller represents that it won't need to read this memory again
* unless overlaps with a pinned span.
**/
void consume(span_type r);
/** pin memory range @p r. circular buffer will not touch
* addresses that appear in any pinned range.
* use to
**/
void pin_range(span_type r);
/** unwind a previous pin_range call on range @p r.
* both start and end or @p r should exactly match a pinned range.
**/
void unpin_range(span_type r);
///@}
private:
/** @defgroup mm-circularbuffer-private-methods CircularBuffer non-const methods **/
///@{
/** shrink occupied rnage to the smallest contiguous range that contains both:
* all of .input_range_, and all pinned ranges in .pinned_spans_
**/
void _shrink_occupied_to_fit();
/** check for edge condition in which there are no pinned ranges. **/
void _check_reset_map_start();
///@}
private:
/** @defgroup mm-circularbuffer-instance-vars CircularBuffer member variables **/
///@{
/* memory layout
*
* reserved_range_ : entire address range owned by buffer (may be huge, e.g., 1TB)
* mapped_range_ : subrange backed by physical memory (fits in RAM)
* occupied_range_ : subrange currently containing data
* input_range_ : subrange containing unread input
* pinned_spans_ : pinned subranges within occupied (prevents alteration or unmap)
*
* <------------------- .reserved_range --------------------->
* . <------------- .mapped_range -------------> .
* . . <----- .occupied_range -----> . .
* . . . <- .input_range -----> . .
* . . . . . . .
* ........------XXXXXXXIIIIIIIIIIIIIIIIIIIIII--------........
* pp ppp pp
* Legend:
* [.] reserved : uncommitted memory. may be huge (e.g. 1TB)
* [-] mapped : range backed by physical memory
* [X] consumed : preserved until last overlapping pin removed
* [I] input : unread content, waiting to be read
* [p] pinned : pinned memory will not be altered (let alone unmapped)
*
* Invariants:
* - .input_range <= .occupied_range <= .mapped_range <= .reserved_range
* - mapped_range_ cannot shrink to exclude any portion of a pinned span
*/
/** buffer configuration **/
CircularBufferConfig config_;
/** size of a VM page (obtained automatically via getpagesize()). 4k on ubuntu. 16k on osx **/
size_type page_z_;
/** alignment for buffer address range.
* In practice will be either page_z_ or config_.hugepage_z_
**/
size_type buffer_align_z_;
/** Circular buffer owns address range defined by this span.
* Aligned on @ref buffer_align_z_.
* Always a whole number of @ref page_z_ or @ref config_.hugepage_z_
**/
span_type reserved_range_;
/** buffer owns memory defined by this span.
* Always a subrange of reserved_range
* These addresses backed by physical memory.
* Always a whole number of @ref page_z_ or @ref config_.hugepage_z_
**/
span_type mapped_range_;
/** currently occupied buffer memory.
* Always a subrange of @ref mapped_range_
**/
span_type occupied_range_;
/** portion of occupied buffer memory waiting to be read.
* Always represents a subspan of @ref occupied_range_, with the same
* hi endpoint.
* conversely @ref consume shrinks @ref input_range_ by increasing its lo endpoint.
**/
span_type input_range_;
/** remembered spans. For anticipated use cases expect one vm page sufficient.
* Spans in this vector always represent subranges of @ref occupied_range_
*
* @ref pinned_spans_ is confined to @ref occupied_range_.
* (In particular it's *not* confined to @ref input_range_)
*
* sorted on increasing span.lo()
**/
DArenaVector<span_type> pinned_spans_;
///@}
};
}
} /*namespace xo*/
/* end DCircularBuffer.hpp */

View file

@ -0,0 +1,352 @@
/** @file DCircularBuffer.cpp
*
* @author Roland Conybeare, Jan 2026
**/
#include "DCircularBuffer.hpp"
#include "mmap_util.hpp"
#include <xo/indentlog/scope.hpp>
#include <xo/indentlog/print/tostr.hpp>
#include <sys/mman.h>
namespace xo {
namespace mm {
DCircularBuffer::DCircularBuffer(DCircularBuffer && other)
: config_{other.config_},
page_z_{other.page_z_},
buffer_align_z_{other.buffer_align_z_},
reserved_range_{other.reserved_range_},
mapped_range_{other.mapped_range_},
occupied_range_{other.occupied_range_},
pinned_spans_{std::move(other.pinned_spans_)}
{
other.reserved_range_ = span_type();
other.mapped_range_ = span_type();
other.occupied_range_ = span_type();
}
DCircularBuffer
DCircularBuffer::map(const CircularBufferConfig & config)
{
scope log(XO_DEBUG(config.debug_flag_));
/* vm page size. 4KB (probably if linux) or 16KB (probably if osx) */
size_t page_z = getpagesize();
bool enable_hugepage_flag = (config.max_capacity_ >= config.hugepage_z_);
/* Align start of arena memory on this boundary.
* Will use THP (transparent huge pages) if available
* and arena size is at least as large as hugepage size (2MB, probably)
*/
size_t align_z = (enable_hugepage_flag ? config.hugepage_z_ : page_z);
log && log(xtag("page_z", page_z),
xtag("align_z", align_z));
auto span = mmap_util::map_aligned_range(config.max_capacity_,
align_z,
enable_hugepage_flag,
config.debug_flag_);
if (!span.lo()) {
throw std::runtime_error(tostr("DCircularBuffer: reserve address range failed",
xtag("size", config.max_capacity_)));
}
return DCircularBuffer(config, page_z, align_z, span);
}
DCircularBuffer::DCircularBuffer(const CircularBufferConfig & config,
size_type page_z,
size_type buffer_align_z,
span_type reserved_range)
: config_{config},
page_z_{page_z},
buffer_align_z_{buffer_align_z},
reserved_range_{reserved_range},
mapped_range_{reserved_range_.prefix(0)},
occupied_range_{mapped_range_.prefix(0)},
input_range_{occupied_range_.prefix(0)},
pinned_spans_{}
{
}
bool
DCircularBuffer::verify_ok(verify_policy policy) const
{
using xo::xtag;
constexpr const char * c_self = "DCircularBuffer::verify_ok";
scope log(XO_DEBUG(false));
/* CB1: mapped_range_ is subrange of reserved_range_ */
if ((mapped_range_.lo() < reserved_range_.lo())
|| (mapped_range_.hi() > reserved_range_.hi()))
{
return policy.report_error(log,
c_self, ": expect mapped_range subset of reserved_range",
xtag("mapped.lo", (void*)mapped_range_.lo()),
xtag("mapped.hi", (void*)mapped_range_.hi()),
xtag("reserved.lo", (void*)reserved_range_.lo()),
xtag("reserved.hi", (void*)reserved_range_.hi()));
}
/* CB2: occupied_range_ is subrange of mapped_range_ */
if ((occupied_range_.lo() < mapped_range_.lo())
|| (occupied_range_.hi() > mapped_range_.hi()))
{
return policy.report_error(log,
c_self, ": expect occupied_range subset of mapped_range",
xtag("occupied.lo", (void*)occupied_range_.lo()),
xtag("occupied.hi", (void*)occupied_range_.hi()),
xtag("mapped.lo", (void*)mapped_range_.lo()),
xtag("mapped.hi", (void*)mapped_range_.hi()));
}
/* CB3: each remembered span is subrange of occupied_range_ */
for (size_type i = 0, n = pinned_spans_.size(); i < n; ++i) {
const const_span_type & pin = pinned_spans_[i];
if ((pin.lo() < occupied_range_.lo())
|| (pin.hi() > occupied_range_.hi()))
{
return policy.report_error(log,
c_self, ": expect remembered_span subset of occupied_range",
xtag("i", i),
xtag("pin.lo", (void*)pin.lo()),
xtag("pin.hi", (void*)pin.hi()),
xtag("occupied.lo", (void*)occupied_range_.lo()),
xtag("occupied.hi", (void*)occupied_range_.hi()));
}
}
/* CB4: buffer_align_z_ is non-zero (when buffer is mapped) */
if (!reserved_range_.is_null() && (buffer_align_z_ == 0)) {
return policy.report_error(log,
c_self, ": expect buffer_align_z > 0 when buffer is mapped",
xtag("buffer_align_z", buffer_align_z_));
}
/* CB5: reserved_range_ aligned on buffer_align_z_ boundary */
if (!reserved_range_.is_null() && (buffer_align_z_ > 0)) {
if (((size_type)(reserved_range_.lo()) % buffer_align_z_) != 0) {
return policy.report_error(log,
c_self, ": expect reserved_range.lo aligned on buffer_align_z",
xtag("reserved.lo", (void*)reserved_range_.lo()),
xtag("buffer_align_z", buffer_align_z_));
}
}
return true;
}
auto
DCircularBuffer::append(span_type src) -> span_type
{
span_type dest = get_append_span(src.size());
size_t copy_z = std::min(src.size(), dest.size());
::memcpy(occupied_range_.hi(), src.lo(), copy_z);
this->occupied_range_ += span_type(dest.lo(), copy_z);
return src.after_prefix(copy_z);
}
auto
DCircularBuffer::get_append_span(size_type desired_z) -> span_type
{
span_type dest = span_type(occupied_range_.hi(), desired_z);
if (dest.hi() > reserved_range_.hi()) {
/* under no circumstances go past the end of reserved range */
dest = span_type(dest.lo(), reserved_range_.hi());
}
/* establish mapped range at least to dest.hi */
this->expand_to(dest.hi());
/* report available memory */
return span_type(occupied_range_.hi(), mapped_range_.hi());
}
void
DCircularBuffer::report_append(span_type r)
{
if (r.lo() != occupied_range_.hi()) {
// error!
// this->capture_error(error::bad_append_report, r.size())
assert(false);
return;
}
if (r.hi() > mapped_range_.hi()) {
// error!
// this->capture_error(error::bad_append_report, r.size())
assert(false);
return;
}
this->occupied_range_ += r;
}
void
DCircularBuffer::consume(span_type r)
{
if (r.lo() != input_range_.lo()) {
assert(false);
return;
}
if (r.hi() > occupied_range_.hi()) {
assert(false);
return;
}
if (occupied_range_.lo() < input_range_.lo()) {
/* here: a pinned range prevents shrinking occupied_range */
this->input_range_ = input_range_.suffix_from(r.hi());
} else {
/* here: input; recompute occupied boundary */
this->input_range_ = input_range_.suffix_from(r.hi());
this->_shrink_occupied_to_fit();
}
this->_check_reset_map_start();
}
bool
DCircularBuffer::expand_to(byte * hi) {
scope log(XO_DEBUG(config_.debug_flag_));
if (hi < mapped_range_.hi()) {
/* nothing todo */
return true;
}
size_t add_z = hi - mapped_range_.hi();
size_t add_commit_z = padding::with_padding(add_z, buffer_align_z_);
byte * commit_start = mapped_range_.hi();
if (::mprotect(commit_start,
add_commit_z,
PROT_READ | PROT_WRITE) != 0)
{
if (log) {
log("commit failed");
log(xtag("commit_start", commit_start),
xtag("add_z", add_z),
xtag("add_commit_z", add_commit_z));
}
// this->capture_error(error::commit_failed, add_commit_z);
return false;
}
this->mapped_range_ += span(commit_start, add_commit_z);
return true;
}
void
DCircularBuffer::pin_range(span_type r)
{
// loop optimized for case where r falls
// _after_ any existing pinned ranges
size_type z = pinned_spans_.size();
size_type ip1 = z; // ip1 = i + 1
for (; ip1 > 0; --ip1) {
if (r.lo() > pinned_spans_[ip1 - 1].lo())
break;
// insert at i to maintain sorted order
pinned_spans_.insert(ip1 - 1, r);
return;
}
pinned_spans_.push_back(r);
}
void
DCircularBuffer::unpin_range(span_type r)
{
// loop optimized for case where r
// is the first pinned range
assert(pinned_spans_.size() > 0);
if (r == pinned_spans_[0]) {
this->pinned_spans_.erase(0);
/* removing pinned span means can perhaps shrink
* occupied range
*/
this->_shrink_occupied_to_fit();
this->_check_reset_map_start();
} else {
for (size_type i = 1; i < pinned_spans_.size(); ++i) {
if (r == pinned_spans_[i]) {
this->pinned_spans_.erase(i);
/* since this isn't the first pinned span,
* won't be able to shrink occupied range.
*/
return;
}
}
}
}
void
DCircularBuffer::_shrink_occupied_to_fit()
{
if (pinned_spans_.empty()) {
this->occupied_range_ = input_range_;
} else if (occupied_range_.lo() < pinned_spans_[0].lo()) {
this->occupied_range_ = occupied_range_.suffix_from(pinned_spans_[0].lo());
}
}
void
DCircularBuffer::_check_reset_map_start()
{
if (pinned_spans_.empty()
&& (input_range_ == occupied_range_)) {
// here: permissible to move input range to the beginning of mapped range.
// decide (heuristically) whether we think this is optimal
std::size_t input_z = input_range_.size();
// 1st clause checks efficiency.
// 2nd clause (probably redundant) check non-overlapping
if ((input_range_.lo() > (mapped_range_.lo()
+ std::max(page_z_,
static_cast<size_type>(config_.threshold_move_efficiency_ * input_z))))
&& (mapped_range_.lo() + input_z < input_range_.lo())) {
::memmove(mapped_range_.lo(), input_range_.lo(), input_z);
this->occupied_range_ = mapped_range_.prefix(input_z);
this->input_range_ = mapped_range_.prefix(input_z);
}
}
}
} /*namespace mm*/
} /*namespace xo*/
/* end DCircularBuffer.cpp */

View file

@ -0,0 +1,105 @@
/** @file mmap_util.cpp
*
* @author Roland Conybeare, Jan 2026
**/
#include "mmap_util.hpp"
#include "padding.hpp"
#include <sys/mman.h> // for mmap
namespace xo {
namespace mm {
auto
mmap_util::map_aligned_range(size_t req_z,
size_t align_z,
bool enable_hugepage_flag,
bool debug_flag) -> span_type
{
scope log(XO_DEBUG(debug_flag),
xtag("req_z", req_z),
xtag("align_z", align_z));
// 1. round up to multiple of align_z
size_t target_z = padding::with_padding(req_z, align_z); // 4.
// 2. mmap() will give us page-aligned memory,
// but not hugepage-aligned.
//
// Over-request by align_z to ensure
// aligned subrange of size target_z
//
byte * base = (byte *)(::mmap(nullptr,
target_z + align_z,
PROT_NONE,
MAP_PRIVATE | MAP_ANONYMOUS,
-1, 0));
// on mmap success: upper limit of mapped address range
byte * hi = base + (target_z + align_z);
// lowest hugepage-aligned address in [base, hi)
byte * aligned_base = (byte *)(padding::with_padding((size_t)base, align_z));
// end of hugeppage-aligned range starting at aligned_base
byte * aligned_hi = aligned_base + target_z;
log && log("acquired memory [lo,hi) using mmap",
xtag("lo", base),
xtag("aligned_lo", aligned_base),
xtag("req_z", req_z),
xtag("target_z", target_z),
xtag("aligned_hi", aligned_hi),
xtag("hi", hi));
// 3. assess mmap success
{
if (base == MAP_FAILED) {
throw std::runtime_error(tostr("ArenaAlloc: uncommitted allocation failed",
xtag("size", req_z)));
}
assert((size_t)aligned_base % align_z == 0);
assert(aligned_base >= base);
assert(aligned_base < base + align_z);
}
// 4. release unaligned prefix
if (base < aligned_base) {
size_t ua_prefix = aligned_base - base;
::munmap(base, ua_prefix);
}
// 5. release unaligned suffix
if (aligned_hi < hi) {
size_t suffix = hi - aligned_hi;
::munmap(aligned_hi, suffix);
}
if (enable_hugepage_flag) {
#ifdef __linux__
/** linux:
* opt-in to transparent huge pages (THP)
* provided OS configured to support them.
* otherwise fallback gracefully.
*
* Huge pages -> use fewer TLB entries + faster
* shorter path through page table.
*
* When we commit (i.e. obtain physical memory on page fault),
* typically expect to pay ~1us per superpage.
* Much better than ~500us to commit 512 4k VM pages.
*
* But wasted if we don't use the memory.
*
* Page table has a handful of levels
**/
::madvise(aligned_base, target_z, MADV_HUGEPAGE); // 8.
#endif
}
return span_type(aligned_base, aligned_hi);
}
} /*namespace mm*/
} /*namespace xo*/
/* end mmap_util.cpp */