brpc 中引入 io_uring 的探索与实践

io_uring 是 Linux 内核 5.1 引入的一种新型异步 I/O 接口,旨在解决传统 epoll 在高并发场景下的性能瓶颈。最近我在 brpc 中尝试引入 io_uring 支持,在这个过程中遇到了不少坑,也积累了一些经验。本文记录了这次探索的完整过程,包括遇到的困难、最终的设计方案以及测试结果。

代码: https://github.com/duanfuxiang0/brpc

brpc 核心组件学习

在开始 io_uring 集成之前,我花了一些时间深入学习了 brpc 的核心组件。这部分内容是后续设计的基础。

bthread:用户态协程

bthread 是 brpc 实现的用户态协程库,它的调度模型很有意思:

// 示例:设置 task_group 的 tag 数量
--task_group_ntags=4  // 每个 task_group 有 4 个 tag

// 示例:限制 bthread 在特定 pthread 上运行
--event_dispatcher_num=1  // 每个 tag 只有一个 event_dispatcher

bvar:监控指标库

bvar 提供高效的指标统计能力:

在事件分发器中,我们使用 LatencyRecorder 记录读写延迟:

extern bvar::LatencyRecorder* g_edisp_read_lantency;
extern bvar::LatencyRecorder* g_edisp_write_lantency;

(*g_edisp_read_lantency) << latency_ns;

EventDispatcher:事件分发器

EventDispatcher 是 brpc 的 I/O 多路复用核心接口,负责:

原有实现基于 epoll,我们需要在此基础上增加 io_uring 支持。


io_uring 基础与 liburing 封装

io_uring 核心概念

io_uring 采用 Submission Queue (SQ) 和 Completion Queue (CQ) 的双队列架构:

┌─────────────┐         Submit          ┌─────────────┐
│  用户空间    │ ───────────────────────▶│   内核空间   │
│   SQE       │     mmap 共享内存       │    SQ       │
│   CQE       │◀─────────────────────── │    CQ       │
└─────────────┘         Complete        └─────────────┘

它的优势主要体现在:

liburing API 简介

liburing 是官方提供的 C 封装,核心 API 如下:

// 初始化 io_uring 实例
int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags);

// 获取下一个 SQE
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);

// 准备 poll 操作
void io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, short poll_mask);

// 提交操作
int io_uring_submit(struct io_uring *ring);

// 等待至少一个完成
int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);

// 遍历所有就绪的 CQE
io_uring_for_each_cqe(ring, head, cqe) {
    // 处理完成事件
}

// 标记已处理的 CQE
void io_uring_cq_advance(struct io_uring *ring, unsigned nr);

多线程安全问题

重要发现:liburing 的 API 并非线程安全!多个线程并发调用 io_uring_submit 会导致数据竞争。

这给我们的设计带来了第一个挑战:如何让多线程安全地操作同一个 io_uring 实例?

遇到的困难与解决方案

在集成过程中遇到了不少困难,每个都花了不少时间才找到合适的解决方案。

多线程安全访问 io_uring

问题描述:

解决方案:Ring Owner 单线程模型

最终采用了单线程 Ring Owner 架构,所有 io_uring 操作由一个线程串行执行:

┌──────────────────────────────────────────────────────┐
│              EventDispatcher (Ring Owner)            │
├──────────────────────────────────────────────────────┤
│                                                      │
│  ┌─────────────┐    ┌──────────────────┐            │
│  │ Worker 1    │───▶│  MPSCQueue       │            │
│  │ (pthread)   │    │  (UringOp)       │───▶ Ring  │
│  └─────────────┘    └──────────────────┘            │
│  ┌─────────────┐           ▲                         │
│  │ Worker 2    │───────────┘                         │
│  │ (pthread)   │                                     │
│  └─────────────┘                                     │
│                                                       │
│  ┌──────────────────────────────────────────┐        │
│  │ Run() 事件循环                          │        │
│  │  1. DrainOpQueue() - 处理队列中的操作    │        │
│  │  2. io_uring_submit() - 提交 SQE        │        │
│  │  3. io_uring_wait_cqe() - 等待完成      │        │
│  │  4. 处理 CQE,调用回调                    │        │
│  └──────────────────────────────────────────┘        │
└──────────────────────────────────────────────────────┘

实现细节:

// 外部线程通过队列提交操作
struct UringOp {
    UringOpType type;  // ADD_POLL, ADD_OUTPUT, REMOVE_OUTPUT, REMOVE_POLL
    int fd;
    VRefId event_data_id;
    bool pollin;
    UringOpCompletion* completion;  // 可选,用于同步等待
};

// Ring Owner 线程的事件循环
void EventDispatcher::Run() {
    while (!_stop) {
        // 1. 处理队列中的操作
        DrainOpQueue();

        // 2. 提交并等待
        io_uring_submit_and_wait(&_ring, 1);

        // 3. 处理所有 CQE
        io_uring_for_each_cqe(&_ring, head, cqe) {
            HandleCQE(cqe);
        }
    }
}

user_data 的 O(1) 编码

问题描述:

解决方案:指针编码 + 引用计数

// user_data 编码方案
// Bit 0: 0=Input Poll, 1=Output Poll
// Bits 1-63: PollContext* 指针(至少 8 字节对齐)

struct PollContext {
    int fd;
    IOEventDataId event_data_id;
    butil::atomic<int> refcount;  // 未完成的 poll 计数
    bool input_poll_active;
    bool output_poll_active;
    bool removed;  // 标记删除,refcount=0 时释放
};

// 编码
inline uint64_t EncodeUserData(PollContext* ctx, bool is_output) {
    uint64_t ptr = reinterpret_cast<uint64_t>(ctx);
    return ptr | (is_output ? 1ULL : 0ULL);
}

// 解码
inline PollContext* ExtractPollContext(uint64_t user_data) {
    return reinterpret_cast<PollContext*>(user_data & ~1ULL);
}

inline bool IsOutputPoll(uint64_t user_data) {
    return user_data & 1ULL;
}

生命周期管理:

// RemoveConsumer 时
ctx->removed = true;
// 不立即删除,等待所有 CQE 完成

// CQE 处理时
if (poll_done) {
    ctx->refcount.fetch_sub(1);
    if (ctx->removed && ctx->refcount.load() == 0) {
        delete ctx;  // 安全释放
    }
}

Multishot Poll 的差异

问题描述:

解决方案:运行时检测 + 条件重注册

// 初始化时检测
bool SetupUring(struct io_uring* ring) {
    if (kernel_ver >= 513) {
        _use_multishot_poll = true;
    }

    // Poll Input 事件
    void SubmitInputPoll(PollContext* ctx) {
        struct io_uring_sqe* sqe = io_uring_get_sqe(&_ring);
        io_uring_prep_poll_add(sqe, ctx->fd, POLLIN);

        if (_use_multishot_poll) {
            sqe->len = IORING_POLL_ADD_MULTI;  // Multishot 模式
        }
        io_uring_sqe_set_data64(sqe, EncodeUserData(ctx, false));
    }

    // CQE 处理
    bool poll_done = is_output || res < 0 || !(flags & IORING_CQE_F_MORE);
    if (poll_done && ctx->input_poll_active) {
        SubmitInputPoll(ctx);  // 仅在完成时重新提交
    }
}

Direct Ring Access 的条件

问题描述:

解决方案:Direct Ring Access 模式

// 检测 Direct 模式
bool EventDispatcher::Init() {
    int ntags = FLAGS_task_group_ntags;
    int ndisp = FLAGS_event_dispatcher_num;

    // 条件:多 tag 且每个 tag 只有一个 dispatcher
    _direct_ring_access = (ntags > 1) && (ndisp == 1);
}

// Queue 模式 vs Direct 模式
int EventDispatcher::AddConsumer(IOEventDataId event_data_id, int fd) {
    if (_direct_ring_access) {
        // 直接操作 ring,零开销
        PollContext* ctx = _poll_context_map.Create(fd, event_data_id);
        SubmitInputPoll(ctx);
        io_uring_submit(&_ring);
        return 0;
    } else {
        // 通过队列,线程安全
        UringOp op{UringOpType::ADD_POLL, fd, event_data_id, true, nullptr};
        _op_queue.Enqueue(op);
        Wakeup();
        WaitCompletion();  // 等待 Ring Owner 完成
        return 0;
    }
}

触发条件:

# 配置命令行参数
--task_group_ntags=4       # 多 tag
--event_dispatcher_num=1   # 每个 tag 一个 dispatcher

最终设计

架构图

┌─────────────────────────────────────────────────────────────────┐
│                    brpc Application Layer                        │
│                    (Socket, RPC, etc.)                          │
└───────────────────────────────────┬─────────────────────────────┘
                                    │
┌───────────────────────────────────▼─────────────────────────────┐
│                EventDispatcher (io_uring)                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────────┐    ┌─────────────────┐    ┌───────────────┐  │
│  │  pthread 1   │───▶│   MPSCQueue     │───▶│  io_uring     │  │
│  │  (bthreads)  │    │   (UringOp)     │    │    Ring       │  │
│  └──────────────┘    │                 │    │               │  │
│  ┌──────────────┐    │  Queue Mode     │    │               │  │
│  │  pthread 2   │───▶│                 │    │               │  │
│  │  (bthreads)  │    └─────────────────┘    └───────┬───────┘  │
│  └──────────────┘              ▲                    │           │
│  ┌──────────────┐              │   Direct Mode      │           │
│  │  pthread 3   │──────────────┘   (条件触发)       │           │
│  │  (bthreads)  │                                   │           │
│  └──────────────┘                                   ▼           │
│                              ┌──────────────────────────────┐   │
│                              │       Run() 事件循环          │   │
│                              │   1. DrainOpQueue()          │   │
│                              │   2. io_uring_submit()       │   │
│                              │   3. io_uring_wait_cqe()     │   │
│                              │   4. 处理 CQE,调用回调        │   │
│                              └──────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

核心数据结构

// 操作类型
enum class UringOpType : uint8_t {
    ADD_POLL,       // AddConsumer: 添加 fd 的 POLLIN 监听
    ADD_OUTPUT,     // RegisterEvent: 添加 POLLOUT 监听
    REMOVE_OUTPUT,  // UnregisterEvent: 移除 POLLOUT 监听
    REMOVE_POLL,    // RemoveConsumer: 移除 fd 的所有监听
    STOP,           // 停止 dispatcher
};

// Poll 上下文(每个 fd 一个)
struct PollContext {
    int fd;
    IOEventDataId event_data_id;
    butil::atomic<int> refcount;  // 引用计数
    bool input_poll_active;      // POLLIN 是否活跃
    bool output_poll_active;     // POLLOUT 是否活跃
    bool removed;                // 标记删除
};

// MPSC 队列
class UringOpQueue {
    butil::MPSCQueue<UringOp> _queue;
};

// Poll 上下文映射
class UringPollContextMap {
    std::unordered_map<int, PollContext*> _map;
};

与 epoll 的主要区别

特性epollio_uring
系统调用每次 epoll_wait批量提交 io_uring_submit
事件模式边缘触发Multishot/Oneshot
线程模型多线程安全单线程 Ring Owner
写就绪EPOLLOUTPOLLOUT oneshot
性能优化标准模式Direct Ring Access、COOP_TASKRUN
内核版本2.6+5.1+

关键代码对比:

epoll 版本:

void EventDispatcher::Run() {
    while (!_stop) {
        epoll_event e[32];
        int n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), -1);

        for (int i = 0; i < n; ++i) {
            if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
                CallInputEventCallback(e[i].data.u64, e[i].events, _thread_attr);
            }
            if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
                CallOutputEventCallback(e[i].data.u64, e[i].events, _thread_attr);
            }
        }
    }
}

io_uring 版本:

void EventDispatcher::Run() {
    // 注册 wakeup pipe
    io_uring_prep_poll_add(wakeup_sqe, _wakeup_fds[0], POLLIN);
    if (_use_multishot_poll) {
        wakeup_sqe->len = IORING_POLL_ADD_MULTI;
    }
    io_uring_submit(&_ring);

    while (!_stop) {
        if (!_direct_ring_access) {
            DrainOpQueue();  // 处理队列操作
        }

        // 优化:先 peek 检查
        struct io_uring_cqe* cqe = nullptr;
        int peek_ret = io_uring_peek_cqe(&_ring, &cqe);

        if (peek_ret == -EAGAIN) {
            io_uring_submit_and_wait(&_ring, 1);
        } else if (peek_ret == 0) {
            if (FLAGS_iouring_eager_submit ||
                io_uring_sq_ready(&_ring) >= FLAGS_iouring_submit_batch) {
                io_uring_submit(&_ring);
            }
        }

        // 处理所有 CQE
        io_uring_for_each_cqe(&_ring, head, cqe) {
            uint64_t user_data = io_uring_cqe_get_data64(cqe);
            int32_t res = cqe->res;
            uint32_t flags = cqe->flags;

            if (user_data == WAKEUP) {
                drain_wakeup_pipe();
                if (!_use_multishot_poll || !(flags & IORING_CQE_F_MORE)) {
                    resubmit_wakeup_poll();
                }
            } else {
                PollContext* ctx = ExtractPollContext(user_data);
                bool is_output = IsOutputPoll(user_data);

                bool poll_done = is_output || res < 0 || !(flags & IORING_CQE_F_MORE);
                if (poll_done) {
                    ctx->refcount.fetch_sub(1);
                    if (ctx->removed && ctx->refcount.load() == 0) {
                        delete ctx;
                        continue;
                    }
                }

                if (is_output) {
                    ctx->output_poll_active = false;
                    CallOutputEventCallback(ctx->event_data_id, res, _thread_attr);
                } else {
                    CallInputEventCallback(ctx->event_data_id, res, _thread_attr);
                    if (poll_done && ctx->input_poll_active) {
                        SubmitInputPoll(ctx);
                    }
                }
            }
        }
        io_uring_cq_advance(&_ring, count);

        if (!_direct_ring_access) {
            DrainOpQueue();
        }
    }
}

局限性

仅支持网络 I/O

目前实现仅支持网络 I/O(socket),未适配以下场景:

原因:

未来工作

如需扩展支持,可参考以下 API:

// 磁盘读
io_uring_prep_read(sqe, fd, buf, len, offset);

// 磁盘写
io_uring_prep_write(sqe, fd, buf, len, offset);

// 异步 fsync
io_uring_prep_fsync(sqe, fd, 0);

测试说明

测试环境

测试方法

使用 example/parallel_echo_c++ 示例进行基准测试:

# epoll 版本
./build-no-uring/output/bin/echo_server_epoll --task_group_ntags=4 --event_dispatcher_num=1
./build-no-uring/output/bin/rpc_press -qps 50000 -duration 10 -thread_num 4

# io_uring 版本
./build-iouring/output/bin/echo_server_iouring --task_group_ntags=4 --event_dispatcher_num=1
./build-iouring/output/bin/rpc_press -qps 50000 -duration 10 -thread_num 4

测试结果

压测结果对比
============================================================

测试参数: QPS=50000, 时长=10s, 线程=4

指标                 epoll     io_uring       差异
--------------- ------------ ------------ ------------
请求数             500003       500002            -
错误数                  0            0            -
平均延迟            18us         17us        -5.6%
P50 延迟              18us         18us         0.0%
P99 延迟              32us         30us        -6.2%
P99.99 延迟          637us        471us       -26.1%
最大延迟          1077us       1042us        -3.2%

结果分析:

结论与注意事项

当前性能提升有限的原因:

  1. 测试场景简单:echo 服务计算开销小,网络 I/O 非主导
  2. Direct 模式未充分利用:在测试配置下可能未触发 Direct Ring Access
  3. 队列开销:Queue 模式仍有 MPSC 队列开销
  4. 内核优化空间:COOP_TASKRUN、DEFER_TASKRUN 的效果在高负载下更明显

适用场景:


总结

本次探索成功将 io_uring 集成到 brpc,实现了与 epoll 兼容的 API。主要成果包括:

  1. 单线程 Ring Owner 架构:解决了 liburing 的线程安全问题
  2. Direct Ring Access 模式:在特定配置下零队列开销
  3. O(1) user_data 编码:通过指针编码和引用计数实现高效查找
  4. Multishot Poll 支持:自动检测内核版本并适配
  5. 优雅降级机制:高级特性不可用时自动回退

虽然当前性能测试未展现显著优势,但 io_uring 的潜力在于:

未来可以考虑:


参考资料

关于图数据库的一些思考
io_uring 学习总结