Merge pull request #53281 from Avogar/batch-small-parquet-row-groups

Optimize reading small row groups by batching them together in Parquet
Michael Kolupaev 2023-08-18 17:15:42 -07:00 committed by GitHub
commit a1522e22ea
8 changed files with 104 additions and 89 deletions


@ -2136,6 +2136,7 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
- [input_format_parquet_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_case_insensitive_column_matching) - ignore case when matching Parquet columns with ClickHouse columns. Default value - `false`.
- [input_format_parquet_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_allow_missing_columns) - allow missing columns while reading Parquet data. Default value - `false`.
- [input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Parquet format. Default value - `false`.
- [input_format_parquet_local_file_min_bytes_for_seek](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_local_file_min_bytes_for_seek) - minimum number of bytes a local file read must skip to use seek instead of reading and ignoring the data in Parquet input format. Default value - `8192`.
- [output_format_parquet_fixed_string_as_fixed_byte_array](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_fixed_string_as_fixed_byte_array) - use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary/String for FixedString columns. Default value - `true`.
- [output_format_parquet_version](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_version) - The version of Parquet format used in output format. Default value - `2.latest`.
- [output_format_parquet_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_compression_method) - compression method used in output Parquet format. Default value - `snappy`.


@ -1223,6 +1223,12 @@ Allow skipping columns with unsupported types while schema inference for format
Disabled by default.
### input_format_parquet_local_file_min_bytes_for_seek {#input_format_parquet_local_file_min_bytes_for_seek}
Minimum number of bytes a local file read must skip to use seek instead of reading and ignoring the data in the Parquet input format.
Default value - `8192`.
### output_format_parquet_string_as_string {#output_format_parquet_string_as_string}
Use Parquet String type instead of Binary for String columns.
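
The `input_format_parquet_local_file_min_bytes_for_seek` setting added above encodes the usual tradeoff for skipping data in a local file: a small gap is cheaper to read and throw away (the read stays sequential), while a large gap is cheaper to jump over with a seek. A minimal sketch of that decision rule, purely illustrative rather than ClickHouse's actual reader code (`shouldSeek` is a hypothetical helper):

```cpp
#include <cstddef>
#include <cstdio>

// Illustration only: the decision a "min bytes for seek" threshold encodes.
// Gaps smaller than the threshold are read and discarded (the read stays
// sequential); gaps at or above it are skipped with an explicit seek.
static bool shouldSeek(size_t gap_bytes, size_t min_bytes_for_seek = 8192)
{
    return gap_bytes >= min_bytes_for_seek;
}

int main()
{
    for (size_t gap : {size_t{512}, size_t{8192}, size_t{1 << 20}})
        std::printf("gap of %zu bytes -> %s\n", gap, shouldSeek(gap) ? "seek" : "read and ignore");
}
```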


@ -879,6 +879,7 @@ class IColumn;
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
M(Bool, input_format_orc_allow_missing_columns, false, "Allow missing columns while reading ORC input formats", 0) \
M(Bool, input_format_parquet_allow_missing_columns, false, "Allow missing columns while reading Parquet input formats", 0) \
M(UInt64, input_format_parquet_local_file_min_bytes_for_seek, 8192, "Min bytes required for local read (file) to do seek, instead of read with ignore in Parquet input format", 0) \
M(Bool, input_format_arrow_allow_missing_columns, false, "Allow missing columns while reading Arrow input formats", 0) \
M(Char, input_format_hive_text_fields_delimiter, '\x01', "Delimiter between fields in Hive Text File", 0) \
M(Char, input_format_hive_text_collection_items_delimiter, '\x02', "Delimiter between collection(array or map) items in Hive Text File", 0) \


@ -133,6 +133,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.parallel_encoding = settings.output_format_parquet_parallel_encoding;
format_settings.parquet.data_page_size = settings.output_format_parquet_data_page_size;
format_settings.parquet.write_batch_size = settings.output_format_parquet_batch_size;
format_settings.parquet.local_read_min_bytes_for_seek = settings.input_format_parquet_local_file_min_bytes_for_seek;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;


@ -241,6 +241,7 @@ struct FormatSettings
bool output_compliant_nested_types = true;
size_t data_page_size = 1024 * 1024;
size_t write_batch_size = 1024;
size_t local_read_min_bytes_for_seek = 8192;
} parquet;
struct Pretty


@ -84,7 +84,24 @@ void ParquetBlockInputFormat::initializeIfNeeded()
std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
row_groups.resize(metadata->num_row_groups());
int num_row_groups = metadata->num_row_groups();
if (num_row_groups == 0)
return;
row_group_batches.reserve(num_row_groups);
for (int row_group = 0; row_group < num_row_groups; ++row_group)
{
if (skip_row_groups.contains(row_group))
continue;
if (row_group_batches.empty() || row_group_batches.back().total_bytes_compressed >= min_bytes_for_seek)
row_group_batches.emplace_back();
row_group_batches.back().row_groups_idxs.push_back(row_group);
row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
}
ArrowFieldIndexUtil field_util(
format_settings.parquet.case_insensitive_column_matching,
@ -92,9 +109,9 @@ void ParquetBlockInputFormat::initializeIfNeeded()
column_indices = field_util.findRequiredIndices(getPort().getHeader(), *schema);
}
void ParquetBlockInputFormat::initializeRowGroupReader(size_t row_group_idx)
void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
{
auto & row_group = row_groups[row_group_idx];
auto & row_group_batch = row_group_batches[row_group_batch_idx];
parquet::ArrowReaderProperties properties;
properties.set_use_threads(false);
@ -140,33 +157,30 @@ void ParquetBlockInputFormat::initializeRowGroupReader(size_t row_group_idx)
builder.Open(arrow_file, /* not to be confused with ArrowReaderProperties */ parquet::default_reader_properties(), metadata));
builder.properties(properties);
// TODO: Pass custom memory_pool() to enable memory accounting with non-jemalloc allocators.
THROW_ARROW_NOT_OK(builder.Build(&row_group.file_reader));
THROW_ARROW_NOT_OK(builder.Build(&row_group_batch.file_reader));
THROW_ARROW_NOT_OK(
row_group.file_reader->GetRecordBatchReader({static_cast<int>(row_group_idx)}, column_indices, &row_group.record_batch_reader));
row_group_batch.file_reader->GetRecordBatchReader(row_group_batch.row_groups_idxs, column_indices, &row_group_batch.record_batch_reader));
row_group.arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
row_group_batch.arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
"Parquet",
format_settings.parquet.allow_missing_columns,
format_settings.null_as_default,
format_settings.parquet.case_insensitive_column_matching);
row_group.row_group_bytes_uncompressed = metadata->RowGroup(static_cast<int>(row_group_idx))->total_compressed_size();
row_group.row_group_rows = metadata->RowGroup(static_cast<int>(row_group_idx))->num_rows();
}
void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_idx)
void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_batch_idx)
{
chassert(!mutex.try_lock());
auto & status = row_groups[row_group_idx].status;
chassert(status == RowGroupState::Status::NotStarted || status == RowGroupState::Status::Paused);
auto & status = row_group_batches[row_group_batch_idx].status;
chassert(status == RowGroupBatchState::Status::NotStarted || status == RowGroupBatchState::Status::Paused);
status = RowGroupState::Status::Running;
status = RowGroupBatchState::Status::Running;
pool->scheduleOrThrowOnError(
[this, row_group_idx, thread_group = CurrentThread::getGroup()]()
[this, row_group_batch_idx, thread_group = CurrentThread::getGroup()]()
{
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
@ -176,7 +190,7 @@ void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_idx)
{
setThreadName("ParquetDecoder");
threadFunction(row_group_idx);
threadFunction(row_group_batch_idx);
}
catch (...)
{
@ -187,44 +201,44 @@ void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_idx)
});
}
void ParquetBlockInputFormat::threadFunction(size_t row_group_idx)
void ParquetBlockInputFormat::threadFunction(size_t row_group_batch_idx)
{
std::unique_lock lock(mutex);
auto & row_group = row_groups[row_group_idx];
chassert(row_group.status == RowGroupState::Status::Running);
auto & row_group_batch = row_group_batches[row_group_batch_idx];
chassert(row_group_batch.status == RowGroupBatchState::Status::Running);
while (true)
{
if (is_stopped || row_group.num_pending_chunks >= max_pending_chunks_per_row_group)
if (is_stopped || row_group_batch.num_pending_chunks >= max_pending_chunks_per_row_group_batch)
{
row_group.status = RowGroupState::Status::Paused;
row_group_batch.status = RowGroupBatchState::Status::Paused;
return;
}
decodeOneChunk(row_group_idx, lock);
decodeOneChunk(row_group_batch_idx, lock);
if (row_group.status == RowGroupState::Status::Done)
if (row_group_batch.status == RowGroupBatchState::Status::Done)
return;
}
}
void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_idx, std::unique_lock<std::mutex> & lock)
void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::unique_lock<std::mutex> & lock)
{
auto & row_group = row_groups[row_group_idx];
chassert(row_group.status != RowGroupState::Status::Done);
auto & row_group_batch = row_group_batches[row_group_batch_idx];
chassert(row_group_batch.status != RowGroupBatchState::Status::Done);
chassert(lock.owns_lock());
SCOPE_EXIT({ chassert(lock.owns_lock() || std::uncaught_exceptions()); });
lock.unlock();
auto end_of_row_group = [&] {
row_group.arrow_column_to_ch_column.reset();
row_group.record_batch_reader.reset();
row_group.file_reader.reset();
row_group_batch.arrow_column_to_ch_column.reset();
row_group_batch.record_batch_reader.reset();
row_group_batch.file_reader.reset();
lock.lock();
row_group.status = RowGroupState::Status::Done;
row_group_batch.status = RowGroupBatchState::Status::Done;
// We may be able to schedule more work now, but can't call scheduleMoreWorkIfNeeded() right
// here because we're running on the same thread pool, so it'll deadlock if thread limit is
@ -232,23 +246,11 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_idx, std::unique_l
condvar.notify_all();
};
if (!row_group.record_batch_reader)
{
if (skip_row_groups.contains(static_cast<int>(row_group_idx)))
{
// Pretend that the row group is empty.
// (We could avoid scheduling the row group on a thread in the first place. But the
// skip_row_groups feature is mostly unused, so it's better to be a little inefficient
// than to add a bunch of extra mostly-dead code for this.)
end_of_row_group();
return;
}
initializeRowGroupReader(row_group_idx);
}
if (!row_group_batch.record_batch_reader)
initializeRowGroupBatchReader(row_group_batch_idx);
auto batch = row_group.record_batch_reader->Next();
auto batch = row_group_batch.record_batch_reader->Next();
if (!batch.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", batch.status().ToString());
@ -260,44 +262,44 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_idx, std::unique_l
auto tmp_table = arrow::Table::FromRecordBatches({*batch});
size_t approx_chunk_original_size = static_cast<size_t>(std::ceil(static_cast<double>(row_group.row_group_bytes_uncompressed) / row_group.row_group_rows * (*tmp_table)->num_rows()));
PendingChunk res = {.chunk_idx = row_group.next_chunk_idx, .row_group_idx = row_group_idx, .approx_original_chunk_size = approx_chunk_original_size};
size_t approx_chunk_original_size = static_cast<size_t>(std::ceil(static_cast<double>(row_group_batch.total_bytes_compressed) / row_group_batch.total_rows * (*tmp_table)->num_rows()));
PendingChunk res = {.chunk_idx = row_group_batch.next_chunk_idx, .row_group_batch_idx = row_group_batch_idx, .approx_original_chunk_size = approx_chunk_original_size};
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &res.block_missing_values : nullptr;
row_group.arrow_column_to_ch_column->arrowTableToCHChunk(res.chunk, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
row_group_batch.arrow_column_to_ch_column->arrowTableToCHChunk(res.chunk, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
lock.lock();
++row_group.next_chunk_idx;
++row_group.num_pending_chunks;
++row_group_batch.next_chunk_idx;
++row_group_batch.num_pending_chunks;
pending_chunks.push(std::move(res));
condvar.notify_all();
}
void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional<size_t> row_group_touched)
void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional<size_t> row_group_batch_touched)
{
while (row_groups_completed < row_groups.size())
while (row_group_batches_completed < row_group_batches.size())
{
auto & row_group = row_groups[row_groups_completed];
if (row_group.status != RowGroupState::Status::Done || row_group.num_pending_chunks != 0)
auto & row_group = row_group_batches[row_group_batches_completed];
if (row_group.status != RowGroupBatchState::Status::Done || row_group.num_pending_chunks != 0)
break;
++row_groups_completed;
++row_group_batches_completed;
}
if (pool)
{
while (row_groups_started - row_groups_completed < max_decoding_threads &&
row_groups_started < row_groups.size())
scheduleRowGroup(row_groups_started++);
while (row_group_batches_started - row_group_batches_completed < max_decoding_threads &&
row_group_batches_started < row_group_batches.size())
scheduleRowGroup(row_group_batches_started++);
if (row_group_touched)
if (row_group_batch_touched)
{
auto & row_group = row_groups[*row_group_touched];
if (row_group.status == RowGroupState::Status::Paused &&
row_group.num_pending_chunks < max_pending_chunks_per_row_group)
scheduleRowGroup(*row_group_touched);
auto & row_group = row_group_batches[*row_group_batch_touched];
if (row_group.status == RowGroupBatchState::Status::Paused &&
row_group.num_pending_chunks < max_pending_chunks_per_row_group_batch)
scheduleRowGroup(*row_group_batch_touched);
}
}
}
@ -322,30 +324,30 @@ Chunk ParquetBlockInputFormat::generate()
if (!pending_chunks.empty() &&
(!format_settings.parquet.preserve_order ||
pending_chunks.top().row_group_idx == row_groups_completed))
pending_chunks.top().row_group_batch_idx == row_group_batches_completed))
{
PendingChunk chunk = std::move(const_cast<PendingChunk&>(pending_chunks.top()));
pending_chunks.pop();
auto & row_group = row_groups[chunk.row_group_idx];
auto & row_group = row_group_batches[chunk.row_group_batch_idx];
chassert(row_group.num_pending_chunks != 0);
chassert(chunk.chunk_idx == row_group.next_chunk_idx - row_group.num_pending_chunks);
--row_group.num_pending_chunks;
scheduleMoreWorkIfNeeded(chunk.row_group_idx);
scheduleMoreWorkIfNeeded(chunk.row_group_batch_idx);
previous_block_missing_values = std::move(chunk.block_missing_values);
previous_approx_bytes_read_for_chunk = chunk.approx_original_chunk_size;
return std::move(chunk.chunk);
}
if (row_groups_completed == row_groups.size())
if (row_group_batches_completed == row_group_batches.size())
return {};
if (pool)
condvar.wait(lock);
else
decodeOneChunk(row_groups_completed, lock);
decodeOneChunk(row_group_batches_completed, lock);
}
}
@ -358,12 +360,12 @@ void ParquetBlockInputFormat::resetParser()
arrow_file.reset();
metadata.reset();
column_indices.clear();
row_groups.clear();
row_group_batches.clear();
while (!pending_chunks.empty())
pending_chunks.pop();
row_groups_completed = 0;
row_group_batches_completed = 0;
previous_block_missing_values.clear();
row_groups_started = 0;
row_group_batches_started = 0;
background_exception = nullptr;
is_stopped = false;
@ -411,7 +413,7 @@ void registerInputFormatParquet(FormatFactory & factory)
size_t /* max_download_threads */,
size_t max_parsing_threads)
{
size_t min_bytes_for_seek = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : 8 * 1024;
size_t min_bytes_for_seek = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek;
return std::make_shared<ParquetBlockInputFormat>(
buf,
sample,
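
The core of the change is the loop added to `ParquetBlockInputFormat::initializeIfNeeded()` above: consecutive row groups are packed into a single `RowGroupBatchState` until the batch holds at least `min_bytes_for_seek` compressed bytes, so a file with many tiny row groups is decoded as a few larger reads instead of one read per row group. A self-contained sketch of that heuristic, using a plain struct in place of the Parquet metadata API (the names `RowGroupMeta` and `batchRowGroups` are illustrative, not from the codebase):

```cpp
#include <cstddef>
#include <cstdio>
#include <unordered_set>
#include <vector>

// Simplified sketch of the batching heuristic from initializeIfNeeded().
struct RowGroupMeta { long rows = 0; size_t compressed_bytes = 0; };

struct RowGroupBatch
{
    std::vector<int> row_groups_idxs;
    long total_rows = 0;
    size_t total_bytes_compressed = 0;
};

std::vector<RowGroupBatch> batchRowGroups(
    const std::vector<RowGroupMeta> & row_groups,
    const std::unordered_set<int> & skip_row_groups,
    size_t min_bytes_for_seek)
{
    std::vector<RowGroupBatch> batches;
    for (int row_group = 0; row_group < static_cast<int>(row_groups.size()); ++row_group)
    {
        if (skip_row_groups.contains(row_group))
            continue;
        // Open a new batch only once the current one is big enough to justify a seek.
        if (batches.empty() || batches.back().total_bytes_compressed >= min_bytes_for_seek)
            batches.emplace_back();
        batches.back().row_groups_idxs.push_back(row_group);
        batches.back().total_rows += row_groups[row_group].rows;
        batches.back().total_bytes_compressed += row_groups[row_group].compressed_bytes;
    }
    return batches;
}

int main()
{
    // Ten row groups of ~2 KiB each with the default 8 KiB threshold -> batches of 4, 4 and 2.
    std::vector<RowGroupMeta> metas(10, {1000, 2048});
    auto batches = batchRowGroups(metas, /*skip_row_groups=*/{}, /*min_bytes_for_seek=*/8192);
    for (const auto & b : batches)
        std::printf("batch: %zu row groups, %ld rows, %zu compressed bytes\n",
                    b.row_groups_idxs.size(), b.total_rows, b.total_bytes_compressed);
}
```

In the real code each resulting batch is handed to Arrow as one `GetRecordBatchReader(row_groups_idxs, ...)` call, and the tests below pass `input_format_parquet_local_file_min_bytes_for_seek=0` to keep the old one-row-group-per-batch behaviour.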


@ -52,6 +52,7 @@ public:
const FormatSettings & format_settings,
size_t max_decoding_threads,
size_t min_bytes_for_seek);
~ParquetBlockInputFormat() override;
void resetParser() override;
@ -71,14 +72,14 @@ private:
}
void initializeIfNeeded();
void initializeRowGroupReader(size_t row_group_idx);
void initializeRowGroupBatchReader(size_t row_group_batch_idx);
void decodeOneChunk(size_t row_group_idx, std::unique_lock<std::mutex> & lock);
void decodeOneChunk(size_t row_group_batch_idx, std::unique_lock<std::mutex> & lock);
void scheduleMoreWorkIfNeeded(std::optional<size_t> row_group_touched = std::nullopt);
void scheduleRowGroup(size_t row_group_idx);
void scheduleMoreWorkIfNeeded(std::optional<size_t> row_group_batch_touched = std::nullopt);
void scheduleRowGroup(size_t row_group_batch_idx);
void threadFunction(size_t row_group_idx);
void threadFunction(size_t row_group_batch_idx);
// Data layout in the file:
//
@ -165,7 +166,7 @@ private:
// * The max_pending_chunks_per_row_group limit could be based on actual memory usage too.
// Useful for preserve_order.
struct RowGroupState
struct RowGroupBatchState
{
// Transitions:
//
@ -202,8 +203,10 @@ private:
size_t next_chunk_idx = 0;
size_t num_pending_chunks = 0;
size_t row_group_bytes_uncompressed = 0;
size_t row_group_rows = 0;
size_t total_rows = 0;
size_t total_bytes_compressed = 0;
std::vector<int> row_groups_idxs;
// These are only used by the decoding thread, so don't require locking the mutex.
std::unique_ptr<parquet::arrow::FileReader> file_reader;
@ -217,7 +220,7 @@ private:
Chunk chunk;
BlockMissingValues block_missing_values;
size_t chunk_idx; // within row group
size_t row_group_idx;
size_t row_group_batch_idx;
size_t approx_original_chunk_size;
// For priority_queue.
@ -230,8 +233,8 @@ private:
bool operator()(const PendingChunk & a, const PendingChunk & b) const
{
auto tuplificate = [this](const PendingChunk & c)
{ return row_group_first ? std::tie(c.row_group_idx, c.chunk_idx)
: std::tie(c.chunk_idx, c.row_group_idx); };
{ return row_group_first ? std::tie(c.row_group_batch_idx, c.chunk_idx)
: std::tie(c.chunk_idx, c.row_group_batch_idx); };
return tuplificate(a) > tuplificate(b);
}
};
@ -241,7 +244,7 @@ private:
const std::unordered_set<int> & skip_row_groups;
size_t max_decoding_threads;
size_t min_bytes_for_seek;
const size_t max_pending_chunks_per_row_group = 2;
const size_t max_pending_chunks_per_row_group_batch = 2;
// RandomAccessFile is thread safe, so we share it among threads.
// FileReader is not, so each thread creates its own.
@ -264,12 +267,12 @@ private:
// Wakes up the generate() call, if any.
std::condition_variable condvar;
std::vector<RowGroupState> row_groups;
std::vector<RowGroupBatchState> row_group_batches;
std::priority_queue<PendingChunk, std::vector<PendingChunk>, PendingChunk::Compare> pending_chunks;
size_t row_groups_completed = 0;
size_t row_group_batches_completed = 0;
// These are only used when max_decoding_threads > 1.
size_t row_groups_started = 0;
size_t row_group_batches_started = 0;
std::unique_ptr<ThreadPool> pool;
BlockMissingValues previous_block_missing_values;


@ -10,7 +10,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# It'll be read into two blocks. The first block will sleep 2x longer than the second.
# So reordering is very likely if the order-preservation doesn't work.
$CLICKHOUSE_LOCAL -q "select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1, function_sleep_max_microseconds_per_block = 6000000"
$CLICKHOUSE_LOCAL -q "select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1, function_sleep_max_microseconds_per_block = 6000000, input_format_parquet_local_file_min_bytes_for_seek=0"
$CLICKHOUSE_LOCAL -q "explain pipeline select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1, max_threads=2"
$CLICKHOUSE_LOCAL -q "explain pipeline select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=0, parallelize_output_from_storages=1, max_threads=2"
$CLICKHOUSE_LOCAL -q "explain pipeline select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1, max_threads=2, input_format_parquet_local_file_min_bytes_for_seek=0"
$CLICKHOUSE_LOCAL -q "explain pipeline select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=0, parallelize_output_from_storages=1, max_threads=2, input_format_parquet_local_file_min_bytes_for_seek=0"