diff --git a/docs/en/interfaces/formats.md b/docs/en/interfaces/formats.md index e2122380510..5ba12eba26a 100644 --- a/docs/en/interfaces/formats.md +++ b/docs/en/interfaces/formats.md @@ -2136,6 +2136,7 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t - [input_format_parquet_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_case_insensitive_column_matching) - ignore case when matching Parquet columns with ClickHouse columns. Default value - `false`. - [input_format_parquet_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_allow_missing_columns) - allow missing columns while reading Parquet data. Default value - `false`. - [input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Parquet format. Default value - `false`. +- [input_format_parquet_local_file_min_bytes_for_seek](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_local_file_min_bytes_for_seek) - min bytes required for local read (file) to do seek, instead of read with ignore in Parquet input format. Default value - `8192`. - [output_format_parquet_fixed_string_as_fixed_byte_array](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_fixed_string_as_fixed_byte_array) - use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary/String for FixedString columns. Default value - `true`. - [output_format_parquet_version](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_version) - The version of Parquet format used in output format. Default value - `2.latest`. - [output_format_parquet_compression_method](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_compression_method) - compression method used in output Parquet format. Default value - `snappy`. diff --git a/docs/en/operations/settings/settings-formats.md b/docs/en/operations/settings/settings-formats.md index beb1d372e08..86aabae187f 100644 --- a/docs/en/operations/settings/settings-formats.md +++ b/docs/en/operations/settings/settings-formats.md @@ -1223,6 +1223,12 @@ Allow skipping columns with unsupported types while schema inference for format Disabled by default. +### input_format_parquet_local_file_min_bytes_for_seek {#input_format_parquet_local_file_min_bytes_for_seek} + +min bytes required for local read (file) to do seek, instead of read with ignore in Parquet input format. + +Default value - `8192`. + ### output_format_parquet_string_as_string {#output_format_parquet_string_as_string} Use Parquet String type instead of Binary for String columns. diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 779ecdd434f..57bf4bea2ec 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -879,6 +879,7 @@ class IColumn; M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \ M(Bool, input_format_orc_allow_missing_columns, false, "Allow missing columns while reading ORC input formats", 0) \ M(Bool, input_format_parquet_allow_missing_columns, false, "Allow missing columns while reading Parquet input formats", 0) \ + M(UInt64, input_format_parquet_local_file_min_bytes_for_seek, 8192, "Min bytes required for local read (file) to do seek, instead of read with ignore in Parquet input format", 0) \ M(Bool, input_format_arrow_allow_missing_columns, false, "Allow missing columns while reading Arrow input formats", 0) \ M(Char, input_format_hive_text_fields_delimiter, '\x01', "Delimiter between fields in Hive Text File", 0) \ M(Char, input_format_hive_text_collection_items_delimiter, '\x02', "Delimiter between collection(array or map) items in Hive Text File", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index dd07ca7c981..56d27a59315 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -133,6 +133,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.parquet.parallel_encoding = settings.output_format_parquet_parallel_encoding; format_settings.parquet.data_page_size = settings.output_format_parquet_data_page_size; format_settings.parquet.write_batch_size = settings.output_format_parquet_batch_size; + format_settings.parquet.local_read_min_bytes_for_seek = settings.input_format_parquet_local_file_min_bytes_for_seek; format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8; format_settings.pretty.color = settings.output_format_pretty_color; format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index 2c283dcc2b7..56eb7686f56 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -241,6 +241,7 @@ struct FormatSettings bool output_compliant_nested_types = true; size_t data_page_size = 1024 * 1024; size_t write_batch_size = 1024; + size_t local_read_min_bytes_for_seek = 8192; } parquet; struct Pretty diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 902a02130aa..a9c83416a74 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -84,7 +84,24 @@ void ParquetBlockInputFormat::initializeIfNeeded() std::shared_ptr schema; THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema)); - row_groups.resize(metadata->num_row_groups()); + int num_row_groups = metadata->num_row_groups(); + if (num_row_groups == 0) + return; + + row_group_batches.reserve(num_row_groups); + + for (int row_group = 0; row_group < num_row_groups; ++row_group) + { + if (skip_row_groups.contains(row_group)) + continue; + + if (row_group_batches.empty() || row_group_batches.back().total_bytes_compressed >= min_bytes_for_seek) + row_group_batches.emplace_back(); + + row_group_batches.back().row_groups_idxs.push_back(row_group); + row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows(); + row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size(); + } ArrowFieldIndexUtil field_util( format_settings.parquet.case_insensitive_column_matching, @@ -92,9 +109,9 @@ void ParquetBlockInputFormat::initializeIfNeeded() column_indices = field_util.findRequiredIndices(getPort().getHeader(), *schema); } -void ParquetBlockInputFormat::initializeRowGroupReader(size_t row_group_idx) +void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx) { - auto & row_group = row_groups[row_group_idx]; + auto & row_group_batch = row_group_batches[row_group_batch_idx]; parquet::ArrowReaderProperties properties; properties.set_use_threads(false); @@ -140,33 +157,30 @@ void ParquetBlockInputFormat::initializeRowGroupReader(size_t row_group_idx) builder.Open(arrow_file, /* not to be confused with ArrowReaderProperties */ parquet::default_reader_properties(), metadata)); builder.properties(properties); // TODO: Pass custom memory_pool() to enable memory accounting with non-jemalloc allocators. - THROW_ARROW_NOT_OK(builder.Build(&row_group.file_reader)); + THROW_ARROW_NOT_OK(builder.Build(&row_group_batch.file_reader)); THROW_ARROW_NOT_OK( - row_group.file_reader->GetRecordBatchReader({static_cast(row_group_idx)}, column_indices, &row_group.record_batch_reader)); + row_group_batch.file_reader->GetRecordBatchReader(row_group_batch.row_groups_idxs, column_indices, &row_group_batch.record_batch_reader)); - row_group.arrow_column_to_ch_column = std::make_unique( + row_group_batch.arrow_column_to_ch_column = std::make_unique( getPort().getHeader(), "Parquet", format_settings.parquet.allow_missing_columns, format_settings.null_as_default, format_settings.parquet.case_insensitive_column_matching); - - row_group.row_group_bytes_uncompressed = metadata->RowGroup(static_cast(row_group_idx))->total_compressed_size(); - row_group.row_group_rows = metadata->RowGroup(static_cast(row_group_idx))->num_rows(); } -void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_idx) +void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_batch_idx) { chassert(!mutex.try_lock()); - auto & status = row_groups[row_group_idx].status; - chassert(status == RowGroupState::Status::NotStarted || status == RowGroupState::Status::Paused); + auto & status = row_group_batches[row_group_batch_idx].status; + chassert(status == RowGroupBatchState::Status::NotStarted || status == RowGroupBatchState::Status::Paused); - status = RowGroupState::Status::Running; + status = RowGroupBatchState::Status::Running; pool->scheduleOrThrowOnError( - [this, row_group_idx, thread_group = CurrentThread::getGroup()]() + [this, row_group_batch_idx, thread_group = CurrentThread::getGroup()]() { if (thread_group) CurrentThread::attachToGroupIfDetached(thread_group); @@ -176,7 +190,7 @@ void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_idx) { setThreadName("ParquetDecoder"); - threadFunction(row_group_idx); + threadFunction(row_group_batch_idx); } catch (...) { @@ -187,44 +201,44 @@ void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_idx) }); } -void ParquetBlockInputFormat::threadFunction(size_t row_group_idx) +void ParquetBlockInputFormat::threadFunction(size_t row_group_batch_idx) { std::unique_lock lock(mutex); - auto & row_group = row_groups[row_group_idx]; - chassert(row_group.status == RowGroupState::Status::Running); + auto & row_group_batch = row_group_batches[row_group_batch_idx]; + chassert(row_group_batch.status == RowGroupBatchState::Status::Running); while (true) { - if (is_stopped || row_group.num_pending_chunks >= max_pending_chunks_per_row_group) + if (is_stopped || row_group_batch.num_pending_chunks >= max_pending_chunks_per_row_group_batch) { - row_group.status = RowGroupState::Status::Paused; + row_group_batch.status = RowGroupBatchState::Status::Paused; return; } - decodeOneChunk(row_group_idx, lock); + decodeOneChunk(row_group_batch_idx, lock); - if (row_group.status == RowGroupState::Status::Done) + if (row_group_batch.status == RowGroupBatchState::Status::Done) return; } } -void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_idx, std::unique_lock & lock) +void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::unique_lock & lock) { - auto & row_group = row_groups[row_group_idx]; - chassert(row_group.status != RowGroupState::Status::Done); + auto & row_group_batch = row_group_batches[row_group_batch_idx]; + chassert(row_group_batch.status != RowGroupBatchState::Status::Done); chassert(lock.owns_lock()); SCOPE_EXIT({ chassert(lock.owns_lock() || std::uncaught_exceptions()); }); lock.unlock(); auto end_of_row_group = [&] { - row_group.arrow_column_to_ch_column.reset(); - row_group.record_batch_reader.reset(); - row_group.file_reader.reset(); + row_group_batch.arrow_column_to_ch_column.reset(); + row_group_batch.record_batch_reader.reset(); + row_group_batch.file_reader.reset(); lock.lock(); - row_group.status = RowGroupState::Status::Done; + row_group_batch.status = RowGroupBatchState::Status::Done; // We may be able to schedule more work now, but can't call scheduleMoreWorkIfNeeded() right // here because we're running on the same thread pool, so it'll deadlock if thread limit is @@ -232,23 +246,11 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_idx, std::unique_l condvar.notify_all(); }; - if (!row_group.record_batch_reader) - { - if (skip_row_groups.contains(static_cast(row_group_idx))) - { - // Pretend that the row group is empty. - // (We could avoid scheduling the row group on a thread in the first place. But the - // skip_row_groups feature is mostly unused, so it's better to be a little inefficient - // than to add a bunch of extra mostly-dead code for this.) - end_of_row_group(); - return; - } - - initializeRowGroupReader(row_group_idx); - } + if (!row_group_batch.record_batch_reader) + initializeRowGroupBatchReader(row_group_batch_idx); - auto batch = row_group.record_batch_reader->Next(); + auto batch = row_group_batch.record_batch_reader->Next(); if (!batch.ok()) throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", batch.status().ToString()); @@ -260,44 +262,44 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_idx, std::unique_l auto tmp_table = arrow::Table::FromRecordBatches({*batch}); - size_t approx_chunk_original_size = static_cast(std::ceil(static_cast(row_group.row_group_bytes_uncompressed) / row_group.row_group_rows * (*tmp_table)->num_rows())); - PendingChunk res = {.chunk_idx = row_group.next_chunk_idx, .row_group_idx = row_group_idx, .approx_original_chunk_size = approx_chunk_original_size}; + size_t approx_chunk_original_size = static_cast(std::ceil(static_cast(row_group_batch.total_bytes_compressed) / row_group_batch.total_rows * (*tmp_table)->num_rows())); + PendingChunk res = {.chunk_idx = row_group_batch.next_chunk_idx, .row_group_batch_idx = row_group_batch_idx, .approx_original_chunk_size = approx_chunk_original_size}; /// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields. /// Otherwise fill the missing columns with zero values of its type. BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &res.block_missing_values : nullptr; - row_group.arrow_column_to_ch_column->arrowTableToCHChunk(res.chunk, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr); + row_group_batch.arrow_column_to_ch_column->arrowTableToCHChunk(res.chunk, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr); lock.lock(); - ++row_group.next_chunk_idx; - ++row_group.num_pending_chunks; + ++row_group_batch.next_chunk_idx; + ++row_group_batch.num_pending_chunks; pending_chunks.push(std::move(res)); condvar.notify_all(); } -void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional row_group_touched) +void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional row_group_batch_touched) { - while (row_groups_completed < row_groups.size()) + while (row_group_batches_completed < row_group_batches.size()) { - auto & row_group = row_groups[row_groups_completed]; - if (row_group.status != RowGroupState::Status::Done || row_group.num_pending_chunks != 0) + auto & row_group = row_group_batches[row_group_batches_completed]; + if (row_group.status != RowGroupBatchState::Status::Done || row_group.num_pending_chunks != 0) break; - ++row_groups_completed; + ++row_group_batches_completed; } if (pool) { - while (row_groups_started - row_groups_completed < max_decoding_threads && - row_groups_started < row_groups.size()) - scheduleRowGroup(row_groups_started++); + while (row_group_batches_started - row_group_batches_completed < max_decoding_threads && + row_group_batches_started < row_group_batches.size()) + scheduleRowGroup(row_group_batches_started++); - if (row_group_touched) + if (row_group_batch_touched) { - auto & row_group = row_groups[*row_group_touched]; - if (row_group.status == RowGroupState::Status::Paused && - row_group.num_pending_chunks < max_pending_chunks_per_row_group) - scheduleRowGroup(*row_group_touched); + auto & row_group = row_group_batches[*row_group_batch_touched]; + if (row_group.status == RowGroupBatchState::Status::Paused && + row_group.num_pending_chunks < max_pending_chunks_per_row_group_batch) + scheduleRowGroup(*row_group_batch_touched); } } } @@ -322,30 +324,30 @@ Chunk ParquetBlockInputFormat::generate() if (!pending_chunks.empty() && (!format_settings.parquet.preserve_order || - pending_chunks.top().row_group_idx == row_groups_completed)) + pending_chunks.top().row_group_batch_idx == row_group_batches_completed)) { PendingChunk chunk = std::move(const_cast(pending_chunks.top())); pending_chunks.pop(); - auto & row_group = row_groups[chunk.row_group_idx]; + auto & row_group = row_group_batches[chunk.row_group_batch_idx]; chassert(row_group.num_pending_chunks != 0); chassert(chunk.chunk_idx == row_group.next_chunk_idx - row_group.num_pending_chunks); --row_group.num_pending_chunks; - scheduleMoreWorkIfNeeded(chunk.row_group_idx); + scheduleMoreWorkIfNeeded(chunk.row_group_batch_idx); previous_block_missing_values = std::move(chunk.block_missing_values); previous_approx_bytes_read_for_chunk = chunk.approx_original_chunk_size; return std::move(chunk.chunk); } - if (row_groups_completed == row_groups.size()) + if (row_group_batches_completed == row_group_batches.size()) return {}; if (pool) condvar.wait(lock); else - decodeOneChunk(row_groups_completed, lock); + decodeOneChunk(row_group_batches_completed, lock); } } @@ -358,12 +360,12 @@ void ParquetBlockInputFormat::resetParser() arrow_file.reset(); metadata.reset(); column_indices.clear(); - row_groups.clear(); + row_group_batches.clear(); while (!pending_chunks.empty()) pending_chunks.pop(); - row_groups_completed = 0; + row_group_batches_completed = 0; previous_block_missing_values.clear(); - row_groups_started = 0; + row_group_batches_started = 0; background_exception = nullptr; is_stopped = false; @@ -411,7 +413,7 @@ void registerInputFormatParquet(FormatFactory & factory) size_t /* max_download_threads */, size_t max_parsing_threads) { - size_t min_bytes_for_seek = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : 8 * 1024; + size_t min_bytes_for_seek = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek; return std::make_shared( buf, sample, diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index dc14edf2099..1f75d26f14a 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -52,6 +52,7 @@ public: const FormatSettings & format_settings, size_t max_decoding_threads, size_t min_bytes_for_seek); + ~ParquetBlockInputFormat() override; void resetParser() override; @@ -71,14 +72,14 @@ private: } void initializeIfNeeded(); - void initializeRowGroupReader(size_t row_group_idx); + void initializeRowGroupBatchReader(size_t row_group_batch_idx); - void decodeOneChunk(size_t row_group_idx, std::unique_lock & lock); + void decodeOneChunk(size_t row_group_batch_idx, std::unique_lock & lock); - void scheduleMoreWorkIfNeeded(std::optional row_group_touched = std::nullopt); - void scheduleRowGroup(size_t row_group_idx); + void scheduleMoreWorkIfNeeded(std::optional row_group_batch_touched = std::nullopt); + void scheduleRowGroup(size_t row_group_batch_idx); - void threadFunction(size_t row_group_idx); + void threadFunction(size_t row_group_batch_idx); // Data layout in the file: // @@ -165,7 +166,7 @@ private: // * The max_pending_chunks_per_row_group limit could be based on actual memory usage too. // Useful for preserve_order. - struct RowGroupState + struct RowGroupBatchState { // Transitions: // @@ -202,8 +203,10 @@ private: size_t next_chunk_idx = 0; size_t num_pending_chunks = 0; - size_t row_group_bytes_uncompressed = 0; - size_t row_group_rows = 0; + size_t total_rows = 0; + size_t total_bytes_compressed = 0; + + std::vector row_groups_idxs; // These are only used by the decoding thread, so don't require locking the mutex. std::unique_ptr file_reader; @@ -217,7 +220,7 @@ private: Chunk chunk; BlockMissingValues block_missing_values; size_t chunk_idx; // within row group - size_t row_group_idx; + size_t row_group_batch_idx; size_t approx_original_chunk_size; // For priority_queue. @@ -230,8 +233,8 @@ private: bool operator()(const PendingChunk & a, const PendingChunk & b) const { auto tuplificate = [this](const PendingChunk & c) - { return row_group_first ? std::tie(c.row_group_idx, c.chunk_idx) - : std::tie(c.chunk_idx, c.row_group_idx); }; + { return row_group_first ? std::tie(c.row_group_batch_idx, c.chunk_idx) + : std::tie(c.chunk_idx, c.row_group_batch_idx); }; return tuplificate(a) > tuplificate(b); } }; @@ -241,7 +244,7 @@ private: const std::unordered_set & skip_row_groups; size_t max_decoding_threads; size_t min_bytes_for_seek; - const size_t max_pending_chunks_per_row_group = 2; + const size_t max_pending_chunks_per_row_group_batch = 2; // RandomAccessFile is thread safe, so we share it among threads. // FileReader is not, so each thread creates its own. @@ -264,12 +267,12 @@ private: // Wakes up the generate() call, if any. std::condition_variable condvar; - std::vector row_groups; + std::vector row_group_batches; std::priority_queue, PendingChunk::Compare> pending_chunks; - size_t row_groups_completed = 0; + size_t row_group_batches_completed = 0; // These are only used when max_decoding_threads > 1. - size_t row_groups_started = 0; + size_t row_group_batches_started = 0; std::unique_ptr pool; BlockMissingValues previous_block_missing_values; diff --git a/tests/queries/0_stateless/02725_parquet_preserve_order.sh b/tests/queries/0_stateless/02725_parquet_preserve_order.sh index ac29ef3f361..94f2eaaa753 100755 --- a/tests/queries/0_stateless/02725_parquet_preserve_order.sh +++ b/tests/queries/0_stateless/02725_parquet_preserve_order.sh @@ -10,7 +10,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # It'll be read into two blocks. The first block will sleep 2x longer than the second. # So reordering is very likely if the order-preservation doesn't work. -$CLICKHOUSE_LOCAL -q "select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1, function_sleep_max_microseconds_per_block = 6000000" +$CLICKHOUSE_LOCAL -q "select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1, function_sleep_max_microseconds_per_block = 6000000, input_format_parquet_local_file_min_bytes_for_seek=0" -$CLICKHOUSE_LOCAL -q "explain pipeline select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1, max_threads=2" -$CLICKHOUSE_LOCAL -q "explain pipeline select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=0, parallelize_output_from_storages=1, max_threads=2" +$CLICKHOUSE_LOCAL -q "explain pipeline select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1, max_threads=2, input_format_parquet_local_file_min_bytes_for_seek=0" +$CLICKHOUSE_LOCAL -q "explain pipeline select number + sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=0, parallelize_output_from_storages=1, max_threads=2, input_format_parquet_local_file_min_bytes_for_seek=0"