Buffer Sources and Sinks
This section explains the BufferSource and BufferSink concepts for zero-copy I/O where the callee owns the buffers.
Prerequisites
- Completed Sources and Sinks
- Understanding of caller-owns-buffers patterns
Callee-Owns-Buffers Pattern
With streams and sources/sinks, the caller provides buffers:
// Caller owns the buffer
char my_buffer[1024];
co_await stream.read_some(make_buffer(my_buffer));
Data flows: source → caller’s buffer → processing
With buffer sources/sinks, the callee provides buffers:
// Callee owns the buffers
const_buffer arr[8];
auto [ec, bufs] = co_await source.pull(arr);
// bufs is a span pointing into source's internal storage
Data flows: source’s internal buffer → processing (no copy!)
BufferSource
A BufferSource provides read-only buffers from its internal storage:
template<typename T>
concept BufferSource =
    requires(T& src, std::span<const_buffer> dest, std::size_t n) {
        { src.pull(dest) } -> IoAwaitable;
        // pull await-returns (error_code, std::span<const_buffer>)
        src.consume(n);
    };
pull Semantics
IoAwaitable auto pull(std::span<const_buffer> dest);
void consume(std::size_t n);
pull await-returns (error_code, std::span<const_buffer>). The source fills dest with descriptors pointing into its internal storage and returns a span over the filled prefix:
- On success: !ec, and the returned span describes the available data
- On exhaustion: ec == cond::eof with an empty span
- On error: ec set to the error condition
The returned buffers point into the source’s internal storage and remain valid until you call consume. Calling pull again without an intervening consume returns the same unconsumed data. Call consume(n) to mark n bytes as consumed and advance the source.
Example
template<BufferSource Source>
task<> process_source(Source& source)
{
    const_buffer arr[8];
    for (;;)
    {
        auto [ec, bufs] = co_await source.pull(arr);
        if (ec == cond::eof)
            break; // Source exhausted
        if (ec)
            throw std::system_error(ec);
        // Process buffers (zero-copy!)
        std::size_t total = 0;
        for (auto const& b : bufs)
        {
            process_data(b.data(), b.size());
            total += b.size();
        }
        source.consume(total);
    }
}
BufferSink
A BufferSink provides writable buffers for direct write access:
template<typename T>
concept BufferSink =
    requires(T& sink, std::span<mutable_buffer> dest, std::size_t n) {
        { sink.prepare(dest) } -> std::same_as<std::span<mutable_buffer>>;
        { sink.commit(n) } -> IoAwaitable; // await-returns (error_code)
        { sink.commit_eof(n) } -> IoAwaitable; // await-returns (error_code)
    };
prepare Semantics
std::span<mutable_buffer> prepare(std::span<mutable_buffer> dest);
Synchronous operation. The sink fills dest with writable buffer descriptors from its internal storage and returns a span over the filled prefix (which may be shorter than dest).
commit Semantics
IoAwaitable auto commit(std::size_t n);
IoAwaitable auto commit_eof(std::size_t n);
commit(n) finalizes n bytes of prepared data and await-returns (error_code). commit_eof(n) finalizes n final bytes and signals end-of-stream; pass 0 to signal end-of-stream with no further data.
Example
template<BufferSink Sink>
task<> write_to_sink(Sink& sink, std::span<char const> data)
{
    std::size_t written = 0;
    while (written < data.size())
    {
        mutable_buffer arr[8];
        auto bufs = sink.prepare(arr);
        if (bufs.empty())
            throw std::runtime_error("sink full");
        // Copy into sink's buffers
        std::size_t copied = 0;
        for (auto& b : bufs)
        {
            if (written >= data.size())
                break;
            std::size_t chunk = (std::min)(
                b.size(),
                data.size() - written);
            std::memcpy(b.data(), data.data() + written, chunk);
            written += chunk;
            copied += chunk;
        }
        // Finalize the copied bytes and surface any error from the sink
        auto [ec] = co_await sink.commit(copied);
        if (ec)
            throw std::system_error(ec);
    }
    // Signal end-of-stream once everything is committed (also covers empty input)
    auto [ec] = co_await sink.commit_eof(0);
    if (ec)
        throw std::system_error(ec);
}
Zero-Copy Benefits
Buffer sources/sinks enable true zero-copy I/O:
Memory-Mapped Files
A BufferSource is a concept, not a base class, so a type models it
simply by providing the required members:
class mmap_source // models BufferSource
{
    void* mapped_region_;
    std::size_t size_;
    std::size_t offset_ = 0;

public:
    io_task<std::span<const_buffer>> pull(std::span<const_buffer> dest)
    {
        if (offset_ >= size_)
            co_return {error::eof, {}}; // Exhausted
        // Return pointer into mapped memory: no copy!
        dest[0] = const_buffer(
            static_cast<char*>(mapped_region_) + offset_,
            size_ - offset_);
        co_return {{}, dest.subspan(0, 1)};
    }

    void consume(std::size_t n)
    {
        offset_ += n;
    }
};
Type-Erasing Wrappers
Example: Compression Pipeline
// Pulls compressed blocks from a buffer source, decompresses each one,
// and writes the result to a write sink
task<> decompress_stream(any_buffer_source& compressed, any_write_sink& output)
{
    const_buffer arr[8];
    for (;;)
    {
        auto [ec, bufs] = co_await compressed.pull(arr);
        if (ec == cond::eof)
            break;
        if (ec)
            throw std::system_error(ec);
        std::size_t total = 0;
        for (auto const& b : bufs)
        {
            auto decompressed = decompress_block(b);
            co_await output.write(make_buffer(decompressed));
            total += b.size();
        }
        compressed.consume(total);
    }
    co_await output.write_eof();
}
Reference
| Header | Description |
|---|---|
| | BufferSource concept definition |
| | BufferSink concept definition |
| | Type-erased buffer source wrapper |
| | Type-erased buffer sink wrapper |
You have now learned about buffer sources and sinks for zero-copy I/O. Continue to Transfer Algorithms to learn about composed read/write operations.