diff --git a/src/Processors/Formats/Impl/Parquet/ThriftUtil.cpp b/src/Processors/Formats/Impl/Parquet/ThriftUtil.cpp index 2a99b028ae0..b8704d8f84c 100644 --- a/src/Processors/Formats/Impl/Parquet/ThriftUtil.cpp +++ b/src/Processors/Formats/Impl/Parquet/ThriftUtil.cpp @@ -31,5 +31,7 @@ size_t serializeThriftStruct(const T & obj, WriteBuffer & out) template size_t serializeThriftStruct(const parquet::format::PageHeader &, WriteBuffer & out); template size_t serializeThriftStruct(const parquet::format::ColumnChunk &, WriteBuffer & out); template size_t serializeThriftStruct(const parquet::format::FileMetaData &, WriteBuffer & out); +template size_t serializeThriftStruct(const parquet::format::ColumnIndex &, WriteBuffer & out); +template size_t serializeThriftStruct(const parquet::format::OffsetIndex &, WriteBuffer & out); } diff --git a/src/Processors/Formats/Impl/Parquet/Write.cpp b/src/Processors/Formats/Impl/Parquet/Write.cpp index 7c0aab13a26..e7d152d33fc 100644 --- a/src/Processors/Formats/Impl/Parquet/Write.cpp +++ b/src/Processors/Formats/Impl/Parquet/Write.cpp @@ -543,6 +543,7 @@ void writeColumnImpl( { parq::PageHeader header; PODArray data; + size_t first_row_index = 0; }; std::vector dict_encoded_pages; // can't write them out until we have full dictionary @@ -596,9 +597,32 @@ void writeColumnImpl( if (options.write_page_statistics) { d.__set_statistics(page_statistics.get(options)); - - if (s.max_def == 1 && s.max_rep == 0) + bool all_null_page = data_count == 0; + if (all_null_page) + { + s.column_index.min_values.push_back(""); + s.column_index.max_values.push_back(""); + } + else + { + s.column_index.min_values.push_back(d.statistics.min_value); + s.column_index.max_values.push_back(d.statistics.max_value); + } + bool has_null_count = s.max_def == 1 && s.max_rep == 0; + if (has_null_count) d.statistics.__set_null_count(static_cast(def_count - data_count)); + s.column_index.__isset.null_counts = has_null_count; + if (has_null_count) + { + s.column_index.__isset.null_counts = true; + s.column_index.null_counts.emplace_back(d.statistics.null_count); + } + else + { + if (s.column_index.__isset.null_counts) + s.column_index.null_counts.emplace_back(0); + } + s.column_index.null_pages.push_back(all_null_page); } total_statistics.merge(page_statistics); @@ -606,14 +630,18 @@ void writeColumnImpl( if (use_dictionary) { - dict_encoded_pages.push_back({.header = std::move(header), .data = {}}); + dict_encoded_pages.push_back({.header = std::move(header), .data = {}, .first_row_index = def_offset}); std::swap(dict_encoded_pages.back().data, compressed); } else { + parquet::format::PageLocation location; + location.offset = out.count(); writePage(header, compressed, s, out); + location.compressed_page_size = static_cast(out.count() - location.offset); + location.first_row_index = def_offset; + s.offset_index.page_locations.emplace_back(location); } - def_offset += def_count; data_offset += data_count; }; @@ -642,7 +670,14 @@ void writeColumnImpl( writePage(header, compressed, s, out); for (auto & p : dict_encoded_pages) + { + parquet::format::PageLocation location; + location.offset = out.count(); writePage(p.header, p.data, s, out); + location.compressed_page_size = static_cast(out.count() - location.offset); + location.first_row_index = p.first_row_index; + s.offset_index.page_locations.emplace_back(location); + } dict_encoded_pages.clear(); encoder.reset(); @@ -703,6 +738,9 @@ void writeColumnImpl( def_offset = 0; data_offset = 0; dict_encoded_pages.clear(); + + //clear column_index + s.column_index = parquet::format::ColumnIndex(); use_dictionary = false; #ifndef NDEBUG @@ -909,6 +947,50 @@ parq::RowGroup makeRowGroup(std::vector column_chunks, size_t return r; } +void writePageIndex( + const std::vector> & column_indexes, + const std::vector> & offset_indexes, + std::vector & row_groups, + WriteBuffer & out, + size_t base_offset) +{ + chassert(row_groups.size() == column_indexes.size() && row_groups.size() == offset_indexes.size()); + auto num_row_groups = row_groups.size(); + // write column index + for (size_t i = 0; i < num_row_groups; ++i) + { + const auto & current_group_column_index = column_indexes.at(i); + chassert(row_groups.at(i).columns.size() == current_group_column_index.size()); + auto & row_group = row_groups.at(i); + for (size_t j = 0; j < row_groups.at(i).columns.size(); ++j) + { + auto & column = row_group.columns.at(j); + int64_t column_index_offset = static_cast(out.count() - base_offset); + int32_t column_index_length = static_cast(serializeThriftStruct(current_group_column_index.at(j), out)); + column.__isset.column_index_offset = true; + column.column_index_offset = column_index_offset; + column.__isset.column_index_length = true; + column.column_index_length = column_index_length; + } + } + + // write offset index + for (size_t i = 0; i < num_row_groups; ++i) + { + const auto & current_group_offset_index = offset_indexes.at(i); + chassert(row_groups.at(i).columns.size() == current_group_offset_index.size()); + for (size_t j = 0; j < row_groups.at(i).columns.size(); ++j) + { + int64_t offset_index_offset = out.count() - base_offset; + int32_t offset_index_length = static_cast(serializeThriftStruct(current_group_offset_index.at(j), out)); + row_groups.at(i).columns.at(j).__isset.offset_index_offset = true; + row_groups.at(i).columns.at(j).offset_index_offset = offset_index_offset; + row_groups.at(i).columns.at(j).__isset.offset_index_length = true; + row_groups.at(i).columns.at(j).offset_index_length = offset_index_length; + } + } +} + void writeFileFooter(std::vector row_groups, SchemaElements schema, const WriteOptions & options, WriteBuffer & out) { parq::FileMetaData meta; diff --git a/src/Processors/Formats/Impl/Parquet/Write.h b/src/Processors/Formats/Impl/Parquet/Write.h index f162984fd5e..695c37caadb 100644 --- a/src/Processors/Formats/Impl/Parquet/Write.h +++ b/src/Processors/Formats/Impl/Parquet/Write.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB::Parquet { @@ -55,6 +56,9 @@ struct ColumnChunkWriteState UInt8 max_def = 0; UInt8 max_rep = 0; + parquet::format::ColumnIndex column_index; + parquet::format::OffsetIndex offset_index; + ColumnChunkWriteState() = default; /// Prevent accidental copying. ColumnChunkWriteState(ColumnChunkWriteState &&) = default; @@ -133,6 +137,7 @@ parquet::format::ColumnChunk finalizeColumnChunkAndWriteFooter( parquet::format::RowGroup makeRowGroup(std::vector column_chunks, size_t num_rows); +void writePageIndex(const std::vector>& column_indexes, const std::vector>& offset_indexes, std::vector& row_groups, WriteBuffer & out, size_t base_offset); void writeFileFooter(std::vector row_groups, SchemaElements schema, const WriteOptions & options, WriteBuffer & out); } diff --git a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp index 645f16c6abe..d3553c04e9d 100644 --- a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp @@ -229,6 +229,8 @@ void ParquetBlockOutputFormat::finalizeImpl() base_offset = out.count(); writeFileHeader(out); } + if (format_settings.parquet.write_page_index) + writePageIndex(column_indexes, offset_indexes, row_groups_complete, out, base_offset); writeFileFooter(std::move(row_groups_complete), schema, options, out); } else @@ -264,6 +266,8 @@ void ParquetBlockOutputFormat::resetFormatterImpl() task_queue.clear(); row_groups.clear(); file_writer.reset(); + column_indexes.clear(); + offset_indexes.clear(); row_groups_complete.clear(); staging_chunks.clear(); staging_rows = 0; @@ -368,12 +372,17 @@ void ParquetBlockOutputFormat::writeRowGroupInOneThread(Chunk chunk) base_offset = out.count(); writeFileHeader(out); } - + auto & rg_column_index = column_indexes.emplace_back(); + auto & rg_offset_index = offset_indexes.emplace_back(); std::vector column_chunks; for (auto & s : columns_to_write) { size_t offset = out.count() - base_offset; writeColumnChunkBody(s, options, out); + for (auto & location : s.offset_index.page_locations) + location.offset -= base_offset; + rg_column_index.emplace_back(std::move(s.column_index)); + rg_offset_index.emplace_back(std::move(s.offset_index)); auto c = finalizeColumnChunkAndWriteFooter(offset, std::move(s), options, out); column_chunks.push_back(std::move(c)); } @@ -436,16 +445,20 @@ void ParquetBlockOutputFormat::reapCompletedRowGroups(std::unique_lock metadata; for (auto & cols : r.column_chunks) { for (ColumnChunk & col : cols) { size_t offset = out.count() - base_offset; - + for (auto & location : col.state.offset_index.page_locations) + location.offset += offset; out.write(col.serialized.data(), col.serialized.size()); + rg_column_index.emplace_back(std::move(col.state.column_index)); + rg_offset_index.emplace_back(std::move(col.state.offset_index)); auto m = finalizeColumnChunkAndWriteFooter(offset, std::move(col.state), options, out); - metadata.push_back(std::move(m)); } } diff --git a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h index f8f5d2556a5..775363a917d 100644 --- a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h @@ -138,6 +138,8 @@ private: Parquet::WriteOptions options; Parquet::SchemaElements schema; std::vector row_groups_complete; + std::vector> column_indexes; + std::vector> offset_indexes; size_t base_offset = 0; diff --git a/src/Processors/tests/gtest_write_parquet_page_index.cpp b/src/Processors/tests/gtest_write_parquet_page_index.cpp new file mode 100644 index 00000000000..da0eadda795 --- /dev/null +++ b/src/Processors/tests/gtest_write_parquet_page_index.cpp @@ -0,0 +1,319 @@ +#include + +#include +#if USE_PARQUET + +# include +# include +# include +# include +# include +# include +# include +# include + +# include +# include +# include +# include +# include +# include + +using namespace DB; +namespace +{ +template +std::shared_ptr multiColumnsSource(const std::vector & type, const std::vector> & values, size_t times) +{ + Block header; + for (size_t i = 0; i < type.size(); ++i) + { + header.insert(ColumnWithTypeAndName(type[i], "x" + std::to_string(i))); + } + + + Chunks chunks; + + for (size_t i = 0; i < times; ++i) + { + Chunk chunk; + Columns columns; + for (size_t j = 0; j < type.size(); ++j) + { + auto column = type[j]->createColumn(); + for (const auto& n : values[j]) + { + if constexpr (std::is_same_v) + if (n == 0) + column->insertDefault(); + else + column->insert(n); + else + column->insert(n); + } + columns.push_back(std::move(column)); + } + chunk = Chunk(Columns{std::move(columns)}, values[0].size()); + chunks.push_back(std::move(chunk)); + } + return std::make_shared(header, std::move(chunks)); +} + +void validatePageIndex( + String path, + std::optional)>> validate_null_pages = std::nullopt, + std::optional)>> validate_null_counts = std::nullopt) +{ + std::shared_ptr<::arrow::io::RandomAccessFile> source; + PARQUET_ASSIGN_OR_THROW(source, ::arrow::io::ReadableFile::Open(path)) + auto reader = parquet::ParquetFileReader::OpenFile(path); + auto metadata = reader->metadata(); + auto properties = parquet::default_reader_properties(); + parquet::ThriftDeserializer deserializer(properties); + for (int i = 0; i < metadata->num_row_groups(); ++i) + { + auto row_group = metadata->RowGroup(i); + std::vector column_index_offsets; + std::vector column_index_lengths; + std::vector offset_index_offsets; + std::vector offset_index_lengths; + for (int j = 0; j < row_group->num_columns(); j++) + { + auto column_chunk = row_group->ColumnChunk(j); + auto column_index = column_chunk->GetColumnIndexLocation(); + auto offset_index = column_chunk->GetOffsetIndexLocation(); + ASSERT_TRUE(column_index.has_value()); + ASSERT_TRUE(offset_index.has_value()); + ASSERT_GT(column_index.value().offset, 0); + ASSERT_GT(column_index.value().length, 0); + ASSERT_GT(offset_index.value().offset, 0); + ASSERT_GT(offset_index.value().length, 0); + column_index_offsets.push_back(column_index.value().offset); + column_index_lengths.push_back(column_index.value().length); + offset_index_offsets.push_back(offset_index.value().offset); + offset_index_lengths.push_back(offset_index.value().length); + } + for (int k = 0; k < row_group->num_columns(); k++) + { + auto page_index_reader = reader->GetPageIndexReader(); + ASSERT_TRUE(page_index_reader != nullptr); + PARQUET_ASSIGN_OR_THROW(auto column_index_buffer, source->ReadAt(column_index_offsets[k], column_index_lengths[k])) + PARQUET_ASSIGN_OR_THROW(auto offset_index_buffer, source->ReadAt(offset_index_offsets[k], offset_index_lengths[k])) + const auto *column_descr = metadata->schema()->Column(k); + std::unique_ptr column_index = parquet::ColumnIndex::Make( + *column_descr, column_index_buffer->data(), static_cast(column_index_buffer->size()), properties); + std::unique_ptr offset_index + = parquet::OffsetIndex::Make(offset_index_buffer->data(), static_cast(offset_index_buffer->size()), properties); + // validate null pages + if (validate_null_pages.has_value()) + { + validate_null_pages.value()(column_index->null_pages()); + } + size_t num_pages = offset_index->page_locations().size(); + // validate null counts + if (column_index->has_null_counts()) + { + ASSERT_EQ(column_index->null_counts().size(), num_pages); + if (validate_null_counts.has_value()) + { + validate_null_counts.value()(column_index->null_counts()); + } + } + // validate min max values + auto total_values = 0; + for (size_t l = 0; l < num_pages; l++) + { + auto page_location = offset_index->page_locations().at(l); + PARQUET_ASSIGN_OR_THROW(auto page_buffer, source->ReadAt(page_location.offset, page_location.compressed_page_size)) + parquet::format::PageHeader header; + uint32_t header_size = static_cast(page_buffer->size()); + deserializer.DeserializeMessage(page_buffer->data(), &header_size, &header); + ASSERT_TRUE(header.type == parquet::format::PageType::DATA_PAGE); + if (!column_index->null_pages().at(l)) + { + ASSERT_EQ(header.data_page_header.statistics.min_value, column_index->encoded_min_values().at(l)); + ASSERT_EQ(header.data_page_header.statistics.max_value, column_index->encoded_max_values().at(l)); + if (column_index->has_null_counts()) + ASSERT_GT(header.data_page_header.num_values, column_index->null_counts().at(l)); + } + else + { + ASSERT_EQ(header.data_page_header.num_values, column_index->null_counts().at(l)); + } + ASSERT_EQ(page_location.first_row_index, total_values); + total_values += header.data_page_header.num_values; + } + } + } +} + +void writeParquet(SourcePtr source, const FormatSettings & format_settings, String parquet_path) +{ + QueryPipelineBuilder pipeline_builder; + pipeline_builder.init(Pipe(source)); + auto pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline_builder)); + WriteBufferFromFile write_buffer(parquet_path); + auto output = std::make_shared(write_buffer, pipeline.getHeader(), format_settings); + pipeline.complete(output); + CompletedPipelineExecutor executor(pipeline); + executor.execute(); +} + +TEST(Parquet, WriteParquetPageIndexParrelel) +{ + FormatSettings format_settings; + format_settings.parquet.row_group_rows = 10000; + format_settings.parquet.use_custom_encoder = true; + format_settings.parquet.parallel_encoding = true; + format_settings.parquet.write_page_index = true; + format_settings.parquet.data_page_size = 32; + format_settings.max_threads = 2; + + std::vector> values; + std::vector col; + col.reserve(1000); + for (size_t i = 0; i < 1000; i++) + { + col.push_back(i % 10); + } + values.push_back(col); + values.push_back(col); + auto source = multiColumnsSource( + {makeNullable(std::make_shared()), makeNullable(std::make_shared())}, values, 100); + String path = "/tmp/test.parquet"; + writeParquet(source, format_settings, path); + validatePageIndex( + path, + [](auto null_pages) + { + for (auto null_page : null_pages) + { + ASSERT_TRUE(!null_page); + } + }, + [](auto null_counts) + { + for (auto null_count : null_counts) + { + ASSERT_TRUE(null_count > 0); + } + }); +} + +TEST(Parquet, WriteParquetPageIndexParrelelPlainEnconding) +{ + FormatSettings format_settings; + format_settings.parquet.row_group_rows = 10000; + format_settings.parquet.use_custom_encoder = true; + format_settings.parquet.parallel_encoding = true; + format_settings.parquet.write_page_index = true; + format_settings.parquet.data_page_size = 32; + format_settings.max_threads = 2; + + std::vector> values; + std::vector col; + for (size_t i = 0; i < 100000; i++) + { + col.push_back(std::to_string(i)); + } + values.push_back(col); + values.push_back(col); + auto source = multiColumnsSource( + {makeNullable(std::make_shared()), makeNullable(std::make_shared())}, values, 10); + String path = "/tmp/test.parquet"; + writeParquet(source, format_settings, path); + validatePageIndex( + path, + [](auto null_pages) + { + for (auto null_page : null_pages) + { + ASSERT_FALSE(null_page); + } + }, + [](auto null_counts) + { + for (auto null_count : null_counts) + { + ASSERT_EQ(null_count, 0); + } + }); +} + +TEST(Parquet, WriteParquetPageIndexParrelelAllNull) +{ + FormatSettings format_settings; + format_settings.parquet.row_group_rows = 10000; + format_settings.parquet.use_custom_encoder = true; + format_settings.parquet.parallel_encoding = true; + format_settings.parquet.write_page_index = true; + format_settings.parquet.data_page_size = 32; + format_settings.max_threads = 2; + + std::vector> values; + auto & col = values.emplace_back(); + for (size_t i = 0; i < 1000; i++) + { + col.push_back(0); + } + auto source = multiColumnsSource({makeNullable(std::make_shared())}, values, 100); + String path = "/tmp/test.parquet"; + writeParquet(source, format_settings, path); + validatePageIndex( + path, + [](auto null_pages) + { + for (auto null_page : null_pages) + { + ASSERT_TRUE(null_page); + } + }, + [](auto null_counts) + { + for (auto null_count : null_counts) + { + ASSERT_TRUE(null_count > 0); + } + }); +} + +TEST(Parquet, WriteParquetPageIndexSingleThread) +{ + FormatSettings format_settings; + format_settings.parquet.row_group_rows = 10000; + format_settings.parquet.use_custom_encoder = true; + format_settings.parquet.parallel_encoding = false; + format_settings.parquet.write_page_index = true; + format_settings.parquet.data_page_size = 32; + + std::vector> values; + std::vector col; + for (size_t i = 0; i < 1000; i++) + { + col.push_back(i % 10); + } + values.push_back(col); + values.push_back(col); + auto source = multiColumnsSource( + {makeNullable(std::make_shared()), makeNullable(std::make_shared())}, values, 100); + String path = "/tmp/test.parquet"; + writeParquet(source, format_settings, path); + validatePageIndex( + path, + [](auto null_pages) + { + for (auto null_page : null_pages) + { + ASSERT_TRUE(!null_page); + } + }, + [](auto null_counts) + { + for (auto null_count : null_counts) + { + ASSERT_TRUE(null_count > 0); + } + }); +} +} +#endif diff --git a/tests/integration/test_parquet_page_index/test.py b/tests/integration/test_parquet_page_index/test.py index 82584785b67..eeb5344228a 100644 --- a/tests/integration/test_parquet_page_index/test.py +++ b/tests/integration/test_parquet_page_index/test.py @@ -55,7 +55,7 @@ def delete_if_exists(file_path): ( "SELECT number, number+1 FROM system.numbers LIMIT 100 " "INTO OUTFILE '{file_name}' FORMAT Parquet;", - False, + True, ), }, ) @@ -92,7 +92,7 @@ def test_parquet_page_index_select_into_outfile(query, expected_result, start_cl ( "INSERT INTO TABLE FUNCTION file('{file_name}') " "SELECT number, number+1 FROM system.numbers LIMIT 100 FORMAT Parquet", - False, + True, ), }, )