From d6731aba8b236f4654ce213ba645ef4da3c8fe38 Mon Sep 17 00:00:00 2001 From: sakulali Date: Wed, 21 Aug 2024 00:53:16 +0800 Subject: [PATCH 1/7] Apply preferred_block_size_bytes and preferred_max_column_in_block_size_bytes settings for IRowInputFormat --- docs/en/operations/settings/settings.md | 8 ++++- src/Formats/FormatFactory.cpp | 2 ++ src/Processors/Formats/IRowInputFormat.cpp | 36 ++++++++++++++++++++-- src/Processors/Formats/IRowInputFormat.h | 4 +++ 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 19db4be17db..b7dadc2e61a 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -753,7 +753,13 @@ Default value: `65,409` Used for the same purpose as `max_block_size`, but it sets the recommended block size in bytes by adapting it to the number of rows in the block. However, the block size cannot be more than `max_block_size` rows. -By default: 1,000,000. It only works when reading from MergeTree engines. +By default: 1,000,000. It works when reading from MergeTree engines and reading contents from row input formats. + +## preferred_max_column_in_block_size_bytes {#preferred-max-column-in-block-size_bytes} + +Limit on max column size in block while reading. Helps to decrease cache misses count. Should be close to L2 cache size. +However, the column size in block cannot be more than `max_block_size` rows. +By default: 0 - Disabled. It works when reading from MergeTree engines and reading contents from row input formats. ## max_concurrent_queries_for_user {#max-concurrent-queries-for-user} diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 865b6e6f3f1..5fc23884bb9 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -330,6 +330,8 @@ InputFormatPtr FormatFactory::getInput( RowInputFormatParams row_input_format_params; row_input_format_params.max_block_size = max_block_size; + row_input_format_params.preferred_block_size_bytes = settings.preferred_block_size_bytes; + row_input_format_params.preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes; row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num; row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio; row_input_format_params.max_execution_time = settings.max_execution_time; diff --git a/src/Processors/Formats/IRowInputFormat.cpp b/src/Processors/Formats/IRowInputFormat.cpp index 0b6c81923db..c661196432a 100644 --- a/src/Processors/Formats/IRowInputFormat.cpp +++ b/src/Processors/Formats/IRowInputFormat.cpp @@ -85,6 +85,28 @@ void IRowInputFormat::logError() errors_logger->logError(InputFormatErrorsLogger::ErrorEntry{now_time, total_rows, diagnostic, raw_data}); } +bool IRowInputFormat::overPreferredBlockSizeLimit(const MutableColumns & columns) const +{ + if (params.preferred_block_size_bytes || params.preferred_max_column_in_block_size_bytes) + { + size_t block_size_bytes = 0; + size_t max_column_in_block_size_bytes = 0; + for (const auto & column : columns) + { + block_size_bytes += column->byteSize(); + max_column_in_block_size_bytes = std::max(max_column_in_block_size_bytes, column->byteSize()); + + if (params.preferred_block_size_bytes && block_size_bytes >= params.preferred_block_size_bytes) + return true; + + if (params.preferred_max_column_in_block_size_bytes && max_column_in_block_size_bytes >= params.preferred_max_column_in_block_size_bytes) + return true; + } + } + + return false; +} + Chunk IRowInputFormat::read() { if (total_rows == 0) @@ -107,8 +129,13 @@ Chunk IRowInputFormat::read() block_missing_values.clear(); + auto chunk_offset = [&]() -> size_t + { + return getDataOffsetMaybeCompressed(getReadBuffer()) + getReadBuffer().offset(); + }; + size_t num_rows = 0; - size_t chunk_start_offset = getDataOffsetMaybeCompressed(getReadBuffer()); + size_t chunk_start_offset = chunk_offset(); try { if (need_only_count && supportsCountRows()) @@ -120,7 +147,7 @@ Chunk IRowInputFormat::read() return {}; } total_rows += num_rows; - approx_bytes_read_for_chunk = getDataOffsetMaybeCompressed(getReadBuffer()) - chunk_start_offset; + approx_bytes_read_for_chunk = chunk_offset() - chunk_start_offset; return getChunkForCount(num_rows); } @@ -157,6 +184,9 @@ Chunk IRowInputFormat::read() /// The case when there is no columns. Just count rows. if (columns.empty()) ++num_rows; + + if (overPreferredBlockSizeLimit(columns)) + break; } catch (Exception & e) { @@ -244,7 +274,7 @@ Chunk IRowInputFormat::read() column->finalize(); Chunk chunk(std::move(columns), num_rows); - approx_bytes_read_for_chunk = getDataOffsetMaybeCompressed(getReadBuffer()) - chunk_start_offset; + approx_bytes_read_for_chunk = chunk_offset() - chunk_start_offset; return chunk; } diff --git a/src/Processors/Formats/IRowInputFormat.h b/src/Processors/Formats/IRowInputFormat.h index f8796df8604..9cdb9dea1ce 100644 --- a/src/Processors/Formats/IRowInputFormat.h +++ b/src/Processors/Formats/IRowInputFormat.h @@ -23,6 +23,8 @@ struct RowReadExtension struct RowInputFormatParams { size_t max_block_size = 0; + UInt64 preferred_block_size_bytes = 0; + UInt64 preferred_max_column_in_block_size_bytes = 0; UInt64 allow_errors_num = 0; Float64 allow_errors_ratio = 0; @@ -77,6 +79,8 @@ protected: void logError(); + bool overPreferredBlockSizeLimit(const MutableColumns & columns) const; + const BlockMissingValues & getMissingValues() const override { return block_missing_values; } size_t getRowNum() const { return total_rows; } From 67d80bafd27134bd4098667e3bbb04a7bae5e115 Mon Sep 17 00:00:00 2001 From: sakulali Date: Sat, 24 Aug 2024 00:48:40 +0800 Subject: [PATCH 2/7] fix typo --- docs/en/interfaces/formats.md | 2 +- docs/en/operations/settings/settings.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/interfaces/formats.md b/docs/en/interfaces/formats.md index 8892c6d8d3f..41d4787f6bc 100644 --- a/docs/en/interfaces/formats.md +++ b/docs/en/interfaces/formats.md @@ -252,7 +252,7 @@ This format is also available under the names `TSVRawWithNames`, `RawWithNames`. Differs from `TabSeparatedWithNamesAndTypes` format in that the rows are written without escaping. When parsing with this format, tabs or linefeeds are not allowed in each field. -This format is also available under the names `TSVRawWithNamesAndNames`, `RawWithNamesAndNames`. +This format is also available under the names `TSVRawWithNamesAndTypes`, `RawWithNamesAndTypes`. ## Template {#format-template} diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index b7dadc2e61a..241609e4fde 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -755,7 +755,7 @@ Used for the same purpose as `max_block_size`, but it sets the recommended block However, the block size cannot be more than `max_block_size` rows. By default: 1,000,000. It works when reading from MergeTree engines and reading contents from row input formats. -## preferred_max_column_in_block_size_bytes {#preferred-max-column-in-block-size_bytes} +## preferred_max_column_in_block_size_bytes {#preferred-max-column-in-block-size-bytes} Limit on max column size in block while reading. Helps to decrease cache misses count. Should be close to L2 cache size. However, the column size in block cannot be more than `max_block_size` rows. From 43c2f38524f29e57adf8d7f8828d6403c9d380c7 Mon Sep 17 00:00:00 2001 From: sakulali Date: Sun, 27 Oct 2024 15:47:42 +0800 Subject: [PATCH 3/7] moved func to anonymous ns --- src/Formats/FormatFactory.cpp | 6 ++- src/Processors/Formats/IRowInputFormat.cpp | 46 +++++++++++----------- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 3eb5920c866..0607164d3ea 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -50,6 +50,8 @@ FORMAT_FACTORY_SETTINGS(DECLARE_FORMAT_EXTERN, SKIP_ALIAS) extern const SettingsBool output_format_parallel_formatting; extern const SettingsOverflowMode timeout_overflow_mode; extern const SettingsInt64 zstd_window_log_max; + extern const SettingsUInt64 preferred_block_size_bytes; + extern const SettingsUInt64 preferred_max_column_in_block_size_bytes; } namespace ErrorCodes @@ -361,8 +363,8 @@ InputFormatPtr FormatFactory::getInput( RowInputFormatParams row_input_format_params; row_input_format_params.max_block_size = max_block_size; - row_input_format_params.preferred_block_size_bytes = settings.preferred_block_size_bytes; - row_input_format_params.preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes; + row_input_format_params.preferred_block_size_bytes = settings[Setting::preferred_block_size_bytes]; + row_input_format_params.preferred_max_column_in_block_size_bytes = settings[Setting::preferred_max_column_in_block_size_bytes]; row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num; row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio; row_input_format_params.max_execution_time = settings[Setting::max_execution_time]; diff --git a/src/Processors/Formats/IRowInputFormat.cpp b/src/Processors/Formats/IRowInputFormat.cpp index 8facebdfca7..48f3be97c42 100644 --- a/src/Processors/Formats/IRowInputFormat.cpp +++ b/src/Processors/Formats/IRowInputFormat.cpp @@ -88,28 +88,6 @@ void IRowInputFormat::logError() errors_logger->logError(InputFormatErrorsLogger::ErrorEntry{now_time, total_rows, diagnostic, raw_data}); } -bool IRowInputFormat::overPreferredBlockSizeLimit(const MutableColumns & columns) const -{ - if (params.preferred_block_size_bytes || params.preferred_max_column_in_block_size_bytes) - { - size_t block_size_bytes = 0; - size_t max_column_in_block_size_bytes = 0; - for (const auto & column : columns) - { - block_size_bytes += column->byteSize(); - max_column_in_block_size_bytes = std::max(max_column_in_block_size_bytes, column->byteSize()); - - if (params.preferred_block_size_bytes && block_size_bytes >= params.preferred_block_size_bytes) - return true; - - if (params.preferred_max_column_in_block_size_bytes && max_column_in_block_size_bytes >= params.preferred_max_column_in_block_size_bytes) - return true; - } - } - - return false; -} - Chunk IRowInputFormat::read() { if (total_rows == 0) @@ -157,6 +135,28 @@ Chunk IRowInputFormat::read() return getChunkForCount(num_rows); } + auto over_preferred_block_size_limit = [&](const MutableColumns & cols) + { + if (params.preferred_block_size_bytes || params.preferred_max_column_in_block_size_bytes) + { + size_t block_size_bytes = 0; + size_t max_column_in_block_size_bytes = 0; + for (const auto & col : cols) + { + block_size_bytes += col->byteSize(); + max_column_in_block_size_bytes = std::max(max_column_in_block_size_bytes, col->byteSize()); + + if (params.preferred_block_size_bytes && block_size_bytes >= params.preferred_block_size_bytes) + return true; + + if (params.preferred_max_column_in_block_size_bytes && max_column_in_block_size_bytes >= params.preferred_max_column_in_block_size_bytes) + return true; + } + } + + return false; + }; + RowReadExtension info; bool continue_reading = true; for (size_t rows = 0; (rows < params.max_block_size || num_rows == 0) && continue_reading; ++rows) @@ -191,7 +191,7 @@ Chunk IRowInputFormat::read() if (columns.empty()) ++num_rows; - if (overPreferredBlockSizeLimit(columns)) + if (over_preferred_block_size_limit(columns)) break; } catch (Exception & e) From 767b1e4f4f38520757e186cdf1872198d8a09fea Mon Sep 17 00:00:00 2001 From: sakulali Date: Wed, 30 Oct 2024 01:31:25 +0800 Subject: [PATCH 4/7] Kindly ping CI and try to reproduce failed CI tests From ce84a88cf6e7084f9c0264e97851da95f2cea014 Mon Sep 17 00:00:00 2001 From: sakulali Date: Sat, 16 Nov 2024 21:06:57 +0800 Subject: [PATCH 5/7] fix stateless tests of 01825_type_json_ghdata_insert_select --- src/Processors/Formats/IRowInputFormat.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Processors/Formats/IRowInputFormat.cpp b/src/Processors/Formats/IRowInputFormat.cpp index 47053298bca..91ed023277e 100644 --- a/src/Processors/Formats/IRowInputFormat.cpp +++ b/src/Processors/Formats/IRowInputFormat.cpp @@ -147,6 +147,9 @@ Chunk IRowInputFormat::read() size_t max_column_in_block_size_bytes = 0; for (const auto & col : cols) { + if (col->getDataType() == TypeIndex::ObjectDeprecated) + return false; + block_size_bytes += col->byteSize(); max_column_in_block_size_bytes = std::max(max_column_in_block_size_bytes, col->byteSize()); From 1c76ea056c0b7c61e719a703306bf48fa0c4489c Mon Sep 17 00:00:00 2001 From: sakulali Date: Sun, 17 Nov 2024 00:10:55 +0800 Subject: [PATCH 6/7] Kindly ping CI and try to reproduce failed CI tests From 48f7fc54ad4dda78c9c7270759eaeae7bf99206f Mon Sep 17 00:00:00 2001 From: sakulali Date: Tue, 19 Nov 2024 01:23:33 +0800 Subject: [PATCH 7/7] fix stateless tests of 01034_JSONCompactEachRow --- src/Processors/Formats/IRowInputFormat.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Processors/Formats/IRowInputFormat.cpp b/src/Processors/Formats/IRowInputFormat.cpp index 91ed023277e..626c7b6835f 100644 --- a/src/Processors/Formats/IRowInputFormat.cpp +++ b/src/Processors/Formats/IRowInputFormat.cpp @@ -119,6 +119,8 @@ Chunk IRowInputFormat::read() auto chunk_offset = [&]() -> size_t { + if (total_rows == 0) + return getDataOffsetMaybeCompressed(getReadBuffer()); return getDataOffsetMaybeCompressed(getReadBuffer()) + getReadBuffer().offset(); };