DuckDB-源码分析(2)Parquet读设计

在ParquetReader的构造函数中,会打开文件句柄,加载元数据,并初始化。Scan函数负责实际的扫描过程,处理行组,应用过滤器,读取数据到DataChunk中。首先,在元数据加载部分,parquet文件的元数据通常位于文件末尾,包含schema、行组信息、统计信息等。LoadMetadata函数通过读取文件末尾的元数据部分,解析出FileMetaData结构。

DeriveLogicalType函数根据Parquet的SchemaElement中的类型信息(如Type、converted_type、logicalType)转换为DuckDB的LogicalType。例如,Parquet的INT32可能对应DuckDB的INTEGER,而带有converted_type为DATE的INT32会被转换为DATE类型。这里还处理了时间戳、UUID等复杂类型的转换。

创建列读取器的过程在CreateReaderRecursive中完成,这个函数递归地遍历SchemaElement树,根据每个元素的类型和子元素创建相应的ColumnReader。例如,遇到STRUCT类型时,会创建StructColumnReader,并为每个子列创建对应的读取器,对于重复的字段(如LIST或MAP),会使用ListColumnReader来处理嵌套结构。

初始化时,InitializeSchema调用CreateReader创建根读取器,通常是StructColumnReader,因为它对应Parquet文件的根结构。然后根据读取到的列信息填充columns向量,记录每个列的名称和类型。

扫描数据的过程由Scan函数处理。ParquetReaderScanState用于跟踪扫描的状态,包括当前处理的行组、文件句柄、过滤器等。PrepareRowGroupBuffer函数准备当前行组的数据,可能应用统计信息进行过滤,跳过不需要读取的行组。然后通过ColumnReader的Read方法将数据读取到DataChunk中,应用过滤器(如果有的话),最终将结果返回。

过滤器处理部分,ApplyFilter函数根据过滤条件对读取的数据进行过滤。例如,等值比较、范围比较、IS NULL等条件会被应用到数据上,减少需要处理的数据量,提升查询性能。

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ParquetReader::ParquetReader(ClientContext &context, string file_name, 
ParquetOptions parquet_options,
shared_ptr<ParquetFileMetadataCache> metadata)
: fs(FileSystem::GetFileSystem(context)),
allocator(BufferAllocator::Get(context)),
parquet_options(std::move(parquet_options)) {

// 1. 打开文件
file_handle = fs.OpenFile(file_name, FileFlags::FILE_FLAGS_READ);

// 2. 加载/获取元数据缓存
if (!metadata) {
this->metadata = LoadMetadata(context, allocator, *file_handle,
parquet_options.encryption_config,
*encryption_util);
} else {
this->metadata = std::move(metadata);
}

// 3. 初始化Schema
InitializeSchema(context);
}

关键步骤:

  1. 文件打开:使用DuckDB统一文件系统接口
  2. 元数据加载:
    • 优先使用传入的缓存元数据;
    • 无缓存时调用LoadMetadata解析
  3. Schema初始化:构建列读取器的树

MetaData加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
shared_ptr<ParquetFileMetadataCache> LoadMetadata(...) {
// 创建Thrift协议解析器
auto file_proto = CreateThriftFileProtocol(allocator, file_handle, false);

// 读取文件尾部8字节
transport.read(buf.ptr, 8);
if (memcmp(buf.ptr+4, "PAR1",4)==0) {
footer_encrypted = false; // 标准Parquet文件
} else if (memcmp(..., "PARE", ...)) {
footer_encrypted = true; // 加密文件
ParquetCrypto::Read(...); // AES-GCM解密
}

// 反序列化元数据
auto metadata = make_uniq<FileMetaData>();
metadata->read(file_proto.get());

// 处理GeoParquet扩展
auto geo_metadata = GeoParquetFileMetadata::TryRead(*metadata, context);

return make_shared_ptr<ParquetFileMetadataCache>(...);
}

元数据结构:

1
2
3
4
5
struct ParquetFileMetadataCache {
unique_ptr<FileMetaData> metadata; // Thrift生成的元数据
time_t read_time; // 缓存时间戳
unique_ptr<GeoParquetFileMetadata> geo_metadata; // 地理扩展
};

Schema 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void ParquetReader::InitializeSchema(ClientContext &context) {
auto file_meta_data = GetFileMetadata();

// 1. 创建根读取器
root_reader = CreateReader(context);

// 2. 构建列定义
auto &struct_reader = root_reader->Cast<StructColumnReader>();
for (idx_t i = 0; i < child_types.size(); i++) {
columns.emplace_back(
MultiFileReaderColumnDefinition(
child_types[i].first, // 列名
child_types[i].second // 逻辑类型
)
);
}

// 3. 处理生成列(如file_row_number)
if (parquet_options.file_row_number) {
columns.emplace_back("file_row_number", LogicalType::BIGINT);
}
}

结构说明

graph TD
    A[ParquetReader] --> B[FileHandle]
    A --> C[ParquetFileMetadataCache]
    C --> D[FileMetaData]
    C --> E[GeoParquetMetadata]
    A --> F[ColumnReader树]
    
    subgraph 元数据
        D --> G[SchemaElements]
        D --> H[RowGroups]
        H --> I[ColumnChunks]
        I --> J[Statistics]
        I --> K[EncryptionInfo]
    end
    
    subgraph 读取器结构
        F --> L[StructColumnReader]
        L --> M[ColumnReader...]
        M --> N[ListColumnReader]
        M --> O[TemplatedColumnReader]
    end
    
    B -->|读取| P[Parquet文件]
    style A fill:#f9f,stroke:#333
    style C fill:#bbf,stroke:#333
    style F fill:#9f9,stroke:#333

关键数据结构说明

  1. FileMetaData 元数据类结构
1
2
3
4
5
6
7
8
9
struct FileMetaData {
1: required i32 version
2: required list<SchemaElement> schema
3: required i64 num_rows
4: required list<RowGroup> row_groups
5: optional list<KeyValue> key_value_metadata
6: optional string created_by
7: optional list<ColumnOrder> column_orders
}
  1. ColumnReader 类层次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class ColumnReader {
public:
virtual void Read(...) = 0;
virtual void Skip(...) = 0;
virtual unique_ptr<BaseStatistics> Stats(...) = 0;
};

class StructColumnReader : public ColumnReader {
vector<unique_ptr<ColumnReader>> child_readers;
};

class ListColumnReader : public ColumnReader {
unique_ptr<ColumnReader> child_reader;
};

template <class T>
class TemplatedColumnReader : public ColumnReader {
// 物理类型特化实现
};

初始化阶段性能优化

  1. 元数据缓存
1
2
3
4
5
metadata = ObjectCache::Get(file_name);
if (!metadata || expired) {
metadata = LoadMetadata(...);
ObjectCache::Put(file_name, metadata);
}
  1. 延迟加载
1
2
3
4
5
6
void StructColumnReader::InitializeRead(...) {
// 按需初始化子读取器
for (auto &child : child_readers) {
if (child) child->InitializeRead(...);
}
}
  1. 内存预分配
1
2
ResizeableBuffer define_buf;
define_buf.resize(allocator, STANDARD_VECTOR_SIZE); // 2048

初始化时序图

sequenceDiagram
    participant Client
    participant ParquetReader
    participant FileSystem
    participant ThriftParser
    participant ColumnReader
    
    Client->>ParquetReader: 创建Reader(file_name)
    ParquetReader->>FileSystem: OpenFile()
    FileSystem-->>ParquetReader: 返回FileHandle
    
    alt 无缓存
        ParquetReader->>ThriftParser: LoadMetadata()
        ThriftParser->>FileHandle: 读取文件尾部
        ThriftParser-->>ParquetReader: FileMetaData
    else 有缓存
        ParquetReader->>ObjectCache: GetMetadata()
    end
    
    ParquetReader->>ColumnReader: CreateReaderRecursive()
    loop 递归创建
        ColumnReader->>ColumnReader: 创建子读取器
    end
    ParquetReader-->>Client: 初始化完成

扫描数据过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void ParquetReader::InitializeScan(ClientContext &context, 
ParquetReaderScanState &state,
vector<idx_t> groups_to_read) {
// 重置扫描状态
state.current_group = -1;
state.group_offset = 0;
state.finished = false;

// 独立文件句柄(支持多线程)
state.file_handle = fs.OpenFile(file_name,
FileFlags::FILE_FLAGS_READ | (prefetch ? FILE_FLAGS_DIRECT_IO : 0));

// 创建Thrift协议解析器(每个扫描状态独立)
state.thrift_file_proto = CreateThriftFileProtocol(allocator, *state.file_handle, state.prefetch_mode);

// 创建列读取器树(可能带谓词过滤)
state.root_reader = CreateReader(context);

// 初始化缓冲区
state.define_buf.resize(allocator, STANDARD_VECTOR_SIZE);
state.repeat_buf.resize(allocator, STANDARD_VECTOR_SIZE);

// 配置预取模式
Value prefetch_all;
context.TryGetCurrentSetting("prefetch_all_parquet_files", prefetch_all);
state.prefetch_mode = prefetch_all.GetValue<bool>();
}

关键点:

  • 每个扫描状态维护独立的文件句柄和Thrift解析器,支持并发扫描
  • 根据配置选择直接IO模式(减少内核缓存开销)
  • 列读取器树可能根据谓词进行剪枝(减少不必要的列读取)

RowGroup处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void ParquetReader::PrepareRowGroupBuffer(ParquetReaderScanState &state, idx_t out_col_idx) {
auto &group = GetGroup(state);
auto column_id = reader_data.column_ids[out_col_idx];
auto &column_reader = state.root_reader->Cast<StructColumnReader>().GetChildReader(column_id);

// 1. 检查行组统计信息
auto stats = column_reader.Stats(state.current_group, group.columns);
if (stats && reader_data.filters) {
auto global_id = reader_data.column_mapping[out_col_idx];
auto filter = reader_data.filters->filters[global_id];

// 2. 应用统计过滤
if (stats->CheckFilter(*filter) == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
state.group_offset = group.num_rows; // 跳过整个行组
return;
}
}

// 3. 初始化该RowGroup列读取器
column_reader.InitializeRead(state.current_group, group.columns, *state.thrift_file_proto);
}

过滤逻辑:

  • 使用行组的min/max值快速判断是否跳过;
  • 对字符串列使用更精确的字典过滤;

ScanInternal 函数

ScanInternal函数负责从Parquet文件中读取数据到DataChunk中,处理行组切换、预取、数据解码和过滤等步骤。函数开始检查是否完成扫描,然后处理行组切换。在切换行组时,会进行预取操作,包括整个行组的预取或按列预取。之后,函数计算需要读取的行数,初始化过滤掩码,然后按列读取数据,应用过滤器,最后更新状态。在预取部分,代码根据是否启用预取模式(prefetch_mode)以及扫描的列比例(scan_percentage)来决定是预取整个行组还是按列预取。

  1. 行组切换与预取处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
if (state.current_group < 0 || (int64_t)state.group_offset >= GetGroup(state).num_rows) {
// 切换到下一个行组
state.current_group++;
state.group_offset = 0;

auto &trans = reinterpret_cast<ThriftFileTransport &>(*state.thrift_file_proto->getTransport());
trans.ClearPrefetch(); // 清空之前的预取

// 检查行组边界
if ((idx_t)state.current_group == state.group_idx_list.size()) {
state.finished = true;
return false;
}

// 计算需要扫描的压缩字节数
uint64_t to_scan_compressed_bytes = 0;
for (所有列) {
PrepareRowGroupBuffer(...); // 准备列读取器
to_scan_compressed_bytes += 列压缩大小;
}

// 预取决策逻辑
if (state.prefetch_mode) {
double scan_percentage = 扫描字节数 / 行组总跨度;

if (scan_percentage > 0.95) {
// 全行组预取
trans.Prefetch(起始偏移, 行组总跨度);
} else {
// 按列预取
for (所有列) {
if (无过滤 || 列有过滤条件) {
注册列预取;
}
}
trans.PrefetchRegistered(); // 触发预取
}
}
}

预取策略:

条件 策略 适用场景
scan_percentage > 95% 预取整个行组 全表扫描
存在过滤器 仅预取过滤列 条件查询
无过滤器 预取所有列 投影查询
  1. 读取逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 计算本次读取行数
idx_t this_output_chunk_rows = Min(STANDARD_VECTOR_SIZE,
GetGroup().num_rows - state.group_offset);
result.SetCardinality(this_output_chunk_rows);

// 初始化过滤掩码
parquet_filter_t filter_mask;
filter_mask.set(); // 初始全有效
for (i >= this_output_chunk_rows)
filter_mask.reset(i); // 屏蔽超出部分

// 列处理流程
if (存在过滤器) {
// 第一阶段:处理过滤列
for (每个过滤条件) {
if (过滤列是常量) {
ApplyFilter(常量向量, 过滤条件);
} else {
读取过滤列数据到result_vector;
ApplyFilter(result_vector, 过滤条件);
}
}

// 第二阶段:处理其他列
for (剩余列) {
if (过滤结果全无效) {
Skip(); // 跳过解码
} else {
读取列数据到result_vector;
}
}

// 生成选择向量
SelectionVector sel;
for (i=0 到 this_output_chunk_rows) {
if (filter_mask.test(i))
sel.append(i);
}
result.Slice(sel); // 压缩结果
} else {
// 无过滤直接读取
for (所有列) {
读取列数据到result_vector;
}
}

// 更新偏移量
state.group_offset += this_output_chunk_rows;

ThriftFileTransport 数据结构

1
2
3
4
5
6
7
8
9
struct ReadHead {
ReadHead(idx_t location, uint64_t size) : location(location), size(size) {};
idx_t location; // 数据块起始位置
uint64_t size; // 数据块大小
AllocatedData data; // 分配的存储空间
bool data_isset = false; // 数据是否已加载标记
idx_t GetEnd() const { return size + location; } // 获取数据块结束位置
void Allocate(Allocator &allocator) { data = allocator.Allocate(size); } // 分配内存空间
};

ReadHead表示一个数据块的基本信息,包括起始位置、大小、存储空间等信息。
初始化时传入起始位置和大小,并且提供获取数据块结束位置的方法,以及提供内存分配方法,将数据加载到data缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct ReadHeadComparator {
static constexpr uint64_t ALLOW_GAP = 1 << 14; // 16 KiB
bool operator()(const ReadHead *a, const ReadHead *b) const {
auto a_start = a->location;
auto a_end = a->location + a->size;
auto b_start = b->location;

if (a_end <= NumericLimits<idx_t>::Maximum() - ALLOW_GAP) {
a_end += ALLOW_GAP;
}

return a_start < b_start && a_end < b_start;
}
};

ReadHeadComparator用于比较两个ReadHead对象的位置关系,判断是否允许合并或重叠。具体的,类中定义了一个允许的隔距ALLOW_GAP(16KiB),用来比较两个数据块的起始位置和结束位置:如果a的结束位置(加上允许隔距)在b的起始位置之前,则认为两者不重叠。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
struct ReadAheadBuffer {
ReadAheadBuffer(Allocator &allocator, FileHandle &handle) : allocator(allocator), handle(handle) {}

std::list<ReadHead> read_heads; // 存储所有预读数据块
std::set<ReadHead *, ReadHeadComparator> merge_set; // 用于合并数据块

Allocator &allocator; // 内存分配器
FileHandle &handle; // 文件句柄

idx_t total_size = 0; // 总预读大小

// 添加一个预读数据块
void AddReadHead(idx_t pos, uint64_t len, bool merge_buffers = true) {
// 尝试合并现有数据块
if (merge_buffers) {
ReadHead new_read_head {pos, len};
auto lookup_set = merge_set.find(&new_read_head);
if (lookup_set != merge_set.end()) {
auto existing_head = *lookup_set;
auto new_start = MinValue(existing_head->location, new_read_head.location);
auto new_length = MaxValue(existing_head->GetEnd(), new_read_head.GetEnd()) - new_start;
existing_head->location = new_start;
existing_head->size = new_length;
return;
}
}

read_heads.emplace_front(ReadHead(pos, len));
total_size += len;
auto &read_head = read_heads.front();

if (merge_buffers) {
merge_set.insert(&read_head);
}

// 检查数据块是否超出文件范围
if (read_head.GetEnd() > handle.GetFileSize()) {
throw std::runtime_error("Prefetch registered for bytes outside file");
}
}

// 获取相关数据块
ReadHead *GetReadHead(idx_t pos) {
for (auto &read_head : read_heads) {
if (pos >= read_head.location && pos < read_head.GetEnd()) {
return &read_head;
}
}
return nullptr;
}

// 预读所有数据块
void Prefetch() {
for (auto &read_head : read_heads) {
read_head.Allocate(allocator);

if (read_head.GetEnd() > handle.GetFileSize()) {
throw std::runtime_error("Prefetch requested for bytes outside file");
}

handle.Read(read_head.data.get(), read_head.size, read_head.location);
read_head.data_isset = true;
}
}
};

ReadAheadBuffer 结构用来管理预读数据块的队列和合并操作,list存储所有未读取的数据块,set配合ReadHeadComparator合并相邻或重叠的数据块,减少存储和合并IO,具体的:

  • AddReadHead添加预读数据块,并尝试与现有数据块合并;
  • GetReadHead根据给定位置返回对应的数据块;
  • Prefetch将所有预读数据块从文件中读取并加载到内存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class ThriftFileTransport {
// void Prefetch(idx_t pos, uint64_t len) { RegisterPrefetch(pos, len, false); FinalizeRegistration(); PrefetchRegistered(); }
// void RegisterPrefetch(idx_t pos, uint64_t len, bool can_merge = true) { ra_buffer.AddReadHead(pos, len, can_merge); }
// void FinalizeRegistration() { ra_buffer.merge_set.clear(); }
// void PrefetchRegistered() { ra_buffer.Prefetch(); }
// void ClearPrefetch() { ra_buffer.read_heads.clear(); ra_buffer.merge_set.clear(); }
uint32_t read(uint8_t *buf, uint32_t len) {
auto prefetch_buffer = ra_buffer.GetReadHead(location);
if (prefetch_buffer != nullptr && location - prefetch_buffer->location + len <= prefetch_buffer->size) {
// 如果有预读数据,直接使用
if (!prefetch_buffer->data_isset) {
prefetch_buffer->Allocate(allocator);
handle.Read(prefetch_buffer->data.get(), prefetch_buffer->size, prefetch_buffer->location);
prefetch_buffer->data_isset = true;
}
memcpy(buf, prefetch_buffer->data.get() + location - prefetch_buffer->location, len);
} else {
// 如果没有预读数据,直接从文件中读取
if (prefetch_mode && len < PREFETCH_FALLBACK_BUFFERSIZE && len > 0) {
Prefetch(location, MinValue(handle.GetFileSize() - location, PREFETCH_FALLBACK_BUFFERSIZE));
auto prefetch_buffer_fallback = ra_buffer.GetReadHead(location);
memcpy(buf, prefetch_buffer_fallback->data.get() + location - prefetch_buffer_fallback->location, len);
} else {
handle.Read(buf, len, location);
}
}
location += len;
return len;
}

private:
FileHandle &handle;
idx_t location;
Allocator &allocator;
ReadAheadBuffer ra_buffer;
bool prefetch_mode;
}

ThriftFileTransport重载read方法,优先使用预读数据块,如果数据块未加载则实时读取,并且提供方法Prefetch、RegisterPrefetch、FinalizeRegistration和PrefetchRegistered,实现预读策略:

  1. 先注册所有需要预读的范围;
  2. 清理合并操作以固定范围;
  3. 执行预读操作将数据加载到内存。

不同预取策略:

策略类型 触发条件 优势 实现位置
主动批量预取 已知读取模式时 最大化顺序读取性能 ReadAheadBuffer::Prefetch
被动按需预取 未预取或预取未覆盖时 避免小数据预取开销 ThriftFileTransport::read

读取流程图

graph TD
    A[打开文件] --> B{加密检查}
    B -- 是 --> C[解密元数据]
    B -- 否 --> D[直接读取元数据]
    C --> E[解析元数据]
    D --> E
    E --> F[构建Schema树]
    F --> G[递归创建列读取器]
    G --> H[Struct/List列处理]
    G --> I[基本类型列处理]
    
    subgraph 扫描循环
        J[选择行组] --> K{行组过滤?}
        K -- 跳过 --> M[下一行组]
        K -- 读取 --> L[初始化行组读取]
        L --> N[预取数据]
        N --> O[列并行读取]
        O --> P[应用谓词下推]
        P --> Q[向量化过滤]
        Q --> R[输出DataChunk]
        R --> S{更多行组?}
        S -- 是 --> J
        S -- 否 --> T[结束扫描]
    end
    
    E --> J
    I --> O
    
    style A fill:#f9f,stroke:#333
    style E fill:#bbf,stroke:#333
    style G fill:#9f9,stroke:#333
    style J fill:#f96,stroke:#333
    style O fill:#6f9,stroke:#333

总结

DuckDB 读取 Parquet 文件的设计通过文件系统接口打开文件,借助 Thrift 协议解析元数据,构建列读取器树并初始化 Schema。在扫描过程中,通过行组过滤、列并行读取和谓词下推等优化策略,高效地预取数据并将其转化为向量化数据块,同时利用元数据缓存、延迟加载和内存预分配等机制提升性能,实现对 Parquet 文件的快速读取和处理。设计充分考虑了现代硬件特性,通过向量化处理、数据局部性优化和多级并行,实现了高效的列式数据读取。同时,灵活的过滤机制使得在复杂查询场景下能显著减少不必要的IO和计算开销。

DuckDB-源码分析(2)Parquet读设计

https://devillove084.github.io/2025/02/16/DuckDB-2/

作者

devillove084

发布于

2025-02-16

更新于

2025-02-16

许可协议

评论

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×