DuckDB-源码分析(2)Parquet读设计

在ParquetReader的构造函数中,会打开文件句柄,加载元数据,并初始化。Scan函数负责实际的扫描过程,处理行组,应用过滤器,读取数据到DataChunk中。首先,在元数据加载部分,parquet文件的元数据通常位于文件末尾,包含schema、行组信息、统计信息等。LoadMetadata函数通过读取文件末尾的元数据部分,解析出FileMetaData结构。

DeriveLogicalType函数根据Parquet的SchemaElement中的类型信息(如Type、converted_type、logicalType)转换为DuckDB的LogicalType。例如,Parquet的INT32可能对应DuckDB的INTEGER,而带有converted_type为DATE的INT32会被转换为DATE类型。这里还处理了时间戳、UUID等复杂类型的转换。

创建列读取器的过程在CreateReaderRecursive中完成,这个函数递归地遍历SchemaElement树,根据每个元素的类型和子元素创建相应的ColumnReader。例如,遇到STRUCT类型时,会创建StructColumnReader,并为每个子列创建对应的读取器,对于重复的字段(如LIST或MAP),会使用ListColumnReader来处理嵌套结构。

初始化时,InitializeSchema调用CreateReader创建根读取器,通常是StructColumnReader,因为它对应Parquet文件的根结构。然后根据读取到的列信息填充columns向量,记录每个列的名称和类型。

扫描数据的过程由Scan函数处理。ParquetReaderScanState用于跟踪扫描的状态,包括当前处理的行组、文件句柄、过滤器等。PrepareRowGroupBuffer函数准备当前行组的数据,可能应用统计信息进行过滤,跳过不需要读取的行组。然后通过ColumnReader的Read方法将数据读取到DataChunk中,应用过滤器(如果有的话),最终将结果返回。

过滤器处理部分,ApplyFilter函数根据过滤条件对读取的数据进行过滤。例如,等值比较、范围比较、IS NULL等条件会被应用到数据上,减少需要处理的数据量,提升查询性能。

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ParquetReader::ParquetReader(ClientContext &context, string file_name, 
ParquetOptions parquet_options,
shared_ptr<ParquetFileMetadataCache> metadata)
: fs(FileSystem::GetFileSystem(context)),
allocator(BufferAllocator::Get(context)),
parquet_options(std::move(parquet_options)) {

// 1. 打开文件
file_handle = fs.OpenFile(file_name, FileFlags::FILE_FLAGS_READ);

// 2. 加载/获取元数据缓存
if (!metadata) {
this->metadata = LoadMetadata(context, allocator, *file_handle,
parquet_options.encryption_config,
*encryption_util);
} else {
this->metadata = std::move(metadata);
}

// 3. 初始化Schema
InitializeSchema(context);
}

关键步骤:

  1. 文件打开:使用DuckDB统一文件系统接口
  2. 元数据加载:
    • 优先使用传入的缓存元数据;
    • 无缓存时调用LoadMetadata解析
  3. Schema初始化:构建列读取器的树

MetaData加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
shared_ptr<ParquetFileMetadataCache> LoadMetadata(...) {
// 创建Thrift协议解析器
auto file_proto = CreateThriftFileProtocol(allocator, file_handle, false);

// 读取文件尾部8字节
transport.read(buf.ptr, 8);
if (memcmp(buf.ptr+4, "PAR1",4)==0) {
footer_encrypted = false; // 标准Parquet文件
} else if (memcmp(..., "PARE", ...)) {
footer_encrypted = true; // 加密文件
ParquetCrypto::Read(...); // AES-GCM解密
}

// 反序列化元数据
auto metadata = make_uniq<FileMetaData>();
metadata->read(file_proto.get());

// 处理GeoParquet扩展
auto geo_metadata = GeoParquetFileMetadata::TryRead(*metadata, context);

return make_shared_ptr<ParquetFileMetadataCache>(...);
}

元数据结构:

1
2
3
4
5
struct ParquetFileMetadataCache {
unique_ptr<FileMetaData> metadata; // Thrift生成的元数据
time_t read_time; // 缓存时间戳
unique_ptr<GeoParquetFileMetadata> geo_metadata; // 地理扩展
};

Schema 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void ParquetReader::InitializeSchema(ClientContext &context) {
auto file_meta_data = GetFileMetadata();

// 1. 创建根读取器
root_reader = CreateReader(context);

// 2. 构建列定义
auto &struct_reader = root_reader->Cast<StructColumnReader>();
for (idx_t i = 0; i < child_types.size(); i++) {
columns.emplace_back(
MultiFileReaderColumnDefinition(
child_types[i].first, // 列名
child_types[i].second // 逻辑类型
)
);
}

// 3. 处理生成列(如file_row_number)
if (parquet_options.file_row_number) {
columns.emplace_back("file_row_number", LogicalType::BIGINT);
}
}

结构说明

Reader

Metadata

read

ParquetReader

FileHandle

ParquetFileMetadataCache

FileMetaData

GeoParquetMetadata

ColumnReader树

SchemaElements

RowGroups

ColumnChunks

Statistics

EncryptionInfo

StructColumnReader

ColumnReader...

ListColumnReader

TemplatedColumnReader

Parquet file

关键数据结构说明

  1. FileMetaData 元数据类结构
1
2
3
4
5
6
7
8
9
struct FileMetaData {
1: required i32 version
2: required list<SchemaElement> schema
3: required i64 num_rows
4: required list<RowGroup> row_groups
5: optional list<KeyValue> key_value_metadata
6: optional string created_by
7: optional list<ColumnOrder> column_orders
}
  1. ColumnReader 类层次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class ColumnReader {
public:
virtual void Read(...) = 0;
virtual void Skip(...) = 0;
virtual unique_ptr<BaseStatistics> Stats(...) = 0;
};

class StructColumnReader : public ColumnReader {
vector<unique_ptr<ColumnReader>> child_readers;
};

class ListColumnReader : public ColumnReader {
unique_ptr<ColumnReader> child_reader;
};

template <class T>
class TemplatedColumnReader : public ColumnReader {
// 物理类型特化实现
};

初始化阶段性能优化

  1. 元数据缓存
1
2
3
4
5
metadata = ObjectCache::Get(file_name);
if (!metadata || expired) {
metadata = LoadMetadata(...);
ObjectCache::Put(file_name, metadata);
}
  1. 延迟加载
1
2
3
4
5
6
void StructColumnReader::InitializeRead(...) {
// 按需初始化子读取器
for (auto &child : child_readers) {
if (child) child->InitializeRead(...);
}
}
  1. 内存预分配
1
2
ResizeableBuffer define_buf;
define_buf.resize(allocator, STANDARD_VECTOR_SIZE); // 2048

初始化时序图

ObjectCacheFileHandleColumnReaderThriftParserFileSystemParquetReaderClientObjectCacheFileHandleColumnReaderThriftParserFileSystemParquetReaderClientalt[non-cache][cache]loop[create]Create Reader(file_name)OpenFile()Return FileHandleLoadMetadata()Read file footerFileMetaDataGetMetadata()CreateReaderRecursive()Create sub readerInit done

扫描数据过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void ParquetReader::InitializeScan(ClientContext &context, 
ParquetReaderScanState &state,
vector<idx_t> groups_to_read) {
// 重置扫描状态
state.current_group = -1;
state.group_offset = 0;
state.finished = false;

// 独立文件句柄(支持多线程)
state.file_handle = fs.OpenFile(file_name,
FileFlags::FILE_FLAGS_READ | (prefetch ? FILE_FLAGS_DIRECT_IO : 0));

// 创建Thrift协议解析器(每个扫描状态独立)
state.thrift_file_proto = CreateThriftFileProtocol(allocator, *state.file_handle, state.prefetch_mode);

// 创建列读取器树(可能带谓词过滤)
state.root_reader = CreateReader(context);

// 初始化缓冲区
state.define_buf.resize(allocator, STANDARD_VECTOR_SIZE);
state.repeat_buf.resize(allocator, STANDARD_VECTOR_SIZE);

// 配置预取模式
Value prefetch_all;
context.TryGetCurrentSetting("prefetch_all_parquet_files", prefetch_all);
state.prefetch_mode = prefetch_all.GetValue<bool>();
}

关键点:

  • 每个扫描状态维护独立的文件句柄和Thrift解析器,支持并发扫描
  • 根据配置选择直接IO模式(减少内核缓存开销)
  • 列读取器树可能根据谓词进行剪枝(减少不必要的列读取)

RowGroup处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void ParquetReader::PrepareRowGroupBuffer(ParquetReaderScanState &state, idx_t out_col_idx) {
auto &group = GetGroup(state);
auto column_id = reader_data.column_ids[out_col_idx];
auto &column_reader = state.root_reader->Cast<StructColumnReader>().GetChildReader(column_id);

// 1. 检查行组统计信息
auto stats = column_reader.Stats(state.current_group, group.columns);
if (stats && reader_data.filters) {
auto global_id = reader_data.column_mapping[out_col_idx];
auto filter = reader_data.filters->filters[global_id];

// 2. 应用统计过滤
if (stats->CheckFilter(*filter) == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
state.group_offset = group.num_rows; // 跳过整个行组
return;
}
}

// 3. 初始化该RowGroup列读取器
column_reader.InitializeRead(state.current_group, group.columns, *state.thrift_file_proto);
}

过滤逻辑:

  • 使用行组的min/max值快速判断是否跳过;
  • 对字符串列使用更精确的字典过滤;

ScanInternal 函数

ScanInternal函数负责从Parquet文件中读取数据到DataChunk中,处理行组切换、预取、数据解码和过滤等步骤。函数开始检查是否完成扫描,然后处理行组切换。在切换行组时,会进行预取操作,包括整个行组的预取或按列预取。之后,函数计算需要读取的行数,初始化过滤掩码,然后按列读取数据,应用过滤器,最后更新状态。在预取部分,代码根据是否启用预取模式(prefetch_mode)以及扫描的列比例(scan_percentage)来决定是预取整个行组还是按列预取。

  1. 行组切换与预取处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
if (state.current_group < 0 || (int64_t)state.group_offset >= GetGroup(state).num_rows) {
// 切换到下一个行组
state.current_group++;
state.group_offset = 0;

auto &trans = reinterpret_cast<ThriftFileTransport &>(*state.thrift_file_proto->getTransport());
trans.ClearPrefetch(); // 清空之前的预取

// 检查行组边界
if ((idx_t)state.current_group == state.group_idx_list.size()) {
state.finished = true;
return false;
}

// 计算需要扫描的压缩字节数
uint64_t to_scan_compressed_bytes = 0;
for (所有列) {
PrepareRowGroupBuffer(...); // 准备列读取器
to_scan_compressed_bytes += 列压缩大小;
}

// 预取决策逻辑
if (state.prefetch_mode) {
double scan_percentage = 扫描字节数 / 行组总跨度;

if (scan_percentage > 0.95) {
// 全行组预取
trans.Prefetch(起始偏移, 行组总跨度);
} else {
// 按列预取
for (所有列) {
if (无过滤 || 列有过滤条件) {
注册列预取;
}
}
trans.PrefetchRegistered(); // 触发预取
}
}
}

预取策略:

条件 策略 适用场景
scan_percentage > 95% 预取整个行组 全表扫描
存在过滤器 仅预取过滤列 条件查询
无过滤器 预取所有列 投影查询
  1. 读取逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 计算本次读取行数
idx_t this_output_chunk_rows = Min(STANDARD_VECTOR_SIZE,
GetGroup().num_rows - state.group_offset);
result.SetCardinality(this_output_chunk_rows);

// 初始化过滤掩码
parquet_filter_t filter_mask;
filter_mask.set(); // 初始全有效
for (i >= this_output_chunk_rows)
filter_mask.reset(i); // 屏蔽超出部分

// 列处理流程
if (存在过滤器) {
// 第一阶段:处理过滤列
for (每个过滤条件) {
if (过滤列是常量) {
ApplyFilter(常量向量, 过滤条件);
} else {
读取过滤列数据到result_vector;
ApplyFilter(result_vector, 过滤条件);
}
}

// 第二阶段:处理其他列
for (剩余列) {
if (过滤结果全无效) {
Skip(); // 跳过解码
} else {
读取列数据到result_vector;
}
}

// 生成选择向量
SelectionVector sel;
for (i=0 到 this_output_chunk_rows) {
if (filter_mask.test(i))
sel.append(i);
}
result.Slice(sel); // 压缩结果
} else {
// 无过滤直接读取
for (所有列) {
读取列数据到result_vector;
}
}

// 更新偏移量
state.group_offset += this_output_chunk_rows;

ThriftFileTransport 数据结构

1
2
3
4
5
6
7
8
9
struct ReadHead {
ReadHead(idx_t location, uint64_t size) : location(location), size(size) {};
idx_t location; // 数据块起始位置
uint64_t size; // 数据块大小
AllocatedData data; // 分配的存储空间
bool data_isset = false; // 数据是否已加载标记
idx_t GetEnd() const { return size + location; } // 获取数据块结束位置
void Allocate(Allocator &allocator) { data = allocator.Allocate(size); } // 分配内存空间
};

ReadHead表示一个数据块的基本信息,包括起始位置、大小、存储空间等信息。
初始化时传入起始位置和大小,并且提供获取数据块结束位置的方法,以及提供内存分配方法,将数据加载到data缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct ReadHeadComparator {
static constexpr uint64_t ALLOW_GAP = 1 << 14; // 16 KiB
bool operator()(const ReadHead *a, const ReadHead *b) const {
auto a_start = a->location;
auto a_end = a->location + a->size;
auto b_start = b->location;

if (a_end <= NumericLimits<idx_t>::Maximum() - ALLOW_GAP) {
a_end += ALLOW_GAP;
}

return a_start < b_start && a_end < b_start;
}
};

ReadHeadComparator用于比较两个ReadHead对象的位置关系,判断是否允许合并或重叠。具体的,类中定义了一个允许的隔距ALLOW_GAP(16KiB),用来比较两个数据块的起始位置和结束位置:如果a的结束位置(加上允许隔距)在b的起始位置之前,则认为两者不重叠。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
struct ReadAheadBuffer {
ReadAheadBuffer(Allocator &allocator, FileHandle &handle) : allocator(allocator), handle(handle) {}

std::list<ReadHead> read_heads; // 存储所有预读数据块
std::set<ReadHead *, ReadHeadComparator> merge_set; // 用于合并数据块

Allocator &allocator; // 内存分配器
FileHandle &handle; // 文件句柄

idx_t total_size = 0; // 总预读大小

// 添加一个预读数据块
void AddReadHead(idx_t pos, uint64_t len, bool merge_buffers = true) {
// 尝试合并现有数据块
if (merge_buffers) {
ReadHead new_read_head {pos, len};
auto lookup_set = merge_set.find(&new_read_head);
if (lookup_set != merge_set.end()) {
auto existing_head = *lookup_set;
auto new_start = MinValue(existing_head->location, new_read_head.location);
auto new_length = MaxValue(existing_head->GetEnd(), new_read_head.GetEnd()) - new_start;
existing_head->location = new_start;
existing_head->size = new_length;
return;
}
}

read_heads.emplace_front(ReadHead(pos, len));
total_size += len;
auto &read_head = read_heads.front();

if (merge_buffers) {
merge_set.insert(&read_head);
}

// 检查数据块是否超出文件范围
if (read_head.GetEnd() > handle.GetFileSize()) {
throw std::runtime_error("Prefetch registered for bytes outside file");
}
}

// 获取相关数据块
ReadHead *GetReadHead(idx_t pos) {
for (auto &read_head : read_heads) {
if (pos >= read_head.location && pos < read_head.GetEnd()) {
return &read_head;
}
}
return nullptr;
}

// 预读所有数据块
void Prefetch() {
for (auto &read_head : read_heads) {
read_head.Allocate(allocator);

if (read_head.GetEnd() > handle.GetFileSize()) {
throw std::runtime_error("Prefetch requested for bytes outside file");
}

handle.Read(read_head.data.get(), read_head.size, read_head.location);
read_head.data_isset = true;
}
}
};

ReadAheadBuffer 结构用来管理预读数据块的队列和合并操作,list存储所有未读取的数据块,set配合ReadHeadComparator合并相邻或重叠的数据块,减少存储和合并IO,具体的:

  • AddReadHead添加预读数据块,并尝试与现有数据块合并;
  • GetReadHead根据给定位置返回对应的数据块;
  • Prefetch将所有预读数据块从文件中读取并加载到内存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class ThriftFileTransport {
// void Prefetch(idx_t pos, uint64_t len) { RegisterPrefetch(pos, len, false); FinalizeRegistration(); PrefetchRegistered(); }
// void RegisterPrefetch(idx_t pos, uint64_t len, bool can_merge = true) { ra_buffer.AddReadHead(pos, len, can_merge); }
// void FinalizeRegistration() { ra_buffer.merge_set.clear(); }
// void PrefetchRegistered() { ra_buffer.Prefetch(); }
// void ClearPrefetch() { ra_buffer.read_heads.clear(); ra_buffer.merge_set.clear(); }
uint32_t read(uint8_t *buf, uint32_t len) {
auto prefetch_buffer = ra_buffer.GetReadHead(location);
if (prefetch_buffer != nullptr && location - prefetch_buffer->location + len <= prefetch_buffer->size) {
// 如果有预读数据,直接使用
if (!prefetch_buffer->data_isset) {
prefetch_buffer->Allocate(allocator);
handle.Read(prefetch_buffer->data.get(), prefetch_buffer->size, prefetch_buffer->location);
prefetch_buffer->data_isset = true;
}
memcpy(buf, prefetch_buffer->data.get() + location - prefetch_buffer->location, len);
} else {
// 如果没有预读数据,直接从文件中读取
if (prefetch_mode && len < PREFETCH_FALLBACK_BUFFERSIZE && len > 0) {
Prefetch(location, MinValue(handle.GetFileSize() - location, PREFETCH_FALLBACK_BUFFERSIZE));
auto prefetch_buffer_fallback = ra_buffer.GetReadHead(location);
memcpy(buf, prefetch_buffer_fallback->data.get() + location - prefetch_buffer_fallback->location, len);
} else {
handle.Read(buf, len, location);
}
}
location += len;
return len;
}

private:
FileHandle &handle;
idx_t location;
Allocator &allocator;
ReadAheadBuffer ra_buffer;
bool prefetch_mode;
}

ThriftFileTransport重载read方法,优先使用预读数据块,如果数据块未加载则实时读取,并且提供方法Prefetch、RegisterPrefetch、FinalizeRegistration和PrefetchRegistered,实现预读策略:

  1. 先注册所有需要预读的范围;
  2. 清理合并操作以固定范围;
  3. 执行预读操作将数据加载到内存。

不同预取策略:

策略类型 触发条件 优势 实现位置
主动批量预取 已知读取模式时 最大化顺序读取性能 ReadAheadBuffer::Prefetch
被动按需预取 未预取或预取未覆盖时 避免小数据预取开销 ThriftFileTransport::read

读取流程图

scan loop

Y

N

Jump

Read

Y

N

Open file

Crypt check

Decrypt metadata

Read metadata

Decode metadata

Create schema tree

Create column reader

Struct/List column process

Basic type column process

Choose row group

Row group filter?

Next row group

Init row group reader

Prefetch

Column read

Apply filter

Vector filter column

Output DataChunk

More row group?

Scan done

总结

DuckDB 读取 Parquet 文件的设计通过文件系统接口打开文件,借助 Thrift 协议解析元数据,构建列读取器树并初始化 Schema。在扫描过程中,通过行组过滤、列并行读取和谓词下推等优化策略,高效地预取数据并将其转化为向量化数据块,同时利用元数据缓存、延迟加载和内存预分配等机制提升性能,实现对 Parquet 文件的快速读取和处理。设计充分考虑了现代硬件特性,通过向量化处理、数据局部性优化和多级并行,实现了高效的列式数据读取。同时,灵活的过滤机制使得在复杂查询场景下能显著减少不必要的IO和计算开销。

DuckDB-源码分析(2)Parquet读设计

https://devillove084.github.io/2025/02/16/DuckDB-2/

作者

devillove084

发布于

2025-02-16

更新于

2025-03-17

许可协议

评论

Powered By Valine
v1.4.16
Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×