Merge pull request #70669 from liuneng1994/support-write-columnindex

Support writing Parquet column index and offset index
Commit 7afd68166a by Alexey Milovidov, 2024-10-18 20:59:40 +00:00 (committed via GitHub)
7 changed files with 432 additions and 9 deletions
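For context: Parquet's page index consists of two Thrift structs per column chunk, a ColumnIndex (per-page min/max values, null-page flags and optional null counts) and an OffsetIndex (per-page file offset, compressed size and first row index), written after the last row group and referenced from the footer so that readers can prune individual pages. Below is a minimal read-back sketch using the same Arrow C++ API as the new unit test in this commit; the file path is hypothetical:

#include <parquet/file_reader.h>
#include <parquet/page_index.h>
#include <iostream>

int main()
{
    // Hypothetical path: any file written with page-index support enabled.
    auto reader = parquet::ParquetFileReader::OpenFile("/tmp/test.parquet");
    auto metadata = reader->metadata();
    auto page_index_reader = reader->GetPageIndexReader();
    if (!page_index_reader)
        return 1; // the file carries no page index
    for (int rg = 0; rg < metadata->num_row_groups(); ++rg)
    {
        auto rg_reader = page_index_reader->RowGroup(rg);
        for (int col = 0; col < metadata->num_columns(); ++col)
        {
            // Per-page statistics and physical page locations.
            auto column_index = rg_reader->GetColumnIndex(col);
            auto offset_index = rg_reader->GetOffsetIndex(col);
            std::cout << "row group " << rg << ", column " << col << ": "
                      << offset_index->page_locations().size() << " pages, "
                      << (column_index->has_null_counts() ? "with" : "without")
                      << " null counts\n";
        }
    }
}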

View File

@@ -31,5 +31,7 @@ size_t serializeThriftStruct(const T & obj, WriteBuffer & out)
template size_t serializeThriftStruct<parquet::format::PageHeader>(const parquet::format::PageHeader &, WriteBuffer & out);
template size_t serializeThriftStruct<parquet::format::ColumnChunk>(const parquet::format::ColumnChunk &, WriteBuffer & out);
template size_t serializeThriftStruct<parquet::format::FileMetaData>(const parquet::format::FileMetaData &, WriteBuffer & out);
template size_t serializeThriftStruct<parquet::format::ColumnIndex>(const parquet::format::ColumnIndex &, WriteBuffer & out);
template size_t serializeThriftStruct<parquet::format::OffsetIndex>(const parquet::format::OffsetIndex &, WriteBuffer & out);
}
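The two new explicit instantiations extend the writer's compact-Thrift helper to the page-index structs. A hedged usage sketch; the include paths are assumptions based on the ClickHouse source tree:

#include <Processors/Formats/Impl/Parquet/ThriftUtil.h>
#include <IO/WriteBufferFromString.h>
#include <generated/parquet_types.h>

void serializeOffsetIndexExample()
{
    // Build a one-page OffsetIndex and push it through the same helper.
    parquet::format::PageLocation location;
    location.offset = 4; // first page starts right after the 4-byte "PAR1" magic
    location.compressed_page_size = 128;
    location.first_row_index = 0;

    parquet::format::OffsetIndex offset_index;
    offset_index.page_locations.push_back(location);

    DB::WriteBufferFromOwnString out;
    size_t bytes_written = DB::Parquet::serializeThriftStruct(offset_index, out);
    (void)bytes_written; // compact-Thrift size of the serialized struct
}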

View File

@@ -543,6 +543,7 @@ void writeColumnImpl(
{
parq::PageHeader header;
PODArray<char> data;
size_t first_row_index = 0;
};
std::vector<PageData> dict_encoded_pages; // can't write them out until we have full dictionary
@@ -596,9 +597,32 @@ void writeColumnImpl(
if (options.write_page_statistics)
{
d.__set_statistics(page_statistics.get(options));
- if (s.max_def == 1 && s.max_rep == 0)
bool all_null_page = data_count == 0;
if (all_null_page)
{
s.column_index.min_values.push_back("");
s.column_index.max_values.push_back("");
}
else
{
s.column_index.min_values.push_back(d.statistics.min_value);
s.column_index.max_values.push_back(d.statistics.max_value);
}
bool has_null_count = s.max_def == 1 && s.max_rep == 0;
if (has_null_count)
d.statistics.__set_null_count(static_cast<Int64>(def_count - data_count));
s.column_index.__isset.null_counts = has_null_count;
if (has_null_count)
{
s.column_index.__isset.null_counts = true;
s.column_index.null_counts.emplace_back(d.statistics.null_count);
}
else
{
if (s.column_index.__isset.null_counts)
s.column_index.null_counts.emplace_back(0);
}
s.column_index.null_pages.push_back(all_null_page);
}
total_statistics.merge(page_statistics);
@@ -606,14 +630,18 @@ void writeColumnImpl(
if (use_dictionary)
{
- dict_encoded_pages.push_back({.header = std::move(header), .data = {}});
+ dict_encoded_pages.push_back({.header = std::move(header), .data = {}, .first_row_index = def_offset});
std::swap(dict_encoded_pages.back().data, compressed);
}
else
{
parquet::format::PageLocation location;
location.offset = out.count();
writePage(header, compressed, s, out);
location.compressed_page_size = static_cast<int32_t>(out.count() - location.offset);
location.first_row_index = def_offset;
s.offset_index.page_locations.emplace_back(location);
}
def_offset += def_count;
data_offset += data_count;
};
@@ -642,7 +670,14 @@ void writeColumnImpl(
writePage(header, compressed, s, out);
for (auto & p : dict_encoded_pages)
{
parquet::format::PageLocation location;
location.offset = out.count();
writePage(p.header, p.data, s, out);
location.compressed_page_size = static_cast<int32_t>(out.count() - location.offset);
location.first_row_index = p.first_row_index;
s.offset_index.page_locations.emplace_back(location);
}
dict_encoded_pages.clear();
encoder.reset();
@@ -703,6 +738,9 @@ void writeColumnImpl(
def_offset = 0;
data_offset = 0;
dict_encoded_pages.clear();
// clear column_index
s.column_index = parquet::format::ColumnIndex();
use_dictionary = false;
#ifndef NDEBUG
@@ -909,6 +947,50 @@ parq::RowGroup makeRowGroup(std::vector<parq::ColumnChunk> column_chunks, size_t num_rows)
return r;
}
void writePageIndex(
const std::vector<std::vector<parquet::format::ColumnIndex>> & column_indexes,
const std::vector<std::vector<parquet::format::OffsetIndex>> & offset_indexes,
std::vector<parq::RowGroup> & row_groups,
WriteBuffer & out,
size_t base_offset)
{
chassert(row_groups.size() == column_indexes.size() && row_groups.size() == offset_indexes.size());
auto num_row_groups = row_groups.size();
// write column index
for (size_t i = 0; i < num_row_groups; ++i)
{
const auto & current_group_column_index = column_indexes.at(i);
chassert(row_groups.at(i).columns.size() == current_group_column_index.size());
auto & row_group = row_groups.at(i);
for (size_t j = 0; j < row_groups.at(i).columns.size(); ++j)
{
auto & column = row_group.columns.at(j);
int64_t column_index_offset = static_cast<int64_t>(out.count() - base_offset);
int32_t column_index_length = static_cast<int32_t>(serializeThriftStruct(current_group_column_index.at(j), out));
column.__isset.column_index_offset = true;
column.column_index_offset = column_index_offset;
column.__isset.column_index_length = true;
column.column_index_length = column_index_length;
}
}
// write offset index
for (size_t i = 0; i < num_row_groups; ++i)
{
const auto & current_group_offset_index = offset_indexes.at(i);
chassert(row_groups.at(i).columns.size() == current_group_offset_index.size());
for (size_t j = 0; j < row_groups.at(i).columns.size(); ++j)
{
auto & column = row_groups.at(i).columns.at(j);
int64_t offset_index_offset = static_cast<int64_t>(out.count() - base_offset);
int32_t offset_index_length = static_cast<int32_t>(serializeThriftStruct(current_group_offset_index.at(j), out));
column.__isset.offset_index_offset = true;
column.offset_index_offset = offset_index_offset;
column.__isset.offset_index_length = true;
column.offset_index_length = offset_index_length;
}
}
}
void writeFileFooter(std::vector<parq::RowGroup> row_groups, SchemaElements schema, const WriteOptions & options, WriteBuffer & out)
{
parq::FileMetaData meta;

View File

@@ -6,6 +6,7 @@
#include <DataTypes/IDataType.h>
#include <Common/PODArray.h>
#include <IO/CompressionMethod.h>
#include <generated/parquet_types.h>
namespace DB::Parquet
{
@@ -55,6 +56,9 @@ struct ColumnChunkWriteState
UInt8 max_def = 0;
UInt8 max_rep = 0;
parquet::format::ColumnIndex column_index;
parquet::format::OffsetIndex offset_index;
ColumnChunkWriteState() = default;
/// Prevent accidental copying.
ColumnChunkWriteState(ColumnChunkWriteState &&) = default;
@@ -133,6 +137,7 @@ parquet::format::ColumnChunk finalizeColumnChunkAndWriteFooter(
parquet::format::RowGroup makeRowGroup(std::vector<parquet::format::ColumnChunk> column_chunks, size_t num_rows);
void writePageIndex(const std::vector<std::vector<parquet::format::ColumnIndex>>& column_indexes, const std::vector<std::vector<parquet::format::OffsetIndex>>& offset_indexes, std::vector<parquet::format::RowGroup>& row_groups, WriteBuffer & out, size_t base_offset);
void writeFileFooter(std::vector<parquet::format::RowGroup> row_groups, SchemaElements schema, const WriteOptions & options, WriteBuffer & out);
}

View File

@@ -229,6 +229,8 @@ void ParquetBlockOutputFormat::finalizeImpl()
base_offset = out.count();
writeFileHeader(out);
}
if (format_settings.parquet.write_page_index)
writePageIndex(column_indexes, offset_indexes, row_groups_complete, out, base_offset);
writeFileFooter(std::move(row_groups_complete), schema, options, out);
}
else
@@ -264,6 +266,8 @@ void ParquetBlockOutputFormat::resetFormatterImpl()
task_queue.clear();
row_groups.clear();
file_writer.reset();
column_indexes.clear();
offset_indexes.clear();
row_groups_complete.clear();
staging_chunks.clear();
staging_rows = 0;
@@ -368,12 +372,17 @@ void ParquetBlockOutputFormat::writeRowGroupInOneThread(Chunk chunk)
base_offset = out.count();
writeFileHeader(out);
}
auto & rg_column_index = column_indexes.emplace_back();
auto & rg_offset_index = offset_indexes.emplace_back();
std::vector<parquet::format::ColumnChunk> column_chunks;
for (auto & s : columns_to_write)
{
size_t offset = out.count() - base_offset;
writeColumnChunkBody(s, options, out);
for (auto & location : s.offset_index.page_locations)
location.offset -= base_offset;
rg_column_index.emplace_back(std::move(s.column_index));
rg_offset_index.emplace_back(std::move(s.offset_index));
auto c = finalizeColumnChunkAndWriteFooter(offset, std::move(s), options, out);
column_chunks.push_back(std::move(c));
}
@@ -436,16 +445,20 @@ void ParquetBlockOutputFormat::reapCompletedRowGroups(std::unique_lock<std::mutex>
writeFileHeader(out);
}
auto & rg_column_index = column_indexes.emplace_back();
auto & rg_offset_index = offset_indexes.emplace_back();
std::vector<parquet::format::ColumnChunk> metadata;
for (auto & cols : r.column_chunks)
{
for (ColumnChunk & col : cols)
{
size_t offset = out.count() - base_offset;
for (auto & location : col.state.offset_index.page_locations)
location.offset += offset;
out.write(col.serialized.data(), col.serialized.size());
rg_column_index.emplace_back(std::move(col.state.column_index));
rg_offset_index.emplace_back(std::move(col.state.offset_index));
auto m = finalizeColumnChunkAndWriteFooter(offset, std::move(col.state), options, out);
metadata.push_back(std::move(m));
}
}
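Note the asymmetry between the two writer paths: writeRowGroupInOneThread encodes pages straight into out, so page locations start as absolute stream positions and base_offset is subtracted afterwards, while the parallel path above encodes each chunk into a detached buffer, so locations start at zero and the chunk's final position in the file is added when the buffer is flushed. A toy illustration of the invariant both fixups maintain, with made-up numbers:

#include <cassert>
#include <cstdint>

int main()
{
    // Both paths must leave PageLocation::offset relative to the file start.
    const int64_t base_offset = 100;        // the Parquet file starts at stream position 100
    const int64_t chunk_pos_in_file = 1024; // out.count() - base_offset when the chunk lands

    // Single-threaded path: the location was an absolute stream position.
    int64_t absolute = base_offset + chunk_pos_in_file + 37;
    int64_t single_threaded = absolute - base_offset; // 1061

    // Parallel path: the location was relative to the chunk's own buffer.
    int64_t relative = 37;
    int64_t parallel = relative + chunk_pos_in_file; // 1061

    assert(single_threaded == parallel);
}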

View File

@@ -138,6 +138,8 @@ private:
Parquet::WriteOptions options;
Parquet::SchemaElements schema;
std::vector<parquet::format::RowGroup> row_groups_complete;
std::vector<std::vector<parquet::format::ColumnIndex>> column_indexes;
std::vector<std::vector<parquet::format::OffsetIndex>> offset_indexes;
size_t base_offset = 0;

View File

@@ -0,0 +1,319 @@
#include <gtest/gtest.h>
#include <config.h>
#if USE_PARQUET
# include <Columns/ColumnsNumber.h>
# include <DataTypes/DataTypesNumber.h>
# include <IO/WriteBufferFromFile.h>
# include <Processors/Executors/CompletedPipelineExecutor.h>
# include <Processors/Executors/PipelineExecutor.h>
# include <Processors/Formats/Impl/ParquetBlockOutputFormat.h>
# include <Processors/ISource.h>
# include <Processors/Sources/SourceFromChunks.h>
# include <QueryPipeline/QueryPipelineBuilder.h>
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeString.h>
# include <arrow/io/file.h>
# include <parquet/file_reader.h>
# include <parquet/page_index.h>
# include <parquet/thrift_internal.h>
using namespace DB;
namespace
{
template <class T>
std::shared_ptr<ISource> multiColumnsSource(const std::vector<DataTypePtr> & type, const std::vector<std::vector<T>> & values, size_t times)
{
Block header;
for (size_t i = 0; i < type.size(); ++i)
{
header.insert(ColumnWithTypeAndName(type[i], "x" + std::to_string(i)));
}
Chunks chunks;
for (size_t i = 0; i < times; ++i)
{
Chunk chunk;
Columns columns;
for (size_t j = 0; j < type.size(); ++j)
{
auto column = type[j]->createColumn();
for (const auto& n : values[j])
{
if constexpr (std::is_same_v<T, UInt64>)
{
    if (n == 0)
        column->insertDefault();
    else
        column->insert(n);
}
else
{
    column->insert(n);
}
}
columns.push_back(std::move(column));
}
chunk = Chunk(Columns{std::move(columns)}, values[0].size());
chunks.push_back(std::move(chunk));
}
return std::make_shared<SourceFromChunks>(header, std::move(chunks));
}
void validatePageIndex(
String path,
std::optional<std::function<void(std::vector<bool>)>> validate_null_pages = std::nullopt,
std::optional<std::function<void(std::vector<int64_t>)>> validate_null_counts = std::nullopt)
{
std::shared_ptr<::arrow::io::RandomAccessFile> source;
PARQUET_ASSIGN_OR_THROW(source, ::arrow::io::ReadableFile::Open(path))
auto reader = parquet::ParquetFileReader::OpenFile(path);
auto metadata = reader->metadata();
auto properties = parquet::default_reader_properties();
parquet::ThriftDeserializer deserializer(properties);
for (int i = 0; i < metadata->num_row_groups(); ++i)
{
auto row_group = metadata->RowGroup(i);
std::vector<int64_t> column_index_offsets;
std::vector<int64_t> column_index_lengths;
std::vector<int64_t> offset_index_offsets;
std::vector<int64_t> offset_index_lengths;
for (int j = 0; j < row_group->num_columns(); j++)
{
auto column_chunk = row_group->ColumnChunk(j);
auto column_index = column_chunk->GetColumnIndexLocation();
auto offset_index = column_chunk->GetOffsetIndexLocation();
ASSERT_TRUE(column_index.has_value());
ASSERT_TRUE(offset_index.has_value());
ASSERT_GT(column_index.value().offset, 0);
ASSERT_GT(column_index.value().length, 0);
ASSERT_GT(offset_index.value().offset, 0);
ASSERT_GT(offset_index.value().length, 0);
column_index_offsets.push_back(column_index.value().offset);
column_index_lengths.push_back(column_index.value().length);
offset_index_offsets.push_back(offset_index.value().offset);
offset_index_lengths.push_back(offset_index.value().length);
}
for (int k = 0; k < row_group->num_columns(); k++)
{
auto page_index_reader = reader->GetPageIndexReader();
ASSERT_TRUE(page_index_reader != nullptr);
PARQUET_ASSIGN_OR_THROW(auto column_index_buffer, source->ReadAt(column_index_offsets[k], column_index_lengths[k]))
PARQUET_ASSIGN_OR_THROW(auto offset_index_buffer, source->ReadAt(offset_index_offsets[k], offset_index_lengths[k]))
const auto *column_descr = metadata->schema()->Column(k);
std::unique_ptr<parquet::ColumnIndex> column_index = parquet::ColumnIndex::Make(
*column_descr, column_index_buffer->data(), static_cast<uint32_t>(column_index_buffer->size()), properties);
std::unique_ptr<parquet::OffsetIndex> offset_index
= parquet::OffsetIndex::Make(offset_index_buffer->data(), static_cast<uint32_t>(offset_index_buffer->size()), properties);
// validate null pages
if (validate_null_pages.has_value())
{
validate_null_pages.value()(column_index->null_pages());
}
size_t num_pages = offset_index->page_locations().size();
// validate null counts
if (column_index->has_null_counts())
{
ASSERT_EQ(column_index->null_counts().size(), num_pages);
if (validate_null_counts.has_value())
{
validate_null_counts.value()(column_index->null_counts());
}
}
// validate min max values
auto total_values = 0;
for (size_t l = 0; l < num_pages; l++)
{
auto page_location = offset_index->page_locations().at(l);
PARQUET_ASSIGN_OR_THROW(auto page_buffer, source->ReadAt(page_location.offset, page_location.compressed_page_size))
parquet::format::PageHeader header;
uint32_t header_size = static_cast<uint32_t>(page_buffer->size());
deserializer.DeserializeMessage(page_buffer->data(), &header_size, &header);
ASSERT_TRUE(header.type == parquet::format::PageType::DATA_PAGE);
if (!column_index->null_pages().at(l))
{
ASSERT_EQ(header.data_page_header.statistics.min_value, column_index->encoded_min_values().at(l));
ASSERT_EQ(header.data_page_header.statistics.max_value, column_index->encoded_max_values().at(l));
if (column_index->has_null_counts())
ASSERT_GT(header.data_page_header.num_values, column_index->null_counts().at(l));
}
else
{
ASSERT_EQ(header.data_page_header.num_values, column_index->null_counts().at(l));
}
ASSERT_EQ(page_location.first_row_index, total_values);
total_values += header.data_page_header.num_values;
}
}
}
}
void writeParquet(SourcePtr source, const FormatSettings & format_settings, String parquet_path)
{
QueryPipelineBuilder pipeline_builder;
pipeline_builder.init(Pipe(source));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline_builder));
WriteBufferFromFile write_buffer(parquet_path);
auto output = std::make_shared<ParquetBlockOutputFormat>(write_buffer, pipeline.getHeader(), format_settings);
pipeline.complete(output);
CompletedPipelineExecutor executor(pipeline);
executor.execute();
}
TEST(Parquet, WriteParquetPageIndexParallel)
{
FormatSettings format_settings;
format_settings.parquet.row_group_rows = 10000;
format_settings.parquet.use_custom_encoder = true;
format_settings.parquet.parallel_encoding = true;
format_settings.parquet.write_page_index = true;
format_settings.parquet.data_page_size = 32;
format_settings.max_threads = 2;
std::vector<std::vector<UInt64>> values;
std::vector<UInt64> col;
col.reserve(1000);
for (size_t i = 0; i < 1000; i++)
{
col.push_back(i % 10);
}
values.push_back(col);
values.push_back(col);
auto source = multiColumnsSource<UInt64>(
{makeNullable(std::make_shared<DataTypeUInt64>()), makeNullable(std::make_shared<DataTypeUInt64>())}, values, 100);
String path = "/tmp/test.parquet";
writeParquet(source, format_settings, path);
validatePageIndex(
path,
[](auto null_pages)
{
for (auto null_page : null_pages)
{
ASSERT_FALSE(null_page);
}
},
[](auto null_counts)
{
for (auto null_count : null_counts)
{
ASSERT_TRUE(null_count > 0);
}
});
}
TEST(Parquet, WriteParquetPageIndexParallelPlainEncoding)
{
FormatSettings format_settings;
format_settings.parquet.row_group_rows = 10000;
format_settings.parquet.use_custom_encoder = true;
format_settings.parquet.parallel_encoding = true;
format_settings.parquet.write_page_index = true;
format_settings.parquet.data_page_size = 32;
format_settings.max_threads = 2;
std::vector<std::vector<String>> values;
std::vector<String> col;
for (size_t i = 0; i < 100000; i++)
{
col.push_back(std::to_string(i));
}
values.push_back(col);
values.push_back(col);
auto source = multiColumnsSource<String>(
{makeNullable(std::make_shared<DataTypeString>()), makeNullable(std::make_shared<DataTypeString>())}, values, 10);
String path = "/tmp/test.parquet";
writeParquet(source, format_settings, path);
validatePageIndex(
path,
[](auto null_pages)
{
for (auto null_page : null_pages)
{
ASSERT_FALSE(null_page);
}
},
[](auto null_counts)
{
for (auto null_count : null_counts)
{
ASSERT_EQ(null_count, 0);
}
});
}
TEST(Parquet, WriteParquetPageIndexParallelAllNull)
{
FormatSettings format_settings;
format_settings.parquet.row_group_rows = 10000;
format_settings.parquet.use_custom_encoder = true;
format_settings.parquet.parallel_encoding = true;
format_settings.parquet.write_page_index = true;
format_settings.parquet.data_page_size = 32;
format_settings.max_threads = 2;
std::vector<std::vector<UInt64>> values;
auto & col = values.emplace_back();
for (size_t i = 0; i < 1000; i++)
{
col.push_back(0);
}
auto source = multiColumnsSource<UInt64>({makeNullable(std::make_shared<DataTypeUInt64>())}, values, 100);
String path = "/tmp/test.parquet";
writeParquet(source, format_settings, path);
validatePageIndex(
path,
[](auto null_pages)
{
for (auto null_page : null_pages)
{
ASSERT_TRUE(null_page);
}
},
[](auto null_counts)
{
for (auto null_count : null_counts)
{
ASSERT_TRUE(null_count > 0);
}
});
}
TEST(Parquet, WriteParquetPageIndexSingleThread)
{
FormatSettings format_settings;
format_settings.parquet.row_group_rows = 10000;
format_settings.parquet.use_custom_encoder = true;
format_settings.parquet.parallel_encoding = false;
format_settings.parquet.write_page_index = true;
format_settings.parquet.data_page_size = 32;
std::vector<std::vector<UInt64>> values;
std::vector<UInt64> col;
for (size_t i = 0; i < 1000; i++)
{
col.push_back(i % 10);
}
values.push_back(col);
values.push_back(col);
auto source = multiColumnsSource<UInt64>(
{makeNullable(std::make_shared<DataTypeUInt64>()), makeNullable(std::make_shared<DataTypeUInt64>())}, values, 100);
String path = "/tmp/test.parquet";
writeParquet(source, format_settings, path);
validatePageIndex(
path,
[](auto null_pages)
{
for (auto null_page : null_pages)
{
ASSERT_FALSE(null_page);
}
},
[](auto null_counts)
{
for (auto null_count : null_counts)
{
ASSERT_TRUE(null_count > 0);
}
});
}
}
#endif
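Beyond validation, the same two structures enable page-level pruning. A hedged sketch of skipping pages for a hypothetical predicate x > 42 on a UInt64 column, reusing indexes obtained as in validatePageIndex above; it assumes PLAIN-encoded statistics (8 little-endian bytes) read on a little-endian machine:

#include <parquet/page_index.h>
#include <cstdint>
#include <cstring>
#include <vector>

std::vector<parquet::PageLocation> prunePages(
    const parquet::ColumnIndex & column_index, const parquet::OffsetIndex & offset_index)
{
    std::vector<parquet::PageLocation> selected;
    for (size_t p = 0; p < offset_index.page_locations().size(); ++p)
    {
        if (column_index.null_pages().at(p))
            continue; // an all-NULL page cannot satisfy x > 42
        uint64_t page_max = 0;
        std::memcpy(&page_max, column_index.encoded_max_values().at(p).data(), sizeof(page_max));
        if (page_max > 42)
            selected.push_back(offset_index.page_locations().at(p)); // page may contain matches
    }
    return selected;
}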

View File

@@ -55,7 +55,7 @@ def delete_if_exists(file_path):
(
"SELECT number, number+1 FROM system.numbers LIMIT 100 "
"INTO OUTFILE '{file_name}' FORMAT Parquet;",
- False,
+ True,
),
},
)
@@ -92,7 +92,7 @@ def test_parquet_page_index_select_into_outfile(query, expected_result, start_cluster):
(
"INSERT INTO TABLE FUNCTION file('{file_name}') "
"SELECT number, number+1 FROM system.numbers LIMIT 100 FORMAT Parquet",
- False,
+ True,
),
},
)