Calculate the number of parsing threads correctly

This commit is contained in:
Alexander Kuzmenkov 2019-11-19 16:11:04 +03:00
parent f8f6de836a
commit 0d1933cb44
2 changed files with 33 additions and 19 deletions

View File

@ -67,30 +67,37 @@ public:
const FormatSettings &settings;
};
struct Builder
struct Params
{
ReadBuffer & read_buffer;
const InputProcessorCreator &input_processor_creator;
const InputCreatorParams &input_creator_params;
FormatFactory::FileSegmentationEngine file_segmentation_engine;
size_t max_threads_to_use;
int max_threads;
size_t min_chunk_bytes;
};
explicit ParallelParsingBlockInputStream(const Builder & builder)
: header(builder.input_creator_params.sample),
context(builder.input_creator_params.context),
row_input_format_params(builder.input_creator_params.row_input_format_params),
format_settings(builder.input_creator_params.settings),
input_processor_creator(builder.input_processor_creator),
min_chunk_bytes(builder.min_chunk_bytes),
original_buffer(builder.read_buffer),
pool(builder.max_threads_to_use),
file_segmentation_engine(builder.file_segmentation_engine)
explicit ParallelParsingBlockInputStream(const Params & params)
: header(params.input_creator_params.sample),
context(params.input_creator_params.context),
row_input_format_params(params.input_creator_params.row_input_format_params),
format_settings(params.input_creator_params.settings),
input_processor_creator(params.input_processor_creator),
min_chunk_bytes(params.min_chunk_bytes),
original_buffer(params.read_buffer),
// Subtract one thread that we use for segmentation and one for
// reading. After that, must have at least two threads left for
// parsing. See the assertion below.
pool(std::max(2, params.max_threads - 2)),
file_segmentation_engine(params.file_segmentation_engine)
{
// Allocate more units than threads to decrease segmentator
// waiting on reader on wraparound. The number is random.
processing_units.resize(builder.max_threads_to_use + 4);
// See comment above.
assert(params.max_threads >= 4);
// One unit for each thread, including segmentator and reader, plus a
// couple more units so that the segmentation thread doesn't spuriously
// bump into reader thread on wraparound.
processing_units.resize(params.max_threads + 2);
segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); });
}

View File

@ -108,7 +108,11 @@ BlockInputStreamPtr FormatFactory::getInput(
const Settings & settings = context.getSettingsRef();
const auto & file_segmentation_engine = getCreators(name).file_segmentation_engine;
if (settings.input_format_parallel_parsing && file_segmentation_engine)
// Doesn't make sense to use parallel parsing with less than four threads
// (segmentator + two parsers + reader).
if (settings.input_format_parallel_parsing
&& file_segmentation_engine
&& settings.max_threads >= 4)
{
const auto & input_getter = getCreators(name).input_processor_creator;
if (!input_getter)
@ -124,9 +128,12 @@ BlockInputStreamPtr FormatFactory::getInput(
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
auto params = ParallelParsingBlockInputStream::InputCreatorParams{sample, context, row_input_format_params, format_settings};
ParallelParsingBlockInputStream::Builder builder{buf, input_getter, params, file_segmentation_engine, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing};
return std::make_shared<ParallelParsingBlockInputStream>(builder);
auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, context, row_input_format_params, format_settings};
ParallelParsingBlockInputStream::Params params{buf, input_getter,
input_creator_params, file_segmentation_engine,
static_cast<int>(settings.max_threads),
settings.min_chunk_bytes_for_parallel_parsing};
return std::make_shared<ParallelParsingBlockInputStream>(params);
}
auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback));