chunk size -> chunk bytes

Alexander Kuzmenkov 2019-11-18 22:25:17 +03:00
parent 17b4565c48
commit 168e15b2ae
8 changed files with 19 additions and 19 deletions

View File

@@ -108,9 +108,9 @@ struct Settings : public SettingsCollection<Settings>
 M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.") \
 M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.") \
 \
-M(SettingBool, input_format_parallel_parsing, true, "Enable parallel parsing for several data formats (JSONEachRow, TSV, TKSV, CSV).") \
-M(SettingUInt64, max_threads_for_parallel_parsing, 10, "The maximum number of threads to parallel parsing.") \
-M(SettingUInt64, min_chunk_size_for_parallel_parsing, (1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.") \
+M(SettingBool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.") \
+M(SettingUInt64, max_threads_for_parallel_parsing, 0, "The maximum number of threads to parallel parsing.") \
+M(SettingUInt64, min_chunk_bytes_for_parallel_parsing, (1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.") \
 \
 M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.") \
 M(SettingUInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.") \
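Note that the default of max_threads_for_parallel_parsing also changes to 0, which per the docs diff below means "use global maximum". A minimal sketch of how such a sentinel might be resolved; the function and parameter names here are illustrative, not the actual ClickHouse code:

```cpp
#include <cstddef>

// Hypothetical helper: 0 is a sentinel meaning "fall back to the
// server-wide thread limit"; any other value is used as-is.
size_t effectiveParserThreads(size_t max_threads_for_parallel_parsing,
                              size_t global_max_threads)
{
    return max_threads_for_parallel_parsing == 0
        ? global_max_threads
        : max_threads_for_parallel_parsing;
}
```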

View File

@@ -31,7 +31,7 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
 unit.segment.resize(0);
 const bool have_more_data = file_segmentation_engine(original_buffer,
-unit.segment, min_chunk_size);
+unit.segment, min_chunk_bytes);
 unit.is_last = !have_more_data;
 unit.status = READY_TO_PARSE;

View File

@@ -17,7 +17,7 @@ namespace DB
 * ORDER-PRESERVING parallel parsing of data formats.
 * It splits original data into chunks. Then each chunk is parsed by different thread.
 * The number of chunks equals to max_threads_for_parallel_parsing setting.
-* The size of chunk is equal to min_chunk_size_for_parallel_parsing setting.
+* The size of chunk is equal to min_chunk_bytes_for_parallel_parsing setting.
 *
 * This stream has three kinds of threads: one segmentator, multiple parsers
 * (max_threads_for_parallel_parsing) and one reader thread -- that is, the one
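The header comment above summarizes the pipeline. Here is a condensed sketch of that three-thread, order-preserving layout; the ProcessingUnit shape below is a simplification for illustration, not the exact class internals:

```cpp
#include <atomic>
#include <vector>

// Simplified picture of the pipeline: one segmentator thread cuts the
// input into chunks of at least min_chunk_bytes, several parser threads
// parse chunks independently, and the reader consumes the results
// strictly in segment order.
enum class UnitStatus { READY_TO_INSERT, READY_TO_PARSE, READY_TO_READ };

struct ProcessingUnit
{
    std::vector<char> segment;   // raw bytes produced by the segmentator
    // Block block;              // parsed rows would be stored here
    std::atomic<UnitStatus> status{UnitStatus::READY_TO_INSERT};
    bool is_last = false;        // set when the segmentator hits EOF
};

// Units form a ring buffer: the reader waits for unit i to become
// READY_TO_READ before touching unit i + 1, so the output order matches
// the input order no matter which parser thread finishes first.
```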
@@ -75,7 +75,7 @@ public:
 const InputCreatorParams &input_creator_params;
 FormatFactory::FileSegmentationEngine file_segmentation_engine;
 size_t max_threads_to_use;
-size_t min_chunk_size;
+size_t min_chunk_bytes;
 };
 explicit ParallelParsingBlockInputStream(const Builder & builder)
@@ -84,7 +84,7 @@ public:
 row_input_format_params(builder.input_creator_params.row_input_format_params),
 format_settings(builder.input_creator_params.settings),
 input_processor_creator(builder.input_processor_creator),
-min_chunk_size(builder.min_chunk_size),
+min_chunk_bytes(builder.min_chunk_bytes),
 original_buffer(builder.read_buffer),
 pool(builder.max_threads_to_use),
 file_segmentation_engine(builder.file_segmentation_engine)
@@ -148,7 +148,7 @@ private:
 const FormatSettings format_settings;
 const InputProcessorCreator input_processor_creator;
-const size_t min_chunk_size;
+const size_t min_chunk_bytes;
 /*
 * This is declared as atomic to avoid UB, because parser threads access it

View File

@@ -132,7 +132,7 @@ BlockInputStreamPtr FormatFactory::getInput(
 //LOG_TRACE(&Poco::Logger::get("FormatFactory::getInput()"), "Will use " << max_threads_to_use << " threads for parallel parsing.");
 auto params = ParallelParsingBlockInputStream::InputCreatorParams{sample, context, row_input_format_params, format_settings};
-ParallelParsingBlockInputStream::Builder builder{buf, input_getter, params, file_segmentation_engine, max_threads_to_use, settings.min_chunk_size_for_parallel_parsing};
+ParallelParsingBlockInputStream::Builder builder{buf, input_getter, params, file_segmentation_engine, max_threads_to_use, settings.min_chunk_bytes_for_parallel_parsing};
 return std::make_shared<ParallelParsingBlockInputStream>(builder);
 }

View File

@@ -43,13 +43,13 @@ public:
 using ReadCallback = std::function<void()>;
 /** Fast reading data from buffer and save result to memory.
-* Reads at least min_chunk_size bytes and some more until the end of the chunk, depends on the format.
+* Reads at least min_chunk_bytes and some more until the end of the chunk, depends on the format.
 * Used in ParallelParsingBlockInputStream.
 */
 using FileSegmentationEngine = std::function<bool(
 ReadBuffer & buf,
 DB::Memory<> & memory,
-size_t min_chunk_size)>;
+size_t min_chunk_bytes)>;
 /// This callback allows to perform some additional actions after writing a single row.
 /// It's initial purpose was to flush Kafka message for each row.
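To make the FileSegmentationEngine contract concrete, here is a sketch of an engine for a newline-delimited format. It uses std::istream and std::vector as stand-ins for ReadBuffer and DB::Memory<> so it runs standalone; the real engines operate on those ClickHouse types:

```cpp
#include <cstddef>
#include <istream>
#include <vector>

// Copies at least min_chunk_bytes into `memory`, then continues to the
// next '\n' so that no row is split across chunks. Returns true if the
// stream still has data after this chunk (i.e. more segments to come).
bool segmentLineDelimited(std::istream & in, std::vector<char> & memory,
                          size_t min_chunk_bytes)
{
    memory.clear();
    char c;
    while (in.get(c))
    {
        memory.push_back(c);
        // Stop only at a row boundary, and only once the chunk is big enough.
        if (memory.size() >= min_chunk_bytes && c == '\n')
            break;
    }
    return in.good() && in.peek() != std::char_traits<char>::eof();
}
```

The Values format cannot cut on newlines alone, which is why it gets the smarter skipToNextRow() scan shown in the next file.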

View File

@@ -320,7 +320,7 @@ bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx
 }
 /// Can be used in fileSegmentationEngine for parallel parsing of Values
-bool ValuesBlockInputFormat::skipToNextRow(size_t min_chunk_size, int balance)
+bool ValuesBlockInputFormat::skipToNextRow(size_t min_chunk_bytes, int balance)
 {
 skipWhitespaceIfAny(buf);
 if (buf.eof() || *buf.position() == ';')
@@ -328,7 +328,7 @@ bool ValuesBlockInputFormat::skipToNextRow(size_t min_chunk_size, int balance)
 bool quoted = false;
 size_t chunk_begin_buf_count = buf.count();
-while (!buf.eof() && (balance || buf.count() - chunk_begin_buf_count < min_chunk_size))
+while (!buf.eof() && (balance || buf.count() - chunk_begin_buf_count < min_chunk_bytes))
 {
 buf.position() = find_first_symbols<'\\', '\'', ')', '('>(buf.position(), buf.buffer().end());
 if (buf.position() == buf.buffer().end())
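The loop above keeps scanning while either the chunk is still below min_chunk_bytes or the parenthesis balance is nonzero, i.e. the cursor is inside a row tuple. A standalone model of that boundary search, simplified and not the actual member function:

```cpp
#include <cstddef>
#include <string_view>

// Walks a Values-format buffer ("(1, 'a'), (2, 'b'), ...") tracking
// quote state and parenthesis depth, and returns the first position at
// or after min_chunk_bytes that falls between rows (depth == 0), or
// npos if no such boundary exists in `data`.
size_t findRowBoundary(std::string_view data, size_t min_chunk_bytes)
{
    int balance = 0;      // nesting depth of '(' ... ')'
    bool quoted = false;  // inside a single-quoted string literal
    for (size_t i = 0; i < data.size(); ++i)
    {
        char c = data[i];
        if (quoted)
        {
            if (c == '\\')
                ++i;               // skip the escaped character
            else if (c == '\'')
                quoted = false;
        }
        else if (c == '\'')
            quoted = true;
        else if (c == '(')
            ++balance;
        else if (c == ')' && --balance == 0 && i + 1 >= min_chunk_bytes)
            return i + 1;          // cut right after a top-level ')'
    }
    return std::string_view::npos;
}
```

With min_chunk_bytes = 0 (the default in the header declaration below), the scan stops after the first complete row.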

View File

@@ -60,7 +60,7 @@ private:
 void readSuffix();
-bool skipToNextRow(size_t min_chunk_size = 0, int balance = 0);
+bool skipToNextRow(size_t min_chunk_bytes = 0, int balance = 0);
 private:
 PeekableReadBuffer buf;

View File

@@ -1004,15 +1004,15 @@ Enable order-preserving parallel parsing of data formats such as JSONEachRow, TS
 ## max_threads_for_parallel_parsing
 - Type: unsigned int
-- Default value: 10
+- Default value: 0
-The maximum number of threads for order-preserving parallel parsing of data formats.
+The maximum number of threads for order-preserving parallel parsing of data formats. 0 means use global maximum.
-## min_chunk_size_for_parallel_parsing
+## min_chunk_bytes_for_parallel_parsing
 - Type: unsigned int
-- Default value: 1024 * 1024
+- Default value: 1 MiB
-The minimum chunk size in bytes, which each thread will parse in parallel. By default it equals to one megabyte.
+The minimum chunk size in bytes, which each thread will parse in parallel.
 [Original article](https://clickhouse.yandex/docs/en/operations/settings/settings/) <!-- hide -->