LevelDB, 一份 C++ 学习指南

LevelDB 是 Google 开源的高性能键值存储库,其代码质量极高,是学习现代 C++ 工程实践的绝佳教材。本文将从对象设计、接口设计、内存管理等多个角度深入分析 LevelDB 中的 C++ 技巧。

1 基本对象设计

1.1 struct 与 class 的正确使用

LevelDB 中对 structclass 的使用遵循清晰的原则: struct 用于纯数据聚合 (POD-like)

struct LEVELDB_EXPORT Options {
  // 默认构造所有字段,纯数据结构不用写太复杂的构造函数
  Options();
  ...
};

OptionsReadOptionsWriteOptions 都使用 struct,因为它们本质上是配置参数的集合,所有成员默认 public,允许自由访问和复制。

struct ManualCompaction {
    int level;
    bool done;
    const InternalKey* begin;  // null = 范围起点
    const InternalKey* end;    // null = 范围终点
    InternalKey tmp_storage;   // 压缩进度记录
  };

内部数据结构如 ManualCompactionCompactionStats 也使用 struct。

class 用于有行为/不变式的类型

  DB() = default;

  DB(const DB&) = delete;
  DB& operator=(const DB&) = delete;

  virtual ~DB();

DBCacheIterator 等需要封装、有资源管理需求的类型使用 class。

1.2 禁止复制:为什么以及如何

LevelDB 中大量类禁止复制,这是 C++ 资源管理的核心模式。

标准写法 (C++11)

  Table(const Table&) = delete;
  Table& operator=(const Table&) = delete;

为什么要禁止复制?

  1. 资源所有权唯一性:如 Arena 管理内存块,PosixSequentialFile 持有文件描述符
  2. 避免浅拷贝问题:防止多个对象指向同一资源导致 double-free
  3. 语义不适合复制:如 Env 是全局环境、DB 是数据库句柄
  Arena(const Arena&) = delete;
  Arena& operator=(const Arena&) = delete;
  MutexLock(const MutexLock&) = delete;
  MutexLock& operator=(const MutexLock&) = delete;

但有些类型应该允许复制

  // Slice 只是个指针+长度的 wrapper,拷贝一下无妨
  Slice(const Slice&) = default;
  Slice& operator=(const Slice&) = default;

Slice 只是指针+长度的轻量级视图,复制开销小且语义正确。

  // WriteBatch 也是值语义,随便拷
  WriteBatch(const WriteBatch&) = default;
  WriteBatch& operator=(const WriteBatch&) = default;

1.3 析构函数与资源管理

RAII 原则:资源在构造时获取,在析构时释放。

Arena::~Arena() {
  for (size_t i = 0; i < blocks_.size(); i++) {
    delete[] blocks_[i];
  }
}

Arena 析构时释放所有分配的内存块。

  PosixSequentialFile(std::string filename, int fd)
      : fd_(fd), filename_(std::move(filename)) {}
  ~PosixSequentialFile() override { close(fd_); }

文件类在析构时关闭文件描述符。

Status 的自定义复制/移动语义

  // 默认构造为 OK 状态,无内存分配,高效
  Status() noexcept : state_(nullptr) {}
  ~Status() { delete[] state_; }

  Status(const Status& rhs);
  Status& operator=(const Status& rhs);

  Status(Status&& rhs) noexcept : state_(rhs.state_) { rhs.state_ = nullptr; }
inline Status::Status(const Status& rhs) {
  state_ = (rhs.state_ == nullptr) ? nullptr : CopyState(rhs.state_);
}
inline Status& Status::operator=(const Status& rhs) {
  // 处理了自赋值 (this == &rhs) 
  // 也处理了 rhs 和 *this 都有值的情况 -> 先释放旧的,再拷新的
}
inline Status& Status::operator=(Status&& rhs) noexcept {
  std::swap(state_, rhs.state_);
  return *this;
}

注意 operator= 中检查 state_ != rhs.state_ 同时处理了自赋值和两者都是 OK 的情况。

2 接口设计

2.1 抽象接口 + 工厂模式

LevelDB 广泛使用抽象基类定义接口:

class LEVELDB_EXPORT Comparator {
 public:
  virtual ~Comparator();

  // 三路比较: <0, ==0, >0
  // 标准的 C 风格比较返回值
  virtual int Compare(const Slice& a, const Slice& b) const = 0;

通过工厂函数返回具体实现:

// 内置的字节序比较器
// 注意:返回的单例由模块管理,千万别手欠 delete 它
LEVELDB_EXPORT const Comparator* BytewiseComparator();
// 创建固定大小的 LRU Cache
// 实现了标准的最近最少使用算法
LEVELDB_EXPORT Cache* NewLRUCache(size_t capacity);

2.2 单例模式

使用 NoDestructor 实现安全的单例

template <typename InstanceType>
class NoDestructor {
 public:
  template <typename... ConstructorArgTypes>
  explicit NoDestructor(ConstructorArgTypes&&... constructor_args) {
    static_assert(sizeof(instance_storage_) >= sizeof(InstanceType),
                  "instance_storage_ is not large enough to hold the instance");
    static_assert(std::is_standard_layout_v<NoDestructor<InstanceType>>);
    static_assert(
        offsetof(NoDestructor, instance_storage_) % alignof(InstanceType) == 0,
        "instance_storage_ does not meet the instance's alignment requirement");
    static_assert(
        alignof(NoDestructor<InstanceType>) % alignof(InstanceType) == 0,
        "instance_storage_ does not meet the instance's alignment requirement");
    new (instance_storage_)
        InstanceType(std::forward<ConstructorArgTypes>(constructor_args)...);
  }

  ~NoDestructor() = default;

  NoDestructor(const NoDestructor&) = delete;
  NoDestructor& operator=(const NoDestructor&) = delete;

  InstanceType* get() {
    return reinterpret_cast<InstanceType*>(&instance_storage_);
  }

 private:
  alignas(InstanceType) char instance_storage_[sizeof(InstanceType)];
};

使用示例:

const Comparator* BytewiseComparator() {
  static NoDestructor<BytewiseComparatorImpl> singleton;
  return singleton.get();
}

更复杂的单例:SingletonEnv

template <typename EnvType>
class SingletonEnv {
 public:
  SingletonEnv() {
#if !defined(NDEBUG)
    env_initialized_.store(true, std::memory_order_relaxed);
#endif  // !defined(NDEBUG)
    static_assert(sizeof(env_storage_) >= sizeof(EnvType),
                  "env_storage_ will not fit the Env");
    static_assert(std::is_standard_layout_v<SingletonEnv<EnvType>>);
    static_assert(
        offsetof(SingletonEnv<EnvType>, env_storage_) % alignof(EnvType) == 0,
        "env_storage_ does not meet the Env's alignment needs");
    static_assert(alignof(SingletonEnv<EnvType>) % alignof(EnvType) == 0,
                  "env_storage_ does not meet the Env's alignment needs");
    new (env_storage_) EnvType();
  }
  ~SingletonEnv() = default;

  SingletonEnv(const SingletonEnv&) = delete;
  SingletonEnv& operator=(const SingletonEnv&) = delete;

  Env* env() { return reinterpret_cast<Env*>(&env_storage_); }
Env* Env::Default() {
  static PosixDefaultEnv env_container;
  return env_container.env();
}

2.3 PIMPL 模式 (Pointer to Implementation)

Table 类使用前向声明的 Rep 结构隐藏实现细节:

 private:
  friend class TableCache;
  struct Rep;

  static Iterator* BlockReader(void*, const ReadOptions&, const Slice&);

  explicit Table(Rep* rep) : rep_(rep) {}

  // 查找成功后回调 handle_result
  // 优化:BloomFilter 说不在,那就肯定不在,回调省了
  Status InternalGet(const ReadOptions&, const Slice& key, void* arg,
                     void (*handle_result)(void* arg, const Slice& k,
                                           const Slice& v));

  void ReadMeta(const Footer& footer);
  void ReadFilter(const Slice& filter_handle_value);

  Rep* const rep_;

2.4 RAII 锁管理

class SCOPED_LOCKABLE MutexLock {
 public:
  explicit MutexLock(port::Mutex* mu) EXCLUSIVE_LOCK_FUNCTION(mu) : mu_(mu) {
    this->mu_->Lock();
  }
  ~MutexLock() UNLOCK_FUNCTION() { this->mu_->Unlock(); }

  MutexLock(const MutexLock&) = delete;
  MutexLock& operator=(const MutexLock&) = delete;

 private:
  port::Mutex* const mu_;
};

使用时自动加锁解锁:

void LRUCache::Release(Cache::Handle* handle) {
  MutexLock l(&mutex_);
  Unref(reinterpret_cast<LRUHandle*>(handle));
}

3 内存管理

3.1 Arena 内存池

Arena 是 LevelDB 中最重要的内存管理组件,实现了高效的批量分配:

class Arena {
 public:
  Arena();

  Arena(const Arena&) = delete;
  Arena& operator=(const Arena&) = delete;

  ~Arena();

  // 还是得提供个非对齐的分配接口,毕竟不是所有场景都要求对齐,能省点碎片是点
  char* Allocate(size_t bytes);

  // 提供符合 malloc 那个级别的对齐保证 (通常是 8 或 16 字节)
  char* AllocateAligned(size_t bytes);

  // 统计一下一共申请了多少内存,监控用
  size_t MemoryUsage() const {
    return memory_usage_.load(std::memory_order_relaxed);
  }

 private:
  char* AllocateFallback(size_t bytes);
  char* AllocateNewBlock(size_t block_bytes);

  // 分配状态记录
  char* alloc_ptr_;
  size_t alloc_bytes_remaining_;

  // 这是一个数组,记录了我用 new[] 分配的所有内存块。
  std::vector<char*> blocks_;

  // 竞技场总内存占用。
  //
  // 原子变量统计内存使用量
  // TODO(costan): 这里用原子,但 alloc_ptr_ 没加锁,多线程并发分配真的安全吗?(Arena 通常是单线程用的)
  std::atomic<size_t> memory_usage_;
};

快速路径分配 (inline 优化):

inline char* Arena::Allocate(size_t bytes) {
  // 0 字节分配语义不明,直接禁止。
  // 内部代码保证了不会传 0 进来。
  assert(bytes > 0);
  if (bytes <= alloc_bytes_remaining_) {
    char* result = alloc_ptr_;
    alloc_ptr_ += bytes;
    alloc_bytes_remaining_ -= bytes;
    return result;
  }
  return AllocateFallback(bytes);
}

智能的分配策略

char* Arena::AllocateFallback(size_t bytes) {
  if (bytes > kBlockSize / 4) {
    // 超过 1/4 块大小的对象单独分配
    // 避免在标准块里留下无法利用的大碎片
    char* result = AllocateNewBlock(bytes);
    return result;
  }

  // 当前块剩下的那点空间不够了,直接废弃,开新块
  // 典型的空间换时间策略
  alloc_ptr_ = AllocateNewBlock(kBlockSize);
  alloc_bytes_remaining_ = kBlockSize;

  char* result = alloc_ptr_;
  alloc_ptr_ += bytes;
  alloc_bytes_remaining_ -= bytes;
  return result;
}

大于 1/4 块大小的请求单独分配,避免浪费。

3.2 Placement New

在 Arena 上构造对象:

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
    const Key& key, int height) {
  char* const node_memory = arena_->AllocateAligned(
      sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
  return new (node_memory) Node(key);
}

3.3 引用计数

LRU Cache 中的引用计数管理:

void LRUCache::Ref(LRUHandle* e) {
  if (e->refs == 1 && e->in_cache) {  // 之前只是在 LRU 热度不够,现在又有人用了,复活!移回 in_use_
    LRU_Remove(e);
    LRU_Append(&in_use_, e);
  }
  e->refs++;
}

void LRUCache::Unref(LRUHandle* e) {
  assert(e->refs > 0);
  e->refs--;
  if (e->refs == 0) {  // 即使在 Cache 里也被踢了,引用归零 -> 真正释放内存
    assert(!e->in_cache);
    (*e->deleter)(e->key(), e->value);
    free(e);
  } else if (e->in_cache && e->refs == 1) {
    // 还有 Cache 引用(refs==1),但外部不用了 -> 放入 LRU 列表等待淘汰
    LRU_Remove(e);
    LRU_Append(&lru_, e);
  }
}

3.4 资源限制器 (Limiter)

class Limiter {
 public:
  // 信号量机制:限制最大并发资源数
  Limiter(int max_acquires)
      :
#if !defined(NDEBUG)
        max_acquires_(max_acquires),
#endif  // !defined(NDEBUG)
        acquires_allowed_(max_acquires) {
    assert(max_acquires >= 0);
  }

  Limiter(const Limiter&) = delete;
  Limiter operator=(const Limiter&) = delete;

  // 尝试获取资源
  // true: 获取成功
  // false: 资源耗尽
  bool Acquire() {
    int old_acquires_allowed =
        acquires_allowed_.fetch_sub(1, std::memory_order_relaxed);

    if (old_acquires_allowed > 0) return true;

    int pre_increment_acquires_allowed =
        acquires_allowed_.fetch_add(1, std::memory_order_relaxed);

    // NDEBUG 消除 unused variable 警告
    (void)pre_increment_acquires_allowed;
    // 逻辑检查:Release 调用次数一定不能超过 Acquire 成功次数
    assert(pre_increment_acquires_allowed < max_acquires_);

    return false;
  }

  // 归还资源
  void Release() {
    int old_acquires_allowed =
        acquires_allowed_.fetch_add(1, std::memory_order_relaxed);

    (void)old_acquires_allowed; // 消除警告
    // 同样的逻辑检查
    assert(old_acquires_allowed < max_acquires_);
  }

4 其他 C++ 技巧

4.1 Slice: 零拷贝的字符串视图

class LEVELDB_EXPORT Slice {
 public:
  // 构造函数:各种方式初始化,均可以认为是轻量的指针赋值

  // 默认拷贝构造和赋值,高效
  Slice(const Slice&) = default;
  Slice& operator=(const Slice&) = default;

  // 类似 std::string_view::data()
  const char* data() const { return data_; }

  // 类似 std::string_view::size()
  size_t size() const { return size_; }


  bool empty() const { return size_ == 0; }

  const char* begin() const { return data(); }
  const char* end() const { return data() + size(); }

  // 像数组一样访问,Debug 模式下会检查越界
  char operator[](size_t n) const {
    assert(n < size());
    return data_[n];
  }

  // 重置为空,即使 data_ 之前指向有效内存也不用释放(不拥有所有权)
  void clear() {
    data_ = "";
    size_ = 0;
  }

  // 调整指针和长度,实现非常高效的前缀移除
  void remove_prefix(size_t n) {
    assert(n <= size());
    data_ += n;
    size_ -= n;
  }

  // 显式转为 string,这里会发生深拷贝
  std::string ToString() const { return std::string(data_, size_); }

  // 字典序比较
  int compare(const Slice& b) const;

  // 前缀检查,很常用 (Check Prefix)
  bool starts_with(const Slice& x) const {
    return ((size_ >= x.size_) && (memcmp(data_, x.data_, x.size_) == 0));
  }

 private:
  const char* data_;
  size_t size_;
};

核心思想:不拥有数据,只是视图。类似 C++17 的 std::string_view

4.2 Status: 富错误信息

 private:
  enum Code {
    kOk = 0,
    kNotFound = 1,
    kCorruption = 2,
    kNotSupported = 3,
    kInvalidArgument = 4,
    kIOError = 5
  };

  Code code() const {
    return (state_ == nullptr) ? kOk : static_cast<Code>(state_[4]);
  }

  Status(Code code, const Slice& msg, const Slice& msg2);
  static const char* CopyState(const char* s);

  // 内存布局优化:
  // OK -> nullptr
  // Error -> 数组 [长度, Code, Message...],紧凑存储
  const char* state_;

优化:OK 状态不分配内存 (state_ == nullptr)。

4.3 跨平台编码 (Endian-neutral)

inline void EncodeFixed32(char* dst, uint32_t value) {
  uint8_t* const buffer = reinterpret_cast<uint8_t*>(dst);

  // 现代编译器能识别这种模式,直接优化成 mov/str 指令(处理字节序)
  buffer[0] = static_cast<uint8_t>(value);
  buffer[1] = static_cast<uint8_t>(value >> 8);
  buffer[2] = static_cast<uint8_t>(value >> 16);
  buffer[3] = static_cast<uint8_t>(value >> 24);
}

小端序编码,现代编译器会优化为单条指令。

4.4 线程安全标注

  // 互斥锁保护以下成员变量
  port::Mutex mutex_;
  std::atomic<bool> shutting_down_;
  port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
  MemTable* mem_;
  MemTable* imm_ GUARDED_BY(mutex_);  // 正在 compact 的 immutable memtable
  std::atomic<bool> has_imm_;         // 允许后台线程在无锁情况下检查是否有 imm_

使用 GUARDED_BYEXCLUSIVE_LOCKS_REQUIRED 等标注帮助静态分析工具检测竞态条件。

4.5 变长数组技巧

struct LRUHandle {
  void* value;
  void (*deleter)(const Slice&, void* value);
  LRUHandle* next_hash;
  LRUHandle* next;
  LRUHandle* prev;
  size_t charge;  // TODO(opt): 32位可能就够了?(节省内存)
  size_t key_length;
  bool in_cache;     // 标记是否在 cache 中 (用于引用计数处理)
  uint32_t refs;     // 引用计数
  uint32_t hash;     // 预存 Hash 值,避免重复计算
  char key_data[1];  // 柔性数组 (Flexible Array Member) 开始位置

  Slice key() const {
    // Dummy Head 技巧:空链表时 next 指向自己
    // Head 节点不存 Key
    assert(next != this);

    return Slice(key_data, key_length);
  }
};

通过 char key_data[1] 实现柔性数组成员:

  LRUHandle* e =
      reinterpret_cast<LRUHandle*>(malloc(sizeof(LRUHandle) - 1 + key.size()));

4.6 原子操作与内存序

  // 封装原子操作,统一添加 Memory Barrier
  Node* Next(int n) {
    assert(n >= 0);
    // Acquire Load: 确保读取时看到的内容是初始化好的
    return next_[n].load(std::memory_order_acquire);
  }
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Release Store: 确保我的初始化操作对其他线程可见
    next_[n].store(x, std::memory_order_release);
  }

  // 无屏障版本,只有在少数几个安全的地方才能偷偷用一下。
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return next_[n].load(std::memory_order_relaxed);
  }
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].store(x, std::memory_order_relaxed);
  }

SkipList 展示了精细的内存序控制,在性能和正确性之间取得平衡。

总结

LevelDB 的代码展示了以下 C++ 最佳实践:

技术目的
= delete 禁止复制资源所有权管理
RAII自动资源管理
抽象接口 + 工厂解耦与可扩展性
Arena 内存池高效内存分配
Slice 视图零拷贝字符串传递
Status无异常错误处理
NoDestructor安全单例
线程安全标注静态分析支持
[NeoKV] 基于Braft分布式KV,支持Redis协议
rocksdb 生存手册