在ParquetReader的构造函数中,会打开文件句柄,加载元数据,并初始化。Scan函数负责实际的扫描过程,处理行组,应用过滤器,读取数据到DataChunk中。首先,在元数据加载部分,parquet文件的元数据通常位于文件末尾,包含schema、行组信息、统计信息等。LoadMetadata函数通过读取文件末尾的元数据部分,解析出FileMetaData结构。
DeriveLogicalType函数根据Parquet的SchemaElement中的类型信息(如Type、converted_type、logicalType)转换为DuckDB的LogicalType。例如,Parquet的INT32可能对应DuckDB的INTEGER,而带有converted_type为DATE的INT32会被转换为DATE类型。这里还处理了时间戳、UUID等复杂类型的转换。
创建列读取器的过程在CreateReaderRecursive中完成,这个函数递归地遍历SchemaElement树,根据每个元素的类型和子元素创建相应的ColumnReader。例如,遇到STRUCT类型时,会创建StructColumnReader,并为每个子列创建对应的读取器,对于重复的字段(如LIST或MAP),会使用ListColumnReader来处理嵌套结构。
初始化时,InitializeSchema调用CreateReader创建根读取器,通常是StructColumnReader,因为它对应Parquet文件的根结构。然后根据读取到的列信息填充columns向量,记录每个列的名称和类型。
扫描数据的过程由Scan函数处理。ParquetReaderScanState用于跟踪扫描的状态,包括当前处理的行组、文件句柄、过滤器等。PrepareRowGroupBuffer函数准备当前行组的数据,可能应用统计信息进行过滤,跳过不需要读取的行组。然后通过ColumnReader的Read方法将数据读取到DataChunk中,应用过滤器(如果有的话),最终将结果返回。
过滤器处理部分,ApplyFilter函数根据过滤条件对读取的数据进行过滤。例如,等值比较、范围比较、IS NULL等条件会被应用到数据上,减少需要处理的数据量,提升查询性能。
初始化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 ParquetReader::ParquetReader (ClientContext &context, string file_name, ParquetOptions parquet_options, shared_ptr<ParquetFileMetadataCache> metadata) : fs (FileSystem::GetFileSystem (context)), allocator (BufferAllocator::Get (context)), parquet_options (std::move (parquet_options)) { file_handle = fs.OpenFile (file_name, FileFlags::FILE_FLAGS_READ); if (!metadata) { this ->metadata = LoadMetadata (context, allocator, *file_handle, parquet_options.encryption_config, *encryption_util); } else { this ->metadata = std::move (metadata); } InitializeSchema (context); }
关键步骤:
文件打开:使用DuckDB统一文件系统接口
元数据加载:
优先使用传入的缓存元数据;
无缓存时调用LoadMetadata解析
Schema初始化:构建列读取器的树
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 shared_ptr<ParquetFileMetadataCache> LoadMetadata (...) { auto file_proto = CreateThriftFileProtocol (allocator, file_handle, false ); transport.read (buf.ptr, 8 ); if (memcmp (buf.ptr+4 , "PAR1" ,4 )==0 ) { footer_encrypted = false ; } else if (memcmp (..., "PARE" , ...)) { footer_encrypted = true ; ParquetCrypto::Read (...); } auto metadata = make_uniq <FileMetaData>(); metadata->read (file_proto.get ()); auto geo_metadata = GeoParquetFileMetadata::TryRead (*metadata, context); return make_shared_ptr <ParquetFileMetadataCache>(...); }
元数据结构:
1 2 3 4 5 struct ParquetFileMetadataCache { unique_ptr<FileMetaData> metadata; time_t read_time; unique_ptr<GeoParquetFileMetadata> geo_metadata; };
Schema 初始化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void ParquetReader::InitializeSchema (ClientContext &context) { auto file_meta_data = GetFileMetadata (); root_reader = CreateReader (context); auto &struct_reader = root_reader->Cast <StructColumnReader>(); for (idx_t i = 0 ; i < child_types.size (); i++) { columns.emplace_back ( MultiFileReaderColumnDefinition ( child_types[i].first, child_types[i].second ) ); } if (parquet_options.file_row_number) { columns.emplace_back ("file_row_number" , LogicalType::BIGINT); } }
结构说明 graph TD
A[ParquetReader] --> B[FileHandle]
A --> C[ParquetFileMetadataCache]
C --> D[FileMetaData]
C --> E[GeoParquetMetadata]
A --> F[ColumnReader树]
subgraph 元数据
D --> G[SchemaElements]
D --> H[RowGroups]
H --> I[ColumnChunks]
I --> J[Statistics]
I --> K[EncryptionInfo]
end
subgraph 读取器结构
F --> L[StructColumnReader]
L --> M[ColumnReader...]
M --> N[ListColumnReader]
M --> O[TemplatedColumnReader]
end
B -->|读取| P[Parquet文件]
style A fill:#f9f,stroke:#333
style C fill:#bbf,stroke:#333
style F fill:#9f9,stroke:#333
关键数据结构说明
FileMetaData 元数据类结构
1 2 3 4 5 6 7 8 9 struct FileMetaData { 1 : required i32 version 2 : required list<SchemaElement> schema 3 : required i64 num_rows 4 : required list<RowGroup> row_groups 5 : optional list<KeyValue> key_value_metadata 6 : optional string created_by 7 : optional list<ColumnOrder> column_orders }
ColumnReader 类层次
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class ColumnReader {public : virtual void Read (...) = 0 ; virtual void Skip (...) = 0 ; virtual unique_ptr<BaseStatistics> Stats (...) = 0 ; }; class StructColumnReader : public ColumnReader { vector<unique_ptr<ColumnReader>> child_readers; }; class ListColumnReader : public ColumnReader { unique_ptr<ColumnReader> child_reader; }; template <class T >class TemplatedColumnReader : public ColumnReader { };
初始化阶段性能优化
元数据缓存
1 2 3 4 5 metadata = ObjectCache::Get (file_name); if (!metadata || expired) { metadata = LoadMetadata (...); ObjectCache::Put (file_name, metadata); }
延迟加载
1 2 3 4 5 6 void StructColumnReader::InitializeRead (...) { for (auto &child : child_readers) { if (child) child->InitializeRead (...); } }
内存预分配
1 2 ResizeableBuffer define_buf; define_buf.resize (allocator, STANDARD_VECTOR_SIZE);
初始化时序图 sequenceDiagram
participant Client
participant ParquetReader
participant FileSystem
participant ThriftParser
participant ColumnReader
Client->>ParquetReader: 创建Reader(file_name)
ParquetReader->>FileSystem: OpenFile()
FileSystem-->>ParquetReader: 返回FileHandle
alt 无缓存
ParquetReader->>ThriftParser: LoadMetadata()
ThriftParser->>FileHandle: 读取文件尾部
ThriftParser-->>ParquetReader: FileMetaData
else 有缓存
ParquetReader->>ObjectCache: GetMetadata()
end
ParquetReader->>ColumnReader: CreateReaderRecursive()
loop 递归创建
ColumnReader->>ColumnReader: 创建子读取器
end
ParquetReader-->>Client: 初始化完成
扫描数据过程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void ParquetReader::InitializeScan (ClientContext &context, ParquetReaderScanState &state, vector<idx_t > groups_to_read) { state.current_group = -1 ; state.group_offset = 0 ; state.finished = false ; state.file_handle = fs.OpenFile (file_name, FileFlags::FILE_FLAGS_READ | (prefetch ? FILE_FLAGS_DIRECT_IO : 0 )); state.thrift_file_proto = CreateThriftFileProtocol (allocator, *state.file_handle, state.prefetch_mode); state.root_reader = CreateReader (context); state.define_buf.resize (allocator, STANDARD_VECTOR_SIZE); state.repeat_buf.resize (allocator, STANDARD_VECTOR_SIZE); Value prefetch_all; context.TryGetCurrentSetting ("prefetch_all_parquet_files" , prefetch_all); state.prefetch_mode = prefetch_all.GetValue <bool >(); }
关键点:
每个扫描状态维护独立的文件句柄和Thrift解析器,支持并发扫描
根据配置选择直接IO模式(减少内核缓存开销)
列读取器树可能根据谓词进行剪枝(减少不必要的列读取)
RowGroup处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void ParquetReader::PrepareRowGroupBuffer (ParquetReaderScanState &state, idx_t out_col_idx) { auto &group = GetGroup (state); auto column_id = reader_data.column_ids[out_col_idx]; auto &column_reader = state.root_reader->Cast <StructColumnReader>().GetChildReader (column_id); auto stats = column_reader.Stats (state.current_group, group.columns); if (stats && reader_data.filters) { auto global_id = reader_data.column_mapping[out_col_idx]; auto filter = reader_data.filters->filters[global_id]; if (stats->CheckFilter (*filter) == FilterPropagateResult::FILTER_ALWAYS_FALSE) { state.group_offset = group.num_rows; return ; } } column_reader.InitializeRead (state.current_group, group.columns, *state.thrift_file_proto); }
过滤逻辑:
使用行组的min/max值快速判断是否跳过;
对字符串列使用更精确的字典过滤;
ScanInternal 函数 ScanInternal函数负责从Parquet文件中读取数据到DataChunk中,处理行组切换、预取、数据解码和过滤等步骤。函数开始检查是否完成扫描,然后处理行组切换。在切换行组时,会进行预取操作,包括整个行组的预取或按列预取。之后,函数计算需要读取的行数,初始化过滤掩码,然后按列读取数据,应用过滤器,最后更新状态。在预取部分,代码根据是否启用预取模式(prefetch_mode)以及扫描的列比例(scan_percentage)来决定是预取整个行组还是按列预取。
行组切换与预取处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 if (state.current_group < 0 || (int64_t )state.group_offset >= GetGroup (state).num_rows) { state.current_group++; state.group_offset = 0 ; auto &trans = reinterpret_cast <ThriftFileTransport &>(*state.thrift_file_proto->getTransport ()); trans.ClearPrefetch (); if ((idx_t )state.current_group == state.group_idx_list.size ()) { state.finished = true ; return false ; } uint64_t to_scan_compressed_bytes = 0 ; for (所有列) { PrepareRowGroupBuffer (...); to_scan_compressed_bytes += 列压缩大小; } if (state.prefetch_mode) { double scan_percentage = 扫描字节数 / 行组总跨度; if (scan_percentage > 0.95 ) { trans.Prefetch (起始偏移, 行组总跨度); } else { for (所有列) { if (无过滤 || 列有过滤条件) { 注册列预取; } } trans.PrefetchRegistered (); } } }
预取策略:
条件
策略
适用场景
scan_percentage > 95%
预取整个行组
全表扫描
存在过滤器
仅预取过滤列
条件查询
无过滤器
预取所有列
投影查询
读取逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 idx_t this_output_chunk_rows = Min (STANDARD_VECTOR_SIZE, GetGroup ().num_rows - state.group_offset); result.SetCardinality (this_output_chunk_rows); parquet_filter_t filter_mask;filter_mask.set (); for (i >= this_output_chunk_rows) filter_mask.reset (i); if (存在过滤器) { for (每个过滤条件) { if (过滤列是常量) { ApplyFilter (常量向量, 过滤条件); } else { 读取过滤列数据到result_vector; ApplyFilter (result_vector, 过滤条件); } } for (剩余列) { if (过滤结果全无效) { Skip (); } else { 读取列数据到result_vector; } } SelectionVector sel; for (i=0 到 this_output_chunk_rows) { if (filter_mask.test (i)) sel.append (i); } result.Slice (sel); } else { for (所有列) { 读取列数据到result_vector; } } state.group_offset += this_output_chunk_rows;
ThriftFileTransport 数据结构 1 2 3 4 5 6 7 8 9 struct ReadHead { ReadHead (idx_t location, uint64_t size) : location (location), size (size) {}; idx_t location; uint64_t size; AllocatedData data; bool data_isset = false ; idx_t GetEnd () const { return size + location; } void Allocate (Allocator &allocator) { data = allocator.Allocate (size); } };
ReadHead表示一个数据块的基本信息,包括起始位置、大小、存储空间等信息。 初始化时传入起始位置和大小,并且提供获取数据块结束位置的方法,以及提供内存分配方法,将数据加载到data缓冲区。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 struct ReadHeadComparator { static constexpr uint64_t ALLOW_GAP = 1 << 14 ; bool operator () (const ReadHead *a, const ReadHead *b) const { auto a_start = a->location; auto a_end = a->location + a->size; auto b_start = b->location; if (a_end <= NumericLimits<idx_t >::Maximum () - ALLOW_GAP) { a_end += ALLOW_GAP; } return a_start < b_start && a_end < b_start; } };
ReadHeadComparator用于比较两个ReadHead对象的位置关系,判断是否允许合并或重叠。具体的,类中定义了一个允许的隔距ALLOW_GAP(16KiB),用来比较两个数据块的起始位置和结束位置:如果a的结束位置(加上允许隔距)在b的起始位置之前,则认为两者不重叠。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 struct ReadAheadBuffer { ReadAheadBuffer (Allocator &allocator, FileHandle &handle) : allocator (allocator), handle (handle) {} std::list<ReadHead> read_heads; std::set<ReadHead *, ReadHeadComparator> merge_set; Allocator &allocator; FileHandle &handle; idx_t total_size = 0 ; void AddReadHead (idx_t pos, uint64_t len, bool merge_buffers = true ) { if (merge_buffers) { ReadHead new_read_head {pos, len}; auto lookup_set = merge_set.find (&new_read_head); if (lookup_set != merge_set.end ()) { auto existing_head = *lookup_set; auto new_start = MinValue (existing_head->location, new_read_head.location); auto new_length = MaxValue (existing_head->GetEnd (), new_read_head.GetEnd ()) - new_start; existing_head->location = new_start; existing_head->size = new_length; return ; } } read_heads.emplace_front (ReadHead (pos, len)); total_size += len; auto &read_head = read_heads.front (); if (merge_buffers) { merge_set.insert (&read_head); } if (read_head.GetEnd () > handle.GetFileSize ()) { throw std::runtime_error ("Prefetch registered for bytes outside file" ); } } ReadHead *GetReadHead (idx_t pos) { for (auto &read_head : read_heads) { if (pos >= read_head.location && pos < read_head.GetEnd ()) { return &read_head; } } return nullptr ; } void Prefetch () { for (auto &read_head : read_heads) { read_head.Allocate (allocator); if (read_head.GetEnd () > handle.GetFileSize ()) { throw std::runtime_error ("Prefetch requested for bytes outside file" ); } handle.Read (read_head.data.get (), read_head.size, read_head.location); read_head.data_isset = true ; } } };
ReadAheadBuffer 结构用来管理预读数据块的队列和合并操作,list存储所有未读取的数据块,set配合ReadHeadComparator合并相邻或重叠的数据块,减少存储和合并IO,具体的:
AddReadHead添加预读数据块,并尝试与现有数据块合并;
GetReadHead根据给定位置返回对应的数据块;
Prefetch将所有预读数据块从文件中读取并加载到内存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class ThriftFileTransport { uint32_t read (uint8_t *buf, uint32_t len) { auto prefetch_buffer = ra_buffer.GetReadHead (location); if (prefetch_buffer != nullptr && location - prefetch_buffer->location + len <= prefetch_buffer->size) { if (!prefetch_buffer->data_isset) { prefetch_buffer->Allocate (allocator); handle.Read (prefetch_buffer->data.get (), prefetch_buffer->size, prefetch_buffer->location); prefetch_buffer->data_isset = true ; } memcpy (buf, prefetch_buffer->data.get () + location - prefetch_buffer->location, len); } else { if (prefetch_mode && len < PREFETCH_FALLBACK_BUFFERSIZE && len > 0 ) { Prefetch (location, MinValue (handle.GetFileSize () - location, PREFETCH_FALLBACK_BUFFERSIZE)); auto prefetch_buffer_fallback = ra_buffer.GetReadHead (location); memcpy (buf, prefetch_buffer_fallback->data.get () + location - prefetch_buffer_fallback->location, len); } else { handle.Read (buf, len, location); } } location += len; return len; } private : FileHandle &handle; idx_t location; Allocator &allocator; ReadAheadBuffer ra_buffer; bool prefetch_mode; }
ThriftFileTransport重载read方法,优先使用预读数据块,如果数据块未加载则实时读取,并且提供方法Prefetch、RegisterPrefetch、FinalizeRegistration和PrefetchRegistered,实现预读策略:
先注册所有需要预读的范围;
清理合并操作以固定范围;
执行预读操作将数据加载到内存。
不同预取策略:
策略类型
触发条件
优势
实现位置
主动批量预取
已知读取模式时
最大化顺序读取性能
ReadAheadBuffer::Prefetch
被动按需预取
未预取或预取未覆盖时
避免小数据预取开销
ThriftFileTransport::read
读取流程图
graph TD
A[打开文件] --> B{加密检查}
B -- 是 --> C[解密元数据]
B -- 否 --> D[直接读取元数据]
C --> E[解析元数据]
D --> E
E --> F[构建Schema树]
F --> G[递归创建列读取器]
G --> H[Struct/List列处理]
G --> I[基本类型列处理]
subgraph 扫描循环
J[选择行组] --> K{行组过滤?}
K -- 跳过 --> M[下一行组]
K -- 读取 --> L[初始化行组读取]
L --> N[预取数据]
N --> O[列并行读取]
O --> P[应用谓词下推]
P --> Q[向量化过滤]
Q --> R[输出DataChunk]
R --> S{更多行组?}
S -- 是 --> J
S -- 否 --> T[结束扫描]
end
E --> J
I --> O
style A fill:#f9f,stroke:#333
style E fill:#bbf,stroke:#333
style G fill:#9f9,stroke:#333
style J fill:#f96,stroke:#333
style O fill:#6f9,stroke:#333
总结 DuckDB 读取 Parquet 文件的设计通过文件系统接口打开文件,借助 Thrift 协议解析元数据,构建列读取器树并初始化 Schema。在扫描过程中,通过行组过滤、列并行读取和谓词下推等优化策略,高效地预取数据并将其转化为向量化数据块,同时利用元数据缓存、延迟加载和内存预分配等机制提升性能,实现对 Parquet 文件的快速读取和处理。设计充分考虑了现代硬件特性,通过向量化处理、数据局部性优化和多级并行,实现了高效的列式数据读取。同时,灵活的过滤机制使得在复杂查询场景下能显著减少不必要的IO和计算开销。