在ParquetReader的构造函数中,会打开文件句柄,加载元数据,并初始化。Scan函数负责实际的扫描过程,处理行组,应用过滤器,读取数据到DataChunk中。首先,在元数据加载部分,parquet文件的元数据通常位于文件末尾,包含schema、行组信息、统计信息等。LoadMetadata函数通过读取文件末尾的元数据部分,解析出FileMetaData结构。
DeriveLogicalType函数根据Parquet的SchemaElement中的类型信息(如Type、converted_type、logicalType)转换为DuckDB的LogicalType。例如,Parquet的INT32可能对应DuckDB的INTEGER,而带有converted_type为DATE的INT32会被转换为DATE类型。这里还处理了时间戳、UUID等复杂类型的转换。
创建列读取器的过程在CreateReaderRecursive中完成,这个函数递归地遍历SchemaElement树,根据每个元素的类型和子元素创建相应的ColumnReader。例如,遇到STRUCT类型时,会创建StructColumnReader,并为每个子列创建对应的读取器,对于重复的字段(如LIST或MAP),会使用ListColumnReader来处理嵌套结构。
初始化时,InitializeSchema调用CreateReader创建根读取器,通常是StructColumnReader,因为它对应Parquet文件的根结构。然后根据读取到的列信息填充columns向量,记录每个列的名称和类型。
扫描数据的过程由Scan函数处理。ParquetReaderScanState用于跟踪扫描的状态,包括当前处理的行组、文件句柄、过滤器等。PrepareRowGroupBuffer函数准备当前行组的数据,可能应用统计信息进行过滤,跳过不需要读取的行组。然后通过ColumnReader的Read方法将数据读取到DataChunk中,应用过滤器(如果有的话),最终将结果返回。
过滤器处理部分,ApplyFilter函数根据过滤条件对读取的数据进行过滤。例如,等值比较、范围比较、IS NULL等条件会被应用到数据上,减少需要处理的数据量,提升查询性能。
初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| ParquetReader::ParquetReader(ClientContext &context, string file_name, ParquetOptions parquet_options, shared_ptr<ParquetFileMetadataCache> metadata) : fs(FileSystem::GetFileSystem(context)), allocator(BufferAllocator::Get(context)), parquet_options(std::move(parquet_options)) { file_handle = fs.OpenFile(file_name, FileFlags::FILE_FLAGS_READ); if (!metadata) { this->metadata = LoadMetadata(context, allocator, *file_handle, parquet_options.encryption_config, *encryption_util); } else { this->metadata = std::move(metadata); } InitializeSchema(context); }
|
关键步骤:
- 文件打开:使用DuckDB统一文件系统接口
- 元数据加载:
- 优先使用传入的缓存元数据;
- 无缓存时调用LoadMetadata解析
- Schema初始化:构建列读取器的树
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| shared_ptr<ParquetFileMetadataCache> LoadMetadata(...) { auto file_proto = CreateThriftFileProtocol(allocator, file_handle, false); transport.read(buf.ptr, 8); if (memcmp(buf.ptr+4, "PAR1",4)==0) { footer_encrypted = false; } else if (memcmp(..., "PARE", ...)) { footer_encrypted = true; ParquetCrypto::Read(...); } auto metadata = make_uniq<FileMetaData>(); metadata->read(file_proto.get()); auto geo_metadata = GeoParquetFileMetadata::TryRead(*metadata, context); return make_shared_ptr<ParquetFileMetadataCache>(...); }
|
元数据结构:
1 2 3 4 5
| struct ParquetFileMetadataCache { unique_ptr<FileMetaData> metadata; time_t read_time; unique_ptr<GeoParquetFileMetadata> geo_metadata; };
|
Schema 初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| void ParquetReader::InitializeSchema(ClientContext &context) { auto file_meta_data = GetFileMetadata(); root_reader = CreateReader(context); auto &struct_reader = root_reader->Cast<StructColumnReader>(); for (idx_t i = 0; i < child_types.size(); i++) { columns.emplace_back( MultiFileReaderColumnDefinition( child_types[i].first, child_types[i].second ) ); } if (parquet_options.file_row_number) { columns.emplace_back("file_row_number", LogicalType::BIGINT); } }
|
结构说明
关键数据结构说明
- FileMetaData 元数据类结构
1 2 3 4 5 6 7 8 9
| struct FileMetaData { 1: required i32 version 2: required list<SchemaElement> schema 3: required i64 num_rows 4: required list<RowGroup> row_groups 5: optional list<KeyValue> key_value_metadata 6: optional string created_by 7: optional list<ColumnOrder> column_orders }
|
- ColumnReader 类层次
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| class ColumnReader { public: virtual void Read(...) = 0; virtual void Skip(...) = 0; virtual unique_ptr<BaseStatistics> Stats(...) = 0; };
class StructColumnReader : public ColumnReader { vector<unique_ptr<ColumnReader>> child_readers; };
class ListColumnReader : public ColumnReader { unique_ptr<ColumnReader> child_reader; };
template <class T> class TemplatedColumnReader : public ColumnReader { };
|
初始化阶段性能优化
- 元数据缓存
1 2 3 4 5
| metadata = ObjectCache::Get(file_name); if (!metadata || expired) { metadata = LoadMetadata(...); ObjectCache::Put(file_name, metadata); }
|
- 延迟加载
1 2 3 4 5 6
| void StructColumnReader::InitializeRead(...) { for (auto &child : child_readers) { if (child) child->InitializeRead(...); } }
|
- 内存预分配
1 2
| ResizeableBuffer define_buf; define_buf.resize(allocator, STANDARD_VECTOR_SIZE);
|
初始化时序图
扫描数据过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| void ParquetReader::InitializeScan(ClientContext &context, ParquetReaderScanState &state, vector<idx_t> groups_to_read) { state.current_group = -1; state.group_offset = 0; state.finished = false; state.file_handle = fs.OpenFile(file_name, FileFlags::FILE_FLAGS_READ | (prefetch ? FILE_FLAGS_DIRECT_IO : 0)); state.thrift_file_proto = CreateThriftFileProtocol(allocator, *state.file_handle, state.prefetch_mode); state.root_reader = CreateReader(context); state.define_buf.resize(allocator, STANDARD_VECTOR_SIZE); state.repeat_buf.resize(allocator, STANDARD_VECTOR_SIZE); Value prefetch_all; context.TryGetCurrentSetting("prefetch_all_parquet_files", prefetch_all); state.prefetch_mode = prefetch_all.GetValue<bool>(); }
|
关键点:
- 每个扫描状态维护独立的文件句柄和Thrift解析器,支持并发扫描
- 根据配置选择直接IO模式(减少内核缓存开销)
- 列读取器树可能根据谓词进行剪枝(减少不必要的列读取)
RowGroup处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| void ParquetReader::PrepareRowGroupBuffer(ParquetReaderScanState &state, idx_t out_col_idx) { auto &group = GetGroup(state); auto column_id = reader_data.column_ids[out_col_idx]; auto &column_reader = state.root_reader->Cast<StructColumnReader>().GetChildReader(column_id);
auto stats = column_reader.Stats(state.current_group, group.columns); if (stats && reader_data.filters) { auto global_id = reader_data.column_mapping[out_col_idx]; auto filter = reader_data.filters->filters[global_id]; if (stats->CheckFilter(*filter) == FilterPropagateResult::FILTER_ALWAYS_FALSE) { state.group_offset = group.num_rows; return; } }
column_reader.InitializeRead(state.current_group, group.columns, *state.thrift_file_proto); }
|
过滤逻辑:
- 使用行组的min/max值快速判断是否跳过;
- 对字符串列使用更精确的字典过滤;
ScanInternal 函数
ScanInternal函数负责从Parquet文件中读取数据到DataChunk中,处理行组切换、预取、数据解码和过滤等步骤。函数开始检查是否完成扫描,然后处理行组切换。在切换行组时,会进行预取操作,包括整个行组的预取或按列预取。之后,函数计算需要读取的行数,初始化过滤掩码,然后按列读取数据,应用过滤器,最后更新状态。在预取部分,代码根据是否启用预取模式(prefetch_mode)以及扫描的列比例(scan_percentage)来决定是预取整个行组还是按列预取。
- 行组切换与预取处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| if (state.current_group < 0 || (int64_t)state.group_offset >= GetGroup(state).num_rows) { state.current_group++; state.group_offset = 0;
auto &trans = reinterpret_cast<ThriftFileTransport &>(*state.thrift_file_proto->getTransport()); trans.ClearPrefetch();
if ((idx_t)state.current_group == state.group_idx_list.size()) { state.finished = true; return false; }
uint64_t to_scan_compressed_bytes = 0; for (所有列) { PrepareRowGroupBuffer(...); to_scan_compressed_bytes += 列压缩大小; }
if (state.prefetch_mode) { double scan_percentage = 扫描字节数 / 行组总跨度; if (scan_percentage > 0.95) { trans.Prefetch(起始偏移, 行组总跨度); } else { for (所有列) { if (无过滤 || 列有过滤条件) { 注册列预取; } } trans.PrefetchRegistered(); } } }
|
预取策略:
条件 |
策略 |
适用场景 |
scan_percentage > 95% |
预取整个行组 |
全表扫描 |
存在过滤器 |
仅预取过滤列 |
条件查询 |
无过滤器 |
预取所有列 |
投影查询 |
- 读取逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| idx_t this_output_chunk_rows = Min(STANDARD_VECTOR_SIZE, GetGroup().num_rows - state.group_offset); result.SetCardinality(this_output_chunk_rows);
parquet_filter_t filter_mask; filter_mask.set(); for (i >= this_output_chunk_rows) filter_mask.reset(i);
if (存在过滤器) { for (每个过滤条件) { if (过滤列是常量) { ApplyFilter(常量向量, 过滤条件); } else { 读取过滤列数据到result_vector; ApplyFilter(result_vector, 过滤条件); } }
for (剩余列) { if (过滤结果全无效) { Skip(); } else { 读取列数据到result_vector; } }
SelectionVector sel; for (i=0 到 this_output_chunk_rows) { if (filter_mask.test(i)) sel.append(i); } result.Slice(sel); } else { for (所有列) { 读取列数据到result_vector; } }
state.group_offset += this_output_chunk_rows;
|
ThriftFileTransport 数据结构
1 2 3 4 5 6 7 8 9
| struct ReadHead { ReadHead(idx_t location, uint64_t size) : location(location), size(size) {}; idx_t location; uint64_t size; AllocatedData data; bool data_isset = false; idx_t GetEnd() const { return size + location; } void Allocate(Allocator &allocator) { data = allocator.Allocate(size); } };
|
ReadHead表示一个数据块的基本信息,包括起始位置、大小、存储空间等信息。
初始化时传入起始位置和大小,并且提供获取数据块结束位置的方法,以及提供内存分配方法,将数据加载到data缓冲区。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| struct ReadHeadComparator { static constexpr uint64_t ALLOW_GAP = 1 << 14; bool operator()(const ReadHead *a, const ReadHead *b) const { auto a_start = a->location; auto a_end = a->location + a->size; auto b_start = b->location;
if (a_end <= NumericLimits<idx_t>::Maximum() - ALLOW_GAP) { a_end += ALLOW_GAP; }
return a_start < b_start && a_end < b_start; } };
|
ReadHeadComparator用于比较两个ReadHead对象的位置关系,判断是否允许合并或重叠。具体的,类中定义了一个允许的隔距ALLOW_GAP(16KiB),用来比较两个数据块的起始位置和结束位置:如果a的结束位置(加上允许隔距)在b的起始位置之前,则认为两者不重叠。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| struct ReadAheadBuffer { ReadAheadBuffer(Allocator &allocator, FileHandle &handle) : allocator(allocator), handle(handle) {} std::list<ReadHead> read_heads; std::set<ReadHead *, ReadHeadComparator> merge_set;
Allocator &allocator; FileHandle &handle;
idx_t total_size = 0;
void AddReadHead(idx_t pos, uint64_t len, bool merge_buffers = true) { if (merge_buffers) { ReadHead new_read_head {pos, len}; auto lookup_set = merge_set.find(&new_read_head); if (lookup_set != merge_set.end()) { auto existing_head = *lookup_set; auto new_start = MinValue(existing_head->location, new_read_head.location); auto new_length = MaxValue(existing_head->GetEnd(), new_read_head.GetEnd()) - new_start; existing_head->location = new_start; existing_head->size = new_length; return; } }
read_heads.emplace_front(ReadHead(pos, len)); total_size += len; auto &read_head = read_heads.front();
if (merge_buffers) { merge_set.insert(&read_head); }
if (read_head.GetEnd() > handle.GetFileSize()) { throw std::runtime_error("Prefetch registered for bytes outside file"); } }
ReadHead *GetReadHead(idx_t pos) { for (auto &read_head : read_heads) { if (pos >= read_head.location && pos < read_head.GetEnd()) { return &read_head; } } return nullptr; }
void Prefetch() { for (auto &read_head : read_heads) { read_head.Allocate(allocator);
if (read_head.GetEnd() > handle.GetFileSize()) { throw std::runtime_error("Prefetch requested for bytes outside file"); }
handle.Read(read_head.data.get(), read_head.size, read_head.location); read_head.data_isset = true; } } };
|
ReadAheadBuffer 结构用来管理预读数据块的队列和合并操作,list存储所有未读取的数据块,set配合ReadHeadComparator合并相邻或重叠的数据块,减少存储和合并IO,具体的:
- AddReadHead添加预读数据块,并尝试与现有数据块合并;
- GetReadHead根据给定位置返回对应的数据块;
- Prefetch将所有预读数据块从文件中读取并加载到内存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| class ThriftFileTransport { uint32_t read(uint8_t *buf, uint32_t len) { auto prefetch_buffer = ra_buffer.GetReadHead(location); if (prefetch_buffer != nullptr && location - prefetch_buffer->location + len <= prefetch_buffer->size) { if (!prefetch_buffer->data_isset) { prefetch_buffer->Allocate(allocator); handle.Read(prefetch_buffer->data.get(), prefetch_buffer->size, prefetch_buffer->location); prefetch_buffer->data_isset = true; } memcpy(buf, prefetch_buffer->data.get() + location - prefetch_buffer->location, len); } else { if (prefetch_mode && len < PREFETCH_FALLBACK_BUFFERSIZE && len > 0) { Prefetch(location, MinValue(handle.GetFileSize() - location, PREFETCH_FALLBACK_BUFFERSIZE)); auto prefetch_buffer_fallback = ra_buffer.GetReadHead(location); memcpy(buf, prefetch_buffer_fallback->data.get() + location - prefetch_buffer_fallback->location, len); } else { handle.Read(buf, len, location); } } location += len; return len; }
private: FileHandle &handle; idx_t location; Allocator &allocator; ReadAheadBuffer ra_buffer; bool prefetch_mode; }
|
ThriftFileTransport重载read方法,优先使用预读数据块,如果数据块未加载则实时读取,并且提供方法Prefetch、RegisterPrefetch、FinalizeRegistration和PrefetchRegistered,实现预读策略:
- 先注册所有需要预读的范围;
- 清理合并操作以固定范围;
- 执行预读操作将数据加载到内存。
不同预取策略:
策略类型 |
触发条件 |
优势 |
实现位置 |
主动批量预取 |
已知读取模式时 |
最大化顺序读取性能 |
ReadAheadBuffer::Prefetch |
被动按需预取 |
未预取或预取未覆盖时 |
避免小数据预取开销 |
ThriftFileTransport::read |
读取流程图
总结
DuckDB 读取 Parquet 文件的设计通过文件系统接口打开文件,借助 Thrift 协议解析元数据,构建列读取器树并初始化 Schema。在扫描过程中,通过行组过滤、列并行读取和谓词下推等优化策略,高效地预取数据并将其转化为向量化数据块,同时利用元数据缓存、延迟加载和内存预分配等机制提升性能,实现对 Parquet 文件的快速读取和处理。设计充分考虑了现代硬件特性,通过向量化处理、数据局部性优化和多级并行,实现了高效的列式数据读取。同时,灵活的过滤机制使得在复杂查询场景下能显著减少不必要的IO和计算开销。