整体写的比较乱, 可以直接看源码比较好, 源码:
我一直使用 PGLite(Pgvector), 它可以把完整的 PostgreSQL 嵌入到浏览器环境; 让我可以使用 pgvector 等等环境, 目前嵌入式的向量查询, 这几乎是最好的方式, 但是就是只能在浏览器环境使用, 如果想嵌入到其他类型的app, 无法实现, 官方一直说会有一个 原生版本,但是还是一直没有去实现, 我认为这应该不是一个很难的问题, PGLite-Native 就是基于原来的 PGLite-wasm 版本做一个navtive 环境的适配, 目前只支持(linux/mac)环境, windows 应该也可以支持, 不过我没有测试
PostgreSQL 单用户模式是一种特殊的运行方式,它不启动 postmaster(主服务器进程),直接运行一个后端进程, 不监听 TCP/IP 或 Unix socket, 同时只能有一个连接, 数据库修复、维护、等等; PGlite 完全基于 PostgresSingleUserMain 这种访问模式, 通过一些 hack 手段, 模拟出一个类似 sqlite/duckdb 这种访问接口来
...
else if (argc > 1 && strcmp(argv[1], "--single") == 0)
PostgresSingleUserMain(argc, argv,
strdup(get_user_name_or_exit(progname)));
...
void PostgresSingleUserMain(int argc, char *argv[], const char *username)
{
Assert(!IsUnderPostmaster); // 确保不在 postmaster 下
InitStandaloneProcess(argv[0]); // 初始化独立进程环境
InitializeGUCOptions(); // 初始化 GUC 配置
process_postgres_switches(); // 解析命令行参数
SelectConfigFiles(); // 加载配置文件
checkDataDir(); // 验证数据目录
ChangeToDataDir(); // 切换到数据目录
CreateDataDirLockFile(false); // 创建数据目录锁文件
LocalProcessControlFile(false); // 读取控制文件
process_shared_preload_libraries(); // 加载预加载库
InitializeMaxBackends(); // 初始化最大连接数
process_shmem_requests(); // 处理共享内存请求
InitializeShmemGUCs(); // 初始化共享内存 GUC
CreateSharedMemoryAndSemaphores(); // 创建共享内存和信号量
InitProcess(); // 初始化 PGPROC 结构
PostgresMain(dbname, username); // ** 进入主循环 **
}
PostgresMain() 处理 SQL 查询, 在单用户模式下,输入来自 stdin 而非网络连接, pglite 这里使用临时文件模拟 stdin, 从而实现了稳定查询访问; 进入到 主循环 后续的流程基本跟原始的 postgresql 一样了; 所以主要工作就是 hack 前面这些启动流程, 包装出一个类似 sqlite 的嵌入式数据库的接口
标准启动流程这样的, postmaster 负责创建 backend 处理请求,
initdb (创建数据目录)
├─ fork → postgres --boot (创建系统表)
└─ fork → postgres --single (初始化模板库等等)
↓
postmaster (主进程)
└─ fork → backend (后端进程) × N (处理客户端连接)
这里启动的 single 可以实现 从 stdin 读取 SQL, 输出到 stdout,无法返回到内存, 并且只能有一个”连接”
// src/bin/initdb/initdb.c
#define PG_CMD_OPEN(cmd) \
do { \
cmdfd = popen_check(cmd, "w"); // fork + exec postgres --boot
if (cmdfd == NULL) \
exit(1); \
} while (0)
#define PG_CMD_PUTS(line) \
do { \
if (fputs(line, cmdfd) < 0 || fflush(cmdfd) < 0) \
exit(1); \
} while (0)
PG_CMD_OPEN("postgres --boot -D /data");
PG_CMD_PUTS("CREATE TABLE pg_class (...);");
PG_CMD_PUTS("INSERT INTO pg_class VALUES (...);");
PG_CMD_CLOSE();
我们想要的是一个类似 sqlite 那种像 SQLite 一样:可以编译为一个库,通过 C API 调用, 支持多个”连接”, 错误不能像postgresql那样直接退出进程, 并且查询结果返回到内存,而不是stdout
// pgl_os.h
FILE *pgl_popen(const char *command, const char *type) {
if (IDB_STAGE == 0) {
// Boot 模式:写入到文件
IDB_PIPE_FP = fopen(IDB_PIPE_BOOT, "w");
IDB_STAGE = 1;
} else if (IDB_STAGE == 1) {
// Single 模式:写入到另一个文件
IDB_PIPE_FP = fopen(IDB_PIPE_SINGLE, "w");
IDB_STAGE = 2;
}
return IDB_PIPE_FP;
#define popen(command, mode) pgl_popen(command, mode)
让启动流程就变成 initdb → pgl_popen() → fopen(“initdb.boot.txt”) ↓ 写入 SQL 到文件(不是子进程) ↓ 后续重新打开文件,作为 postgres –single 的 stdin
// pg_main.c
if (setjmp(initdb_exit_jmp) == 0) {
PDEBUG("# restarting in boot mode for initdb");
// 将 popen 输出重新作为 stdin 输入
if (freopen(IDB_PIPE_BOOT, "r", stdin) == NULL) {
fprintf(stderr, "Failed to reopen stdin\n");
}
char *boot_argv[] = {
g_argv[0],
"--boot",
"-D", (char *)PGDATA,
"-d", "3",
(char *)WASM_PGOPTS,
"-X", "1048576",
NULL
};
BootstrapModeMain(boot_argc, boot_argv, false);
fclose(stdin);
remove(IDB_PIPE_BOOT); // 清理临时文件...
}
initdb 和 postgres 在完成/出错时调用 proc_exit(66) 退出进程, 劫持之后, 目的是返回错误, 并且继续执行后续代码, 而不是直接关闭进程
// pg_main.c:
static jmp_buf initdb_exit_jmp;
static volatile int initdb_exit_code = 0;
int pgl_initdb() {
// 第一次 setjmp:正常返回
if (setjmp(initdb_exit_jmp) == 0) {
pgl_initdb_main(); // 内部会调用 pg_proc_exit(66)
} else {
// 从 pg_proc_exit 的 longjmp 返回到这里
fprintf(stderr, "# INFO: pg_proc_exit(%d) caught\n",
(int)initdb_exit_code);
}
// 第二次 setjmp:启动 boot 模式
if (setjmp(initdb_exit_jmp) == 0) {
// ... 启动 boot 模式 ...
BootstrapModeMain(boot_argc, boot_argv, false);
// boot 模式完成,主动 longjmp(而不是 exit)
initdb_exit_code = 66;
longjmp(initdb_exit_jmp, 1); // 跳转到上面的 else 分支
} else {
// Boot 完成
pgl_idb_status |= IDB_HASDB;
pgl_idb_status |= IDB_HASUSER;
}
}
initdb → pg_proc_exit(66) → longjmp(initdb_exit_jmp) ↓ 继续执行后续代码(boot 模式)
signle 模式从 stdin 读取 SQL,输出到 stdout, 这里我们通过一个翻译反复倒带临时文件模拟输入输出接口,来实现用户多次交互
// pgl_capi.c:
static bool write_sql_to_stream(FILE *stream, const char *sql) {
size_t len = strlen(sql);
if (fwrite(sql, 1, len, stream) != len)
return false;
if (len == 0 || sql[len - 1] != '\n') {
if (fputc('\n', stream) == EOF)
return false;
}
fflush(stream);
rewind(stream); // 回到文件开头!
return true;
}
pglite_state pglite_run_sql_with_hook(
pglite_connection conn, const char *sql,
PgliteProtoHook *hook, char **out_error)
{
FILE *stream = tmpfile(); // 创建临时文件(自动删除)
// 第一步:写入 SQL
if (!write_sql_to_stream(stream, sql)) {
fclose(stream);
return PGLITE_ERROR;
}
// 第二步:将临时文件作为 stdin 传给单用户模式
int rc = pgl_run_sql_stream(stream, conn->dbname, conn->username);
fclose(stream); // 自动删除
return (rc == 0) ? PGLITE_SUCCESS : PGLITE_ERROR;
}
pglite_query(conn, “SELECT * FROM pg_class”) ↓ 创建临时文件 ↓ 写入 SQL + rewind() ↓ 将文件指针传给 postgres –single(作为 stdin) ↓ postgres 从文件读取 SQL ↓ 结果通过 hook 返回到内存
在正常模式下,PostgreSQL 通过 libpq 协议发送结果:后端 → libpq → 网络协议 → 客户端 在单用户模式下:后端 → printf() → stdout 所以这里需要伪装 postmaster 环境 + hook 协议
// pg_main.c:263
backend_started:;
IsPostmasterEnvironment = true; // 关键!
// pgl_interactive_one.c:83-103
ClientSocket dummy_sock; // 空结构体
static void io_init(bool in_auth, bool out_auth) {
ClientAuthInProgress = in_auth;
#ifdef PG16
pq_init();
MyProcPort = (Port *) calloc(1, sizeof(Port)); // 分配假 Port
#else
MyProcPort = pq_init(&dummy_sock); // 传入假 socket
#endif
whereToSendOutput = DestRemote; // 让后端认为输出到远程客户端
if (!MyProcPort) {
abort();
}
...
}
// src/include/pglite/result_hook.h
typedef struct PgliteProtoHook {
void (*on_row_description)(void *arg, int field_count, const PgliteResultField *fields);
void (*on_data_row)(void *arg, int field_count, const char *const *values,
const int *lengths, const int16_t *formats);
void (*on_command_complete)(void *arg, const char *command_tag);
void (*on_error_message)(void *arg, const char *message);
void *arg;
} PgliteProtoHook;
到这里, 我们就能处理一个完整 sql 请求了
// pgl_capi.c
pglite_state pglite_run_sql_with_hook(...) {
FILE *stream = tmpfile();
PG_TRY();
{
// 安装 hook
if (hook)
pglite_proto_install_hook(hook);
// 执行 SQL
int rc = pgl_run_sql_stream(stream, conn->dbname, conn->username);
// 清理 hook
if (hook)
pglite_proto_clear_hook(hook);
}
PG_CATCH();
{
// 错误处理
...
}
PG_END_TRY();
fclose(stream);
return state;
}
标准模式: 客户端连接 A → fork 后端进程 A 客户端连接 B → fork 后端进程 B
单用户模式: 客户端连接 A → 后端进程(只有一个) 客户端连接 B → 后端进程(同上,但连接状态会被覆盖)
所以我们保证安全的同时, 通过复用这个链接,来处理多次请求, 我们只初始化一次(单进程), 每次 API 调用传入新的 SQL 流, 后端处理完 SQL 后回到就绪状态,等待下一个 SQL 流,多个”连接”实际上是同一个后端的不同会话:
conn1 = pglite_connect(db)
pglite_query(conn1, "SELECT 1") → 初始化后端(第一次)
↓
后端状态:已初始化,处理 SQL
↓
conn2 = pglite_connect(db)
pglite_query(conn2, "SELECT 2") → 复用后端,处理新 SQL
↓
后端状态:仍在运行,处理新 SQL
// pgl_mains.c
static int
pgl_run_single_user_session_internal(int single_argc, char *single_argv[],
const char *username, FILE *stream,
bool close_stream)
{
static bool single_session_initialized = false; // 关键:只初始化一次
FILE *feed = stream;
bool should_close = close_stream;
if (feed == NULL) {
feed = fopen(IDB_PIPE_SINGLE, "r");
should_close = true;
}
if (feed == NULL) {
ereport(ERROR, (errmsg("failed to open single-user SQL feed")));
return -1;
}
// Heavy-weight backend initialization. Run this only once per process;
// subsequent calls will just reuse the existing backend and process
// additional SQL from the provided stream.
if (!single_session_initialized) {
// 初始化 GUC、共享内存、进程结构等
process_postgres_switches(single_argc, single_argv, PGC_POSTMASTER, &dbname);
LocalProcessControlFile(false);
process_shared_preload_libraries();
IgnoreSystemIndexes = false;
process_shmem_requests();
InitializeShmemGUCs();
InitializeWalConsistencyChecking();
PgStartTime = GetCurrentTimestamp();
SetProcessingMode(InitProcessing);
InitPostgres(dbname, InvalidOid, username, InvalidOid,
(!am_walsender) ? INIT_PG_LOAD_SESSION_LIBS : 0,
NULL);
SetProcessingMode(NormalProcessing);
BeginReportingGUCOptions();
pgstat_report_connect(MyDatabaseId);
// 创建 MessageContext
MessageContext = AllocSetContextCreate(TopMemoryContext,
"MessageContext",
ALLOCSET_DEFAULT_SIZES);
// 设置异常处理
#define INITDB_SINGLE
#include "pgl_sjlj.c"
#undef INITDB_SINGLE
single_session_initialized = true;
}
if (!ignore_till_sync)
send_ready_for_query = true;
FILE *prev_feed = single_mode_feed;
single_mode_feed = feed;
pgl_process_sql_stream(feed); // 处理新的 SQL 流
single_mode_feed = prev_feed;
if (should_close && feed)
fclose(feed);
return 0;
}
single 模式是处理完直接退出,而我们想要这个进程一起启动处理请求, 第一次调用: pglite_query(conn, “SELECT 1”) ↓ tmpfile() → /tmp/tmpXXXXXX ↓ 写入 “SELECT 1\n” → rewind() ↓ postgres 从文件读取 → 处理 → EOF → 回到就绪 ↓ fclose(stream) → 文件被删除 第二次调用: pglite_query(conn, “SELECT 2”) ↓ tmpfile() → /tmp/tmpYYYYYY (新的文件) ↓ 写入 “SELECT 2\n” → rewind() ↓ postgres 从文件读取 → 处理 → EOF → 回到就绪 ↓ fclose(stream) → 文件被删除
static pglite_state
pglite_run_sql_with_hook(pglite_connection conn, const char *sql,
PgliteProtoHook *hook, char **out_error)
{
FILE *stream = tmpfile(); // 每次创建新的临时文件
if (!write_sql_to_stream(stream, sql)) {
fclose(stream);
return PGLITE_ERROR;
}
PG_TRY();
{
if (hook)
pglite_proto_install_hook(hook);
// 每次都是新的 SQL 流
int rc = pgl_run_sql_stream(stream, conn->dbname, conn->username);
state = (rc == 0) ? PGLITE_SUCCESS : PGLITE_ERROR;
if (hook)
pglite_proto_clear_hook(hook);
}
PG_CATCH();
{
// 错误处理
ErrorData *edata = CopyErrorData();
if (hook)
pglite_proto_clear_hook(hook);
if (out_error && edata && edata->message)
*out_error = MemoryContextStrdup(pglite_api_context(), edata->message);
FlushErrorState();
FreeErrorData(edata);
state = PGLITE_ERROR;
}
PG_END_TRY();
fclose(stream); // 关闭临时文件
return state;
}
目前的整体处理流程非常原始, 没有做很多优化, 只支持 C API, 不过可以用在一些测试流程使用时没有问题的
1. 应用启动
pglite_open("/data", &db)
↓
2. 初始化 PostgreSQL 后端
pgl_bootstrap_engine("/data")
├─ pgl_native_startup()
│ └─ 设置环境变量(PREFIX, PGDATA等)
└─ pgl_native_initdb_if_needed()
├─ 检查数据库是否存在
├─ 如果不存在:
│ └─ pgl_initdb()
│ ├─ setjmp(initdb_exit_jmp)
│ ├─ pgl_initdb_main()
│ │ └─ 劫持 popen() → 写入到 IDB_PIPE_BOOT
│ ├─ longjmp(initdb_exit_jmp) ← pg_proc_exit
│ ├─ setjmp(initdb_exit_jmp)
│ ├─ freopen(IDB_PIPE_BOOT, "r", stdin)
│ ├─ BootstrapModeMain()
│ └─ longjmp(initdb_exit_jmp) ← 完成
└─ 标记:已初始化(single_session_initialized)
↓
3. 创建连接
pglite_connect(db, &conn)
└─ 创建连接对象(内存结构)
↓
4. 执行查询(每次 API 调用)
pglite_query(conn, "SELECT * FROM pg_class")
├─ tmpfile() → 创建临时文件
├─ write_sql_to_stream()
│ └─ 写入 SQL + rewind()
├─ pgl_run_sql_stream(stream, dbname, username)
│ └─ pgl_run_single_user_session_internal()
│ ├─ single_session_initialized?
│ │ └─ 否:初始化后端(只执行一次)
│ │ ├─ AsyncPostgresSingleUserMain()
│ │ ├─ IsPostmasterEnvironment = true
│ │ └─ io_init() → 伪造 MyProcPort
│ ├─ sigsetjmp(local_sigjmp_buf)
│ ├─ pgl_process_sql_stream()
│ │ └─ 读取临时文件中的 SQL
│ │ └─ #include "pg_proto.c"
│ │ ├─ case 'Q': exec_simple_query()
│ │ ├─ case 'P': exec_parse_message()
│ │ └─ case 'B': exec_bind_message()
│ │ ↓
│ │ PostgreSQL 执行查询
│ │ ↓
│ │ pglite_proto_hook.on_row_description
│ │ pglite_proto_hook.on_data_row
│ │ ↓
│ │ 结果收集到内存(pglite_result)
│ │ └─ ereport(ERROR) ?
│ │ ├─ 是 → siglongjmp(local_sigjmp_buf)
│ │ └─ 否 → goto wire_flush
│ └─ 如果有 longjmp:
│ └─ clear_error() → 清理状态
├─ fclose(stream) → 删除临时文件
└─ 返回 pglite_result
↓
5. 关闭数据库
pglite_close(&db)
└─ 释放内存结构