PGLite-Native: 进程内的 PostgreSQL

整体写的比较乱, 可以直接看源码比较好, 源码:

我一直使用 PGLite(Pgvector), 它可以把完整的 PostgreSQL 嵌入到浏览器环境; 让我可以使用 pgvector 等等环境, 目前嵌入式的向量查询, 这几乎是最好的方式, 但是就是只能在浏览器环境使用, 如果想嵌入到其他类型的app, 无法实现, 官方一直说会有一个 原生版本,但是还是一直没有去实现, 我认为这应该不是一个很难的问题, PGLite-Native 就是基于原来的 PGLite-wasm 版本做一个navtive 环境的适配, 目前只支持(linux/mac)环境, windows 应该也可以支持, 不过我没有测试

PostgreSQL 单用户模式

PostgreSQL 单用户模式是一种特殊的运行方式,它不启动 postmaster(主服务器进程),直接运行一个后端进程, 不监听 TCP/IP 或 Unix socket, 同时只能有一个连接, 数据库修复、维护、等等; PGlite 完全基于 PostgresSingleUserMain 这种访问模式, 通过一些 hack 手段, 模拟出一个类似 sqlite/duckdb 这种访问接口来

...
else if (argc > 1 && strcmp(argv[1], "--single") == 0)
    PostgresSingleUserMain(argc, argv,
                          strdup(get_user_name_or_exit(progname)));
...
void PostgresSingleUserMain(int argc, char *argv[], const char *username)
{
    Assert(!IsUnderPostmaster);          // 确保不在 postmaster 下
    InitStandaloneProcess(argv[0]);      // 初始化独立进程环境
    InitializeGUCOptions();              // 初始化 GUC 配置
    process_postgres_switches();         // 解析命令行参数
    SelectConfigFiles();                 // 加载配置文件
    checkDataDir();                      // 验证数据目录
    ChangeToDataDir();                   // 切换到数据目录
    CreateDataDirLockFile(false);        // 创建数据目录锁文件
    LocalProcessControlFile(false);      // 读取控制文件
    process_shared_preload_libraries();  // 加载预加载库
    InitializeMaxBackends();             // 初始化最大连接数
    process_shmem_requests();            // 处理共享内存请求
    InitializeShmemGUCs();               // 初始化共享内存 GUC
    CreateSharedMemoryAndSemaphores();   // 创建共享内存和信号量
    InitProcess();                       // 初始化 PGPROC 结构
    PostgresMain(dbname, username);      // ** 进入主循环 **
}

PostgresMain() 处理 SQL 查询, 在单用户模式下,输入来自 stdin 而非网络连接, pglite 这里使用临时文件模拟 stdin, 从而实现了稳定查询访问; 进入到 主循环 后续的流程基本跟原始的 postgresql 一样了; 所以主要工作就是 hack 前面这些启动流程, 包装出一个类似 sqlite 的嵌入式数据库的接口

Pglite 的 Hack

标准启动流程这样的, postmaster 负责创建 backend 处理请求,

initdb (创建数据目录)
  ├─ fork → postgres --boot (创建系统表)
  └─ fork → postgres --single (初始化模板库等等)
       ↓
postmaster (主进程)
  └─ fork → backend (后端进程) × N (处理客户端连接)

这里启动的 single 可以实现 从 stdin 读取 SQL, 输出到 stdout,无法返回到内存, 并且只能有一个”连接”

// src/bin/initdb/initdb.c
#define PG_CMD_OPEN(cmd) \
    do { \
        cmdfd = popen_check(cmd, "w");  // fork + exec postgres --boot
        if (cmdfd == NULL) \
            exit(1); \
    } while (0)
#define PG_CMD_PUTS(line) \
    do { \
        if (fputs(line, cmdfd) < 0 || fflush(cmdfd) < 0) \
            exit(1); \
    } while (0)
PG_CMD_OPEN("postgres --boot -D /data");
PG_CMD_PUTS("CREATE TABLE pg_class (...);");
PG_CMD_PUTS("INSERT INTO pg_class VALUES (...);");
PG_CMD_CLOSE();

我们想要的是一个类似 sqlite 那种像 SQLite 一样:可以编译为一个库,通过 C API 调用, 支持多个”连接”, 错误不能像postgresql那样直接退出进程, 并且查询结果返回到内存,而不是stdout

将 initdb fork(popen single) 转换为文件管道

// pgl_os.h
FILE *pgl_popen(const char *command, const char *type) {
    if (IDB_STAGE == 0) {
        // Boot 模式:写入到文件
        IDB_PIPE_FP = fopen(IDB_PIPE_BOOT, "w");
        IDB_STAGE = 1;
    } else if (IDB_STAGE == 1) {
        // Single 模式:写入到另一个文件
        IDB_PIPE_FP = fopen(IDB_PIPE_SINGLE, "w");
        IDB_STAGE = 2;
    }
    return IDB_PIPE_FP;
#define popen(command, mode) pgl_popen(command, mode)

让启动流程就变成 initdb → pgl_popen() → fopen(“initdb.boot.txt”) ↓ 写入 SQL 到文件(不是子进程) ↓ 后续重新打开文件,作为 postgres –single 的 stdin

// pg_main.c
if (setjmp(initdb_exit_jmp) == 0) {
    PDEBUG("# restarting in boot mode for initdb");
    
    // 将 popen 输出重新作为 stdin 输入
    if (freopen(IDB_PIPE_BOOT, "r", stdin) == NULL) {
        fprintf(stderr, "Failed to reopen stdin\n");
    }
    
    char *boot_argv[] = {
        g_argv[0],
        "--boot",
        "-D", (char *)PGDATA,
        "-d", "3",
        (char *)WASM_PGOPTS,
        "-X", "1048576",
        NULL
    };
    
    BootstrapModeMain(boot_argc, boot_argv, false);
    fclose(stdin);
    remove(IDB_PIPE_BOOT);  // 清理临时文件...
}

错误处理, 将 proc_exit() 转换为 longjmp

initdb 和 postgres 在完成/出错时调用 proc_exit(66) 退出进程, 劫持之后, 目的是返回错误, 并且继续执行后续代码, 而不是直接关闭进程

// pg_main.c:
static jmp_buf initdb_exit_jmp;
static volatile int initdb_exit_code = 0;
int pgl_initdb() {
    // 第一次 setjmp:正常返回
    if (setjmp(initdb_exit_jmp) == 0) {
        pgl_initdb_main();  // 内部会调用 pg_proc_exit(66)
    } else {
        // 从 pg_proc_exit 的 longjmp 返回到这里
        fprintf(stderr, "# INFO: pg_proc_exit(%d) caught\n", 
                (int)initdb_exit_code);
    }
    
    // 第二次 setjmp:启动 boot 模式
    if (setjmp(initdb_exit_jmp) == 0) {
        // ... 启动 boot 模式 ...
        BootstrapModeMain(boot_argc, boot_argv, false);
        
        // boot 模式完成,主动 longjmp(而不是 exit)
        initdb_exit_code = 66;
        longjmp(initdb_exit_jmp, 1);  // 跳转到上面的 else 分支
    } else {
        // Boot 完成
        pgl_idb_status |= IDB_HASDB;
        pgl_idb_status |= IDB_HASUSER;
    }
}

initdb → pg_proc_exit(66) → longjmp(initdb_exit_jmp) ↓ 继续执行后续代码(boot 模式)

将 stdin/stdout 转换为临时文件流

signle 模式从 stdin 读取 SQL,输出到 stdout, 这里我们通过一个翻译反复倒带临时文件模拟输入输出接口,来实现用户多次交互

// pgl_capi.c:
static bool write_sql_to_stream(FILE *stream, const char *sql) {
    size_t len = strlen(sql);
    if (fwrite(sql, 1, len, stream) != len)
        return false;
    
    if (len == 0 || sql[len - 1] != '\n') {
        if (fputc('\n', stream) == EOF)
            return false;
    }
    
    fflush(stream);
    rewind(stream);  // 回到文件开头!
    return true;
}
pglite_state pglite_run_sql_with_hook(
    pglite_connection conn, const char *sql,
    PgliteProtoHook *hook, char **out_error)
{
    FILE *stream = tmpfile();  // 创建临时文件(自动删除)
    
    // 第一步:写入 SQL
    if (!write_sql_to_stream(stream, sql)) {
        fclose(stream);
        return PGLITE_ERROR;
    }
    
    // 第二步:将临时文件作为 stdin 传给单用户模式
    int rc = pgl_run_sql_stream(stream, conn->dbname, conn->username);
    
    fclose(stream);  // 自动删除
    return (rc == 0) ? PGLITE_SUCCESS : PGLITE_ERROR;
}

pglite_query(conn, “SELECT * FROM pg_class”) ↓ 创建临时文件 ↓ 写入 SQL + rewind() ↓ 将文件指针传给 postgres –single(作为 stdin) ↓ postgres 从文件读取 SQL ↓ 结果通过 hook 返回到内存

将 libpq 协议拦截到内存

在正常模式下,PostgreSQL 通过 libpq 协议发送结果:后端 → libpq → 网络协议 → 客户端 在单用户模式下:后端 → printf() → stdout 所以这里需要伪装 postmaster 环境 + hook 协议

// pg_main.c:263
backend_started:;
    IsPostmasterEnvironment = true;  // 关键!
// pgl_interactive_one.c:83-103
ClientSocket dummy_sock;  // 空结构体
static void io_init(bool in_auth, bool out_auth) {
    ClientAuthInProgress = in_auth;
#ifdef PG16
    pq_init();
    MyProcPort = (Port *) calloc(1, sizeof(Port));  // 分配假 Port
#else
    MyProcPort = pq_init(&dummy_sock);  // 传入假 socket
#endif
    whereToSendOutput = DestRemote;  // 让后端认为输出到远程客户端
    if (!MyProcPort) {
        abort();
    }
...
}
// src/include/pglite/result_hook.h
typedef struct PgliteProtoHook {
    void (*on_row_description)(void *arg, int field_count, const PgliteResultField *fields);
    void (*on_data_row)(void *arg, int field_count, const char *const *values,
                        const int *lengths, const int16_t *formats);
    void (*on_command_complete)(void *arg, const char *command_tag);
    void (*on_error_message)(void *arg, const char *message);
    void *arg;
} PgliteProtoHook;

到这里, 我们就能处理一个完整 sql 请求了

// pgl_capi.c
pglite_state pglite_run_sql_with_hook(...) {
    FILE *stream = tmpfile();
    PG_TRY();
    {
        // 安装 hook
        if (hook)
            pglite_proto_install_hook(hook);
        // 执行 SQL
        int rc = pgl_run_sql_stream(stream, conn->dbname, conn->username);
        // 清理 hook
        if (hook)
            pglite_proto_clear_hook(hook);
    }
    PG_CATCH();
    {
        // 错误处理
        ...
    }
    PG_END_TRY();
    
    fclose(stream);
    return state;
}

“连接”复用

标准模式: 客户端连接 A → fork 后端进程 A 客户端连接 B → fork 后端进程 B

单用户模式: 客户端连接 A → 后端进程(只有一个) 客户端连接 B → 后端进程(同上,但连接状态会被覆盖)

所以我们保证安全的同时, 通过复用这个链接,来处理多次请求, 我们只初始化一次(单进程), 每次 API 调用传入新的 SQL 流, 后端处理完 SQL 后回到就绪状态,等待下一个 SQL 流,多个”连接”实际上是同一个后端的不同会话:

conn1 = pglite_connect(db)  
pglite_query(conn1, "SELECT 1")  → 初始化后端(第一次)
    ↓
后端状态:已初始化,处理 SQL
    ↓
conn2 = pglite_connect(db)
pglite_query(conn2, "SELECT 2")  → 复用后端,处理新 SQL
    ↓
后端状态:仍在运行,处理新 SQL
// pgl_mains.c
static int
pgl_run_single_user_session_internal(int single_argc, char *single_argv[],
                                     const char *username, FILE *stream,
                                     bool close_stream)
{
    static bool single_session_initialized = false;  // 关键:只初始化一次
    
    FILE *feed = stream;
    bool should_close = close_stream;
    
    if (feed == NULL) {
        feed = fopen(IDB_PIPE_SINGLE, "r");
        should_close = true;
    }
    
    if (feed == NULL) {
        ereport(ERROR, (errmsg("failed to open single-user SQL feed")));
        return -1;
    }
    
    // Heavy-weight backend initialization. Run this only once per process;
    // subsequent calls will just reuse the existing backend and process
    // additional SQL from the provided stream.
    if (!single_session_initialized) {
        // 初始化 GUC、共享内存、进程结构等
        process_postgres_switches(single_argc, single_argv, PGC_POSTMASTER, &dbname);
        LocalProcessControlFile(false);
        process_shared_preload_libraries();
        IgnoreSystemIndexes = false;
        process_shmem_requests();
        InitializeShmemGUCs();
        InitializeWalConsistencyChecking();
        PgStartTime = GetCurrentTimestamp();
        SetProcessingMode(InitProcessing);
        InitPostgres(dbname, InvalidOid, username, InvalidOid,
                     (!am_walsender) ? INIT_PG_LOAD_SESSION_LIBS : 0,
                     NULL);
        SetProcessingMode(NormalProcessing);
        BeginReportingGUCOptions();
        pgstat_report_connect(MyDatabaseId);
        
        // 创建 MessageContext
        MessageContext = AllocSetContextCreate(TopMemoryContext,
                                               "MessageContext",
                                               ALLOCSET_DEFAULT_SIZES);
        
        // 设置异常处理
        #define INITDB_SINGLE
        #include "pgl_sjlj.c"
        #undef INITDB_SINGLE
        
        single_session_initialized = true;
    }
    
    if (!ignore_till_sync)
        send_ready_for_query = true;
    
    FILE *prev_feed = single_mode_feed;
    single_mode_feed = feed;
    pgl_process_sql_stream(feed);  // 处理新的 SQL 流
    single_mode_feed = prev_feed;
    
    if (should_close && feed)
        fclose(feed);
    
    return 0;
}

最后一步, 就是让整个流程, 一直循环处理

single 模式是处理完直接退出,而我们想要这个进程一起启动处理请求, 第一次调用: pglite_query(conn, “SELECT 1”) ↓ tmpfile() → /tmp/tmpXXXXXX ↓ 写入 “SELECT 1\n” → rewind() ↓ postgres 从文件读取 → 处理 → EOF → 回到就绪 ↓ fclose(stream) → 文件被删除 第二次调用: pglite_query(conn, “SELECT 2”) ↓ tmpfile() → /tmp/tmpYYYYYY (新的文件) ↓ 写入 “SELECT 2\n” → rewind() ↓ postgres 从文件读取 → 处理 → EOF → 回到就绪 ↓ fclose(stream) → 文件被删除

static pglite_state
pglite_run_sql_with_hook(pglite_connection conn, const char *sql,
                         PgliteProtoHook *hook, char **out_error)
{
    FILE *stream = tmpfile();  // 每次创建新的临时文件
    if (!write_sql_to_stream(stream, sql)) {
        fclose(stream);
        return PGLITE_ERROR;
    }
    PG_TRY();
    {
        if (hook)
            pglite_proto_install_hook(hook);
        // 每次都是新的 SQL 流
        int rc = pgl_run_sql_stream(stream, conn->dbname, conn->username);
        state = (rc == 0) ? PGLITE_SUCCESS : PGLITE_ERROR;
        if (hook)
            pglite_proto_clear_hook(hook);
    }
    PG_CATCH();
    {
        // 错误处理
        ErrorData *edata = CopyErrorData();
        if (hook)
            pglite_proto_clear_hook(hook);
        
        if (out_error && edata && edata->message)
            *out_error = MemoryContextStrdup(pglite_api_context(), edata->message);
        
        FlushErrorState();
        FreeErrorData(edata);
        state = PGLITE_ERROR;
    }
    PG_END_TRY();
    fclose(stream);  // 关闭临时文件
    return state;
}

目前的整体处理流程非常原始, 没有做很多优化, 只支持 C API, 不过可以用在一些测试流程使用时没有问题的

整体流程栈图

1. 应用启动
   pglite_open("/data", &db)
   ↓
2. 初始化 PostgreSQL 后端
   pgl_bootstrap_engine("/data")
   ├─ pgl_native_startup()
   │  └─ 设置环境变量(PREFIX, PGDATA等)
   └─ pgl_native_initdb_if_needed()
      ├─ 检查数据库是否存在
      ├─ 如果不存在:
      │  └─ pgl_initdb()
      │     ├─ setjmp(initdb_exit_jmp)
      │     ├─ pgl_initdb_main()
      │     │  └─ 劫持 popen() → 写入到 IDB_PIPE_BOOT
      │     ├─ longjmp(initdb_exit_jmp)  ← pg_proc_exit
      │     ├─ setjmp(initdb_exit_jmp)
      │     ├─ freopen(IDB_PIPE_BOOT, "r", stdin)
      │     ├─ BootstrapModeMain()
      │     └─ longjmp(initdb_exit_jmp)  ← 完成
      └─ 标记:已初始化(single_session_initialized)
   ↓
3. 创建连接
   pglite_connect(db, &conn)
   └─ 创建连接对象(内存结构)
   ↓
4. 执行查询(每次 API 调用)
   pglite_query(conn, "SELECT * FROM pg_class")
   ├─ tmpfile() → 创建临时文件
   ├─ write_sql_to_stream()
   │  └─ 写入 SQL + rewind()
   ├─ pgl_run_sql_stream(stream, dbname, username)
   │  └─ pgl_run_single_user_session_internal()
   │     ├─ single_session_initialized?
   │     │  └─ 否:初始化后端(只执行一次)
   │     │     ├─ AsyncPostgresSingleUserMain()
   │     │     ├─ IsPostmasterEnvironment = true
   │     │     └─ io_init() → 伪造 MyProcPort
   │     ├─ sigsetjmp(local_sigjmp_buf)
   │     ├─ pgl_process_sql_stream()
   │     │  └─ 读取临时文件中的 SQL
   │     │     └─ #include "pg_proto.c"
   │     │        ├─ case 'Q': exec_simple_query()
   │     │        ├─ case 'P': exec_parse_message()
   │     │        └─ case 'B': exec_bind_message()
   │     │           ↓
   │     │        PostgreSQL 执行查询
   │     │           ↓
   │     │        pglite_proto_hook.on_row_description
   │     │        pglite_proto_hook.on_data_row
   │     │           ↓
   │     │        结果收集到内存(pglite_result)
   │     │     └─ ereport(ERROR) ?
   │     │        ├─ 是 → siglongjmp(local_sigjmp_buf)
   │     │        └─ 否 → goto wire_flush
   │     └─ 如果有 longjmp:
   │        └─ clear_error() → 清理状态
   ├─ fclose(stream) → 删除临时文件
   └─ 返回 pglite_result
   ↓
5. 关闭数据库
   pglite_close(&db)
   └─ 释放内存结构
挣扎,再挣扎
[Orbit Man] AI Agent 管理器