sakulali 2024-08-27 17:28:53 -07:00 committed by GitHub
commit d96931bf02
5 changed files with 47 additions and 5 deletions

View File

@ -252,7 +252,7 @@ This format is also available under the names `TSVRawWithNames`, `RawWithNames`.
Differs from `TabSeparatedWithNamesAndTypes` format in that the rows are written without escaping.
When parsing with this format, tabs or line feeds are not allowed in any field.
This format is also available under the names `TSVRawWithNamesAndNames`, `RawWithNamesAndNames`.
This format is also available under the names `TSVRawWithNamesAndTypes`, `RawWithNamesAndTypes`.
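For illustration, here is a minimal standalone C++ sketch (not ClickHouse code; the column names and values are invented) that prints a tiny table in this layout: one row of column names, one row of their types, then tab-separated data rows written verbatim, with no escaping applied:

```cpp
#include <cstdio>

int main()
{
    // Header rows: column names, then column types, tab-separated.
    std::printf("id\tmessage\n");
    std::printf("UInt32\tString\n");

    // Data rows are written verbatim: no escaping is applied, so backslashes
    // pass through unchanged. Tabs and line feeds must not appear inside a
    // field, since the parser would treat them as separators.
    std::printf("1\thello world\n");
    std::printf("2\tC:\\temp\\report.txt\n");
    return 0;
}
```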
## Template {#format-template}

View File

@ -753,7 +753,13 @@ Default value: `65,409`
Used for the same purpose as `max_block_size`, but it sets the recommended block size in bytes by adapting it to the number of rows in the block.
However, the block size cannot be more than `max_block_size` rows.
By default: 1,000,000. It only works when reading from MergeTree engines.
By default: 1,000,000. It works when reading from MergeTree engines and when reading data through row input formats.
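As a rough illustration of the behavior described above, here is a minimal standalone sketch (not ClickHouse code; `Row`, `rowByteSize`, and the surrounding loop are invented for the example) of a reader that caps a block by accumulated bytes while still respecting the `max_block_size` row limit:

```cpp
#include <cstddef>
#include <string>
#include <vector>

struct Row { std::string payload; };

size_t rowByteSize(const Row & row) { return row.payload.size(); }

/// Collect rows until either the hard row cap or the preferred byte size is reached.
std::vector<Row> readBlock(const std::vector<Row> & input, size_t max_block_size, size_t preferred_block_size_bytes)
{
    std::vector<Row> block;
    size_t block_bytes = 0;
    for (const auto & row : input)
    {
        if (block.size() >= max_block_size)
            break;                                  // hard cap in rows
        block.push_back(row);
        block_bytes += rowByteSize(row);
        if (preferred_block_size_bytes && block_bytes >= preferred_block_size_bytes)
            break;                                  // soft cap in bytes; 0 disables it
    }
    return block;
}
```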
## preferred_max_column_in_block_size_bytes {#preferred-max-column-in-block-size-bytes}
Limit on the maximum size of a single column in a block while reading. Helps to decrease the number of cache misses. Should be close to the L2 cache size.
However, a block still cannot contain more than `max_block_size` rows.
By default: 0 (disabled). It works when reading from MergeTree engines and when reading data through row input formats.
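The column-wise limit can be sketched in the same spirit (again illustrative only, with an invented helper, not the actual implementation): stop growing the block as soon as the largest single column reaches the threshold, in addition to the total-bytes check above. The change to `IRowInputFormat.cpp` further down in this commit applies the same idea when reading through row input formats.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

/// column_byte_sizes holds the current byte size of each column in the block being built.
bool overColumnLimit(const std::vector<size_t> & column_byte_sizes, size_t preferred_max_column_in_block_size_bytes)
{
    if (preferred_max_column_in_block_size_bytes == 0)
        return false;   // 0 means the check is disabled

    size_t largest = 0;
    for (size_t bytes : column_byte_sizes)
        largest = std::max(largest, bytes);

    return largest >= preferred_max_column_in_block_size_bytes;
}
```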
## max_concurrent_queries_for_user {#max-concurrent-queries-for-user}

View File

@ -331,6 +331,8 @@ InputFormatPtr FormatFactory::getInput(
    RowInputFormatParams row_input_format_params;
    row_input_format_params.max_block_size = max_block_size;
    row_input_format_params.preferred_block_size_bytes = settings.preferred_block_size_bytes;
    row_input_format_params.preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes;
    row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
    row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
    row_input_format_params.max_execution_time = settings.max_execution_time;

View File

@ -85,6 +85,28 @@ void IRowInputFormat::logError()
    errors_logger->logError(InputFormatErrorsLogger::ErrorEntry{now_time, total_rows, diagnostic, raw_data});
}

/// Returns true once the accumulated columns reach preferred_block_size_bytes in total,
/// or any single column reaches preferred_max_column_in_block_size_bytes (a limit of 0 disables that check).
bool IRowInputFormat::overPreferredBlockSizeLimit(const MutableColumns & columns) const
{
    if (params.preferred_block_size_bytes || params.preferred_max_column_in_block_size_bytes)
    {
        size_t block_size_bytes = 0;
        size_t max_column_in_block_size_bytes = 0;
        for (const auto & column : columns)
        {
            block_size_bytes += column->byteSize();
            max_column_in_block_size_bytes = std::max(max_column_in_block_size_bytes, column->byteSize());
            if (params.preferred_block_size_bytes && block_size_bytes >= params.preferred_block_size_bytes)
                return true;
            if (params.preferred_max_column_in_block_size_bytes && max_column_in_block_size_bytes >= params.preferred_max_column_in_block_size_bytes)
                return true;
        }
    }
    return false;
}

Chunk IRowInputFormat::read()
{
    if (total_rows == 0)
@ -107,8 +129,13 @@ Chunk IRowInputFormat::read()
    block_missing_values.clear();

    auto chunk_offset = [&]() -> size_t
    {
        return getDataOffsetMaybeCompressed(getReadBuffer()) + getReadBuffer().offset();
    };

    size_t num_rows = 0;
    size_t chunk_start_offset = getDataOffsetMaybeCompressed(getReadBuffer());
    size_t chunk_start_offset = chunk_offset();
    try
    {
        if (need_only_count && supportsCountRows())
@ -120,7 +147,7 @@ Chunk IRowInputFormat::read()
                return {};
            }
            total_rows += num_rows;
            approx_bytes_read_for_chunk = getDataOffsetMaybeCompressed(getReadBuffer()) - chunk_start_offset;
            approx_bytes_read_for_chunk = chunk_offset() - chunk_start_offset;
            return getChunkForCount(num_rows);
        }
@ -157,6 +184,9 @@ Chunk IRowInputFormat::read()
                /// The case when there are no columns. Just count rows.
                if (columns.empty())
                    ++num_rows;

                if (overPreferredBlockSizeLimit(columns))
                    break;
            }
            catch (Exception & e)
            {
@ -244,7 +274,7 @@ Chunk IRowInputFormat::read()
        column->finalize();

    Chunk chunk(std::move(columns), num_rows);
    approx_bytes_read_for_chunk = getDataOffsetMaybeCompressed(getReadBuffer()) - chunk_start_offset;
    approx_bytes_read_for_chunk = chunk_offset() - chunk_start_offset;
    return chunk;
}

View File

@ -23,6 +23,8 @@ struct RowReadExtension
struct RowInputFormatParams
{
    size_t max_block_size = 0;
    UInt64 preferred_block_size_bytes = 0;
    UInt64 preferred_max_column_in_block_size_bytes = 0;
    UInt64 allow_errors_num = 0;
    Float64 allow_errors_ratio = 0;
@ -77,6 +79,8 @@ protected:
    void logError();

    bool overPreferredBlockSizeLimit(const MutableColumns & columns) const;

    const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
    size_t getRowNum() const { return total_rows; }