This commit is contained in:
sakulali 2024-11-20 17:58:20 -05:00 committed by GitHub
commit af3e77d3bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 45 additions and 4 deletions

View File

@ -252,7 +252,7 @@ This format is also available under the names `TSVRawWithNames`, `RawWithNames`.
Differs from `TabSeparatedWithNamesAndTypes` format in that the rows are written without escaping. Differs from `TabSeparatedWithNamesAndTypes` format in that the rows are written without escaping.
When parsing with this format, tabs or linefeeds are not allowed in each field. When parsing with this format, tabs or linefeeds are not allowed in each field.
This format is also available under the names `TSVRawWithNamesAndNames`, `RawWithNamesAndNames`. This format is also available under the names `TSVRawWithNamesAndTypes`, `RawWithNamesAndTypes`.
## Template {#format-template} ## Template {#format-template}

View File

@ -50,6 +50,8 @@ FORMAT_FACTORY_SETTINGS(DECLARE_FORMAT_EXTERN, SKIP_ALIAS)
extern const SettingsBool output_format_parallel_formatting; extern const SettingsBool output_format_parallel_formatting;
extern const SettingsOverflowMode timeout_overflow_mode; extern const SettingsOverflowMode timeout_overflow_mode;
extern const SettingsInt64 zstd_window_log_max; extern const SettingsInt64 zstd_window_log_max;
extern const SettingsUInt64 preferred_block_size_bytes;
extern const SettingsUInt64 preferred_max_column_in_block_size_bytes;
} }
namespace ErrorCodes namespace ErrorCodes
@ -361,6 +363,8 @@ InputFormatPtr FormatFactory::getInput(
RowInputFormatParams row_input_format_params; RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size; row_input_format_params.max_block_size = max_block_size;
row_input_format_params.preferred_block_size_bytes = settings[Setting::preferred_block_size_bytes];
row_input_format_params.preferred_max_column_in_block_size_bytes = settings[Setting::preferred_max_column_in_block_size_bytes];
row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num; row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio; row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
row_input_format_params.max_execution_time = settings[Setting::max_execution_time]; row_input_format_params.max_execution_time = settings[Setting::max_execution_time];

View File

@ -117,8 +117,15 @@ Chunk IRowInputFormat::read()
block_missing_values.clear(); block_missing_values.clear();
auto chunk_offset = [&]() -> size_t
{
if (total_rows == 0)
return getDataOffsetMaybeCompressed(getReadBuffer());
return getDataOffsetMaybeCompressed(getReadBuffer()) + getReadBuffer().offset();
};
size_t num_rows = 0; size_t num_rows = 0;
size_t chunk_start_offset = getDataOffsetMaybeCompressed(getReadBuffer()); size_t chunk_start_offset = chunk_offset();
try try
{ {
if (need_only_count && supportsCountRows()) if (need_only_count && supportsCountRows())
@ -130,10 +137,35 @@ Chunk IRowInputFormat::read()
return {}; return {};
} }
total_rows += num_rows; total_rows += num_rows;
approx_bytes_read_for_chunk = getDataOffsetMaybeCompressed(getReadBuffer()) - chunk_start_offset; approx_bytes_read_for_chunk = chunk_offset() - chunk_start_offset;
return getChunkForCount(num_rows); return getChunkForCount(num_rows);
} }
auto over_preferred_block_size_limit = [&](const MutableColumns & cols)
{
if (params.preferred_block_size_bytes || params.preferred_max_column_in_block_size_bytes)
{
size_t block_size_bytes = 0;
size_t max_column_in_block_size_bytes = 0;
for (const auto & col : cols)
{
if (col->getDataType() == TypeIndex::ObjectDeprecated)
return false;
block_size_bytes += col->byteSize();
max_column_in_block_size_bytes = std::max(max_column_in_block_size_bytes, col->byteSize());
if (params.preferred_block_size_bytes && block_size_bytes >= params.preferred_block_size_bytes)
return true;
if (params.preferred_max_column_in_block_size_bytes && max_column_in_block_size_bytes >= params.preferred_max_column_in_block_size_bytes)
return true;
}
}
return false;
};
RowReadExtension info; RowReadExtension info;
bool continue_reading = true; bool continue_reading = true;
for (size_t rows = 0; (rows < params.max_block_size || num_rows == 0) && continue_reading; ++rows) for (size_t rows = 0; (rows < params.max_block_size || num_rows == 0) && continue_reading; ++rows)
@ -170,6 +202,9 @@ Chunk IRowInputFormat::read()
/// The case when there is no columns. Just count rows. /// The case when there is no columns. Just count rows.
if (columns.empty()) if (columns.empty())
++num_rows; ++num_rows;
if (over_preferred_block_size_limit(columns))
break;
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -252,7 +287,7 @@ Chunk IRowInputFormat::read()
column->finalize(); column->finalize();
Chunk chunk(std::move(columns), num_rows); Chunk chunk(std::move(columns), num_rows);
approx_bytes_read_for_chunk = getDataOffsetMaybeCompressed(getReadBuffer()) - chunk_start_offset; approx_bytes_read_for_chunk = chunk_offset() - chunk_start_offset;
return chunk; return chunk;
} }

View File

@ -24,6 +24,8 @@ struct RowReadExtension
struct RowInputFormatParams struct RowInputFormatParams
{ {
size_t max_block_size = 0; size_t max_block_size = 0;
UInt64 preferred_block_size_bytes = 0;
UInt64 preferred_max_column_in_block_size_bytes = 0;
UInt64 allow_errors_num = 0; UInt64 allow_errors_num = 0;
Float64 allow_errors_ratio = 0; Float64 allow_errors_ratio = 0;