io_uring 是 Linux 内核 5.1 引入的一种新型异步 I/O 接口,旨在解决传统 epoll 在高并发场景下的性能瓶颈。最近我在 brpc 中尝试引入 io_uring 支持,在这个过程中遇到了不少坑,也积累了一些经验。本文记录了这次探索的完整过程,包括遇到的困难、最终的设计方案以及测试结果。
代码: https://github.com/duanfuxiang0/brpc
在开始 io_uring 集成之前,我花了一些时间深入学习了 brpc 的核心组件。这部分内容是后续设计的基础。
bthread 是 brpc 实现的用户态协程库,它的调度模型很有意思:
// 示例:设置 task_group 的 tag 数量
--task_group_ntags=4 // 每个 task_group 有 4 个 tag
// 示例:限制 bthread 在特定 pthread 上运行
--event_dispatcher_num=1 // 每个 tag 只有一个 event_dispatcher
bvar 提供高效的指标统计能力:
在事件分发器中,我们使用 LatencyRecorder 记录读写延迟:
extern bvar::LatencyRecorder* g_edisp_read_lantency;
extern bvar::LatencyRecorder* g_edisp_write_lantency;
(*g_edisp_read_lantency) << latency_ns;
EventDispatcher 是 brpc 的 I/O 多路复用核心接口,负责:
IOEventDataId 关联用户回调原有实现基于 epoll,我们需要在此基础上增加 io_uring 支持。
io_uring 采用 Submission Queue (SQ) 和 Completion Queue (CQ) 的双队列架构:
┌─────────────┐ Submit ┌─────────────┐
│ 用户空间 │ ───────────────────────▶│ 内核空间 │
│ SQE │ mmap 共享内存 │ SQ │
│ CQE │◀─────────────────────── │ CQ │
└─────────────┘ Complete └─────────────┘
它的优势主要体现在:
liburing 是官方提供的 C 封装,核心 API 如下:
// 初始化 io_uring 实例
int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags);
// 获取下一个 SQE
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);
// 准备 poll 操作
void io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, short poll_mask);
// 提交操作
int io_uring_submit(struct io_uring *ring);
// 等待至少一个完成
int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);
// 遍历所有就绪的 CQE
io_uring_for_each_cqe(ring, head, cqe) {
// 处理完成事件
}
// 标记已处理的 CQE
void io_uring_cq_advance(struct io_uring *ring, unsigned nr);
重要发现:liburing 的 API 并非线程安全!多个线程并发调用 io_uring_submit 会导致数据竞争。
这给我们的设计带来了第一个挑战:如何让多线程安全地操作同一个 io_uring 实例?
在集成过程中遇到了不少困难,每个都花了不少时间才找到合适的解决方案。
问题描述:
AddConsumer、RegisterEvent 等 API解决方案:Ring Owner 单线程模型
最终采用了单线程 Ring Owner 架构,所有 io_uring 操作由一个线程串行执行:
┌──────────────────────────────────────────────────────┐
│ EventDispatcher (Ring Owner) │
├──────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌──────────────────┐ │
│ │ Worker 1 │───▶│ MPSCQueue │ │
│ │ (pthread) │ │ (UringOp) │───▶ Ring │
│ └─────────────┘ └──────────────────┘ │
│ ┌─────────────┐ ▲ │
│ │ Worker 2 │───────────┘ │
│ │ (pthread) │ │
│ └─────────────┘ │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Run() 事件循环 │ │
│ │ 1. DrainOpQueue() - 处理队列中的操作 │ │
│ │ 2. io_uring_submit() - 提交 SQE │ │
│ │ 3. io_uring_wait_cqe() - 等待完成 │ │
│ │ 4. 处理 CQE,调用回调 │ │
│ └──────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────┘
实现细节:
// 外部线程通过队列提交操作
struct UringOp {
UringOpType type; // ADD_POLL, ADD_OUTPUT, REMOVE_OUTPUT, REMOVE_POLL
int fd;
VRefId event_data_id;
bool pollin;
UringOpCompletion* completion; // 可选,用于同步等待
};
// Ring Owner 线程的事件循环
void EventDispatcher::Run() {
while (!_stop) {
// 1. 处理队列中的操作
DrainOpQueue();
// 2. 提交并等待
io_uring_submit_and_wait(&_ring, 1);
// 3. 处理所有 CQE
io_uring_for_each_cqe(&_ring, head, cqe) {
HandleCQE(cqe);
}
}
}
问题描述:
user_data 需要标识来源IOEventDataId,但 io_uring 需要区分 POLLIN 和 POLLOUT解决方案:指针编码 + 引用计数
// user_data 编码方案
// Bit 0: 0=Input Poll, 1=Output Poll
// Bits 1-63: PollContext* 指针(至少 8 字节对齐)
struct PollContext {
int fd;
IOEventDataId event_data_id;
butil::atomic<int> refcount; // 未完成的 poll 计数
bool input_poll_active;
bool output_poll_active;
bool removed; // 标记删除,refcount=0 时释放
};
// 编码
inline uint64_t EncodeUserData(PollContext* ctx, bool is_output) {
uint64_t ptr = reinterpret_cast<uint64_t>(ctx);
return ptr | (is_output ? 1ULL : 0ULL);
}
// 解码
inline PollContext* ExtractPollContext(uint64_t user_data) {
return reinterpret_cast<PollContext*>(user_data & ~1ULL);
}
inline bool IsOutputPoll(uint64_t user_data) {
return user_data & 1ULL;
}
生命周期管理:
// RemoveConsumer 时
ctx->removed = true;
// 不立即删除,等待所有 CQE 完成
// CQE 处理时
if (poll_done) {
ctx->refcount.fetch_sub(1);
if (ctx->removed && ctx->refcount.load() == 0) {
delete ctx; // 安全释放
}
}
问题描述:
解决方案:运行时检测 + 条件重注册
// 初始化时检测
bool SetupUring(struct io_uring* ring) {
if (kernel_ver >= 513) {
_use_multishot_poll = true;
}
// Poll Input 事件
void SubmitInputPoll(PollContext* ctx) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&_ring);
io_uring_prep_poll_add(sqe, ctx->fd, POLLIN);
if (_use_multishot_poll) {
sqe->len = IORING_POLL_ADD_MULTI; // Multishot 模式
}
io_uring_sqe_set_data64(sqe, EncodeUserData(ctx, false));
}
// CQE 处理
bool poll_done = is_output || res < 0 || !(flags & IORING_CQE_F_MORE);
if (poll_done && ctx->input_poll_active) {
SubmitInputPoll(ctx); // 仅在完成时重新提交
}
}
问题描述:
解决方案:Direct Ring Access 模式
// 检测 Direct 模式
bool EventDispatcher::Init() {
int ntags = FLAGS_task_group_ntags;
int ndisp = FLAGS_event_dispatcher_num;
// 条件:多 tag 且每个 tag 只有一个 dispatcher
_direct_ring_access = (ntags > 1) && (ndisp == 1);
}
// Queue 模式 vs Direct 模式
int EventDispatcher::AddConsumer(IOEventDataId event_data_id, int fd) {
if (_direct_ring_access) {
// 直接操作 ring,零开销
PollContext* ctx = _poll_context_map.Create(fd, event_data_id);
SubmitInputPoll(ctx);
io_uring_submit(&_ring);
return 0;
} else {
// 通过队列,线程安全
UringOp op{UringOpType::ADD_POLL, fd, event_data_id, true, nullptr};
_op_queue.Enqueue(op);
Wakeup();
WaitCompletion(); // 等待 Ring Owner 完成
return 0;
}
}
触发条件:
# 配置命令行参数
--task_group_ntags=4 # 多 tag
--event_dispatcher_num=1 # 每个 tag 一个 dispatcher
┌─────────────────────────────────────────────────────────────────┐
│ brpc Application Layer │
│ (Socket, RPC, etc.) │
└───────────────────────────────────┬─────────────────────────────┘
│
┌───────────────────────────────────▼─────────────────────────────┐
│ EventDispatcher (io_uring) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌─────────────────┐ ┌───────────────┐ │
│ │ pthread 1 │───▶│ MPSCQueue │───▶│ io_uring │ │
│ │ (bthreads) │ │ (UringOp) │ │ Ring │ │
│ └──────────────┘ │ │ │ │ │
│ ┌──────────────┐ │ Queue Mode │ │ │ │
│ │ pthread 2 │───▶│ │ │ │ │
│ │ (bthreads) │ └─────────────────┘ └───────┬───────┘ │
│ └──────────────┘ ▲ │ │
│ ┌──────────────┐ │ Direct Mode │ │
│ │ pthread 3 │──────────────┘ (条件触发) │ │
│ │ (bthreads) │ │ │
│ └──────────────┘ ▼ │
│ ┌──────────────────────────────┐ │
│ │ Run() 事件循环 │ │
│ │ 1. DrainOpQueue() │ │
│ │ 2. io_uring_submit() │ │
│ │ 3. io_uring_wait_cqe() │ │
│ │ 4. 处理 CQE,调用回调 │ │
│ └──────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
// 操作类型
enum class UringOpType : uint8_t {
ADD_POLL, // AddConsumer: 添加 fd 的 POLLIN 监听
ADD_OUTPUT, // RegisterEvent: 添加 POLLOUT 监听
REMOVE_OUTPUT, // UnregisterEvent: 移除 POLLOUT 监听
REMOVE_POLL, // RemoveConsumer: 移除 fd 的所有监听
STOP, // 停止 dispatcher
};
// Poll 上下文(每个 fd 一个)
struct PollContext {
int fd;
IOEventDataId event_data_id;
butil::atomic<int> refcount; // 引用计数
bool input_poll_active; // POLLIN 是否活跃
bool output_poll_active; // POLLOUT 是否活跃
bool removed; // 标记删除
};
// MPSC 队列
class UringOpQueue {
butil::MPSCQueue<UringOp> _queue;
};
// Poll 上下文映射
class UringPollContextMap {
std::unordered_map<int, PollContext*> _map;
};
| 特性 | epoll | io_uring |
|---|---|---|
| 系统调用 | 每次 epoll_wait | 批量提交 io_uring_submit |
| 事件模式 | 边缘触发 | Multishot/Oneshot |
| 线程模型 | 多线程安全 | 单线程 Ring Owner |
| 写就绪 | EPOLLOUT | POLLOUT oneshot |
| 性能优化 | 标准模式 | Direct Ring Access、COOP_TASKRUN |
| 内核版本 | 2.6+ | 5.1+ |
关键代码对比:
epoll 版本:
void EventDispatcher::Run() {
while (!_stop) {
epoll_event e[32];
int n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), -1);
for (int i = 0; i < n; ++i) {
if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
CallInputEventCallback(e[i].data.u64, e[i].events, _thread_attr);
}
if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
CallOutputEventCallback(e[i].data.u64, e[i].events, _thread_attr);
}
}
}
}
io_uring 版本:
void EventDispatcher::Run() {
// 注册 wakeup pipe
io_uring_prep_poll_add(wakeup_sqe, _wakeup_fds[0], POLLIN);
if (_use_multishot_poll) {
wakeup_sqe->len = IORING_POLL_ADD_MULTI;
}
io_uring_submit(&_ring);
while (!_stop) {
if (!_direct_ring_access) {
DrainOpQueue(); // 处理队列操作
}
// 优化:先 peek 检查
struct io_uring_cqe* cqe = nullptr;
int peek_ret = io_uring_peek_cqe(&_ring, &cqe);
if (peek_ret == -EAGAIN) {
io_uring_submit_and_wait(&_ring, 1);
} else if (peek_ret == 0) {
if (FLAGS_iouring_eager_submit ||
io_uring_sq_ready(&_ring) >= FLAGS_iouring_submit_batch) {
io_uring_submit(&_ring);
}
}
// 处理所有 CQE
io_uring_for_each_cqe(&_ring, head, cqe) {
uint64_t user_data = io_uring_cqe_get_data64(cqe);
int32_t res = cqe->res;
uint32_t flags = cqe->flags;
if (user_data == WAKEUP) {
drain_wakeup_pipe();
if (!_use_multishot_poll || !(flags & IORING_CQE_F_MORE)) {
resubmit_wakeup_poll();
}
} else {
PollContext* ctx = ExtractPollContext(user_data);
bool is_output = IsOutputPoll(user_data);
bool poll_done = is_output || res < 0 || !(flags & IORING_CQE_F_MORE);
if (poll_done) {
ctx->refcount.fetch_sub(1);
if (ctx->removed && ctx->refcount.load() == 0) {
delete ctx;
continue;
}
}
if (is_output) {
ctx->output_poll_active = false;
CallOutputEventCallback(ctx->event_data_id, res, _thread_attr);
} else {
CallInputEventCallback(ctx->event_data_id, res, _thread_attr);
if (poll_done && ctx->input_poll_active) {
SubmitInputPoll(ctx);
}
}
}
}
io_uring_cq_advance(&_ring, count);
if (!_direct_ring_access) {
DrainOpQueue();
}
}
}
目前实现仅支持网络 I/O(socket),未适配以下场景:
open、read、write、fsync 等操作原因:
poll_add 操作,易于映射readv、writev、fallocate 等更复杂的操作如需扩展支持,可参考以下 API:
// 磁盘读
io_uring_prep_read(sqe, fd, buf, len, offset);
// 磁盘写
io_uring_prep_write(sqe, fd, buf, len, offset);
// 异步 fsync
io_uring_prep_fsync(sqe, fd, 0);
使用 example/parallel_echo_c++ 示例进行基准测试:
# epoll 版本
./build-no-uring/output/bin/echo_server_epoll --task_group_ntags=4 --event_dispatcher_num=1
./build-no-uring/output/bin/rpc_press -qps 50000 -duration 10 -thread_num 4
# io_uring 版本
./build-iouring/output/bin/echo_server_iouring --task_group_ntags=4 --event_dispatcher_num=1
./build-iouring/output/bin/rpc_press -qps 50000 -duration 10 -thread_num 4
压测结果对比
============================================================
测试参数: QPS=50000, 时长=10s, 线程=4
指标 epoll io_uring 差异
--------------- ------------ ------------ ------------
请求数 500003 500002 -
错误数 0 0 -
平均延迟 18us 17us -5.6%
P50 延迟 18us 18us 0.0%
P99 延迟 32us 30us -6.2%
P99.99 延迟 637us 471us -26.1%
最大延迟 1077us 1042us -3.2%
结果分析:
当前性能提升有限的原因:
适用场景:
本次探索成功将 io_uring 集成到 brpc,实现了与 epoll 兼容的 API。主要成果包括:
虽然当前性能测试未展现显著优势,但 io_uring 的潜力在于:
未来可以考虑: