Fix parallel parsing: segmentator now checks max_block_size.

This commit is contained in:
Vitaly Baranov 2022-09-27 22:14:15 +02:00
parent c177061f75
commit f65d3ff95a
13 changed files with 94 additions and 57 deletions

View File

@ -247,8 +247,8 @@ InputFormatPtr FormatFactory::getInput(
{ return input_getter(input, sample, row_input_format_params, format_settings); };
ParallelParsingInputFormat::Params params{
buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing,
context->getApplicationType() == Context::ApplicationType::SERVER};
buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads,
settings.min_chunk_bytes_for_parallel_parsing, max_block_size, context->getApplicationType() == Context::ApplicationType::SERVER};
auto format = std::make_shared<ParallelParsingInputFormat>(params);
if (!settings.input_format_record_errors_file_path.toString().empty())
{

View File

@ -61,13 +61,16 @@ public:
using ReadCallback = std::function<void()>;
/** Fast reading data from buffer and save result to memory.
* Reads at least min_chunk_bytes and some more until the end of the chunk, depends on the format.
* Reads at least `min_bytes` and some more until the end of the chunk, depends on the format.
* If `max_rows` is non-zero the function also stops after reading the `max_rows` number of rows
* (even if the `min_bytes` boundary isn't reached yet).
* Used in ParallelParsingInputFormat.
*/
using FileSegmentationEngine = std::function<std::pair<bool, size_t>(
ReadBuffer & buf,
DB::Memory<Allocator<false>> & memory,
size_t min_chunk_bytes)>;
size_t min_bytes,
size_t max_rows)>;
/// This callback allows performing some additional actions after writing a single row.
/// Its initial purpose was to flush a Kafka message for each row.

View File

@ -33,7 +33,7 @@ namespace JSONUtils
template <const char opening_bracket, const char closing_bracket>
static std::pair<bool, size_t>
fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows)
fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows)
{
skipWhitespaceIfAny(in);
@ -41,14 +41,17 @@ namespace JSONUtils
size_t balance = 0;
bool quotes = false;
size_t number_of_rows = 0;
bool need_more_data = true;
while (loadAtPosition(in, memory, pos)
&& (balance || memory.size() + static_cast<size_t>(pos - in.position()) < min_chunk_size || number_of_rows < min_rows))
if (max_rows && (max_rows < min_rows))
max_rows = min_rows;
while (loadAtPosition(in, memory, pos) && need_more_data)
{
const auto current_object_size = memory.size() + static_cast<size_t>(pos - in.position());
if (min_chunk_size != 0 && current_object_size > 10 * min_chunk_size)
if (min_bytes != 0 && current_object_size > 10 * min_bytes)
throw ParsingException(
"Size of JSON object is extremely large. Expected not greater than " + std::to_string(min_chunk_size)
"Size of JSON object is extremely large. Expected not greater than " + std::to_string(min_bytes)
+ " bytes, but current is " + std::to_string(current_object_size)
+ " bytes per row. Increase the value setting 'min_chunk_bytes_for_parallel_parsing' or check your data manually, most likely JSON is malformed",
ErrorCodes::INCORRECT_DATA);
@ -106,7 +109,12 @@ namespace JSONUtils
}
if (balance == 0)
{
++number_of_rows;
if ((number_of_rows >= min_rows)
&& ((memory.size() + static_cast<size_t>(pos - in.position()) >= min_bytes) || (number_of_rows == max_rows)))
need_more_data = false;
}
}
}
@ -118,7 +126,7 @@ namespace JSONUtils
static String readJSONEachRowLineIntoStringImpl(ReadBuffer & in)
{
Memory memory;
fileSegmentationEngineJSONEachRowImpl<opening_bracket, closing_bracket>(in, memory, 0, 1);
fileSegmentationEngineJSONEachRowImpl<opening_bracket, closing_bracket>(in, memory, 0, 1, 1);
return String(memory.data(), memory.size());
}
@ -297,15 +305,15 @@ namespace JSONUtils
return data_types;
}
std::pair<bool, size_t> fileSegmentationEngineJSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
std::pair<bool, size_t> fileSegmentationEngineJSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
return fileSegmentationEngineJSONEachRowImpl<'{', '}'>(in, memory, min_chunk_size, 1);
return fileSegmentationEngineJSONEachRowImpl<'{', '}'>(in, memory, min_bytes, 1, max_rows);
}
std::pair<bool, size_t>
fileSegmentationEngineJSONCompactEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows)
fileSegmentationEngineJSONCompactEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows)
{
return fileSegmentationEngineJSONEachRowImpl<'[', ']'>(in, memory, min_chunk_size, min_rows);
return fileSegmentationEngineJSONEachRowImpl<'[', ']'>(in, memory, min_bytes, min_rows, max_rows);
}
struct JSONEachRowFieldsExtractor

View File

@ -15,9 +15,8 @@ namespace DB
namespace JSONUtils
{
std::pair<bool, size_t> fileSegmentationEngineJSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size);
std::pair<bool, size_t>
fileSegmentationEngineJSONCompactEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows);
std::pair<bool, size_t> fileSegmentationEngineJSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows);
std::pair<bool, size_t> fileSegmentationEngineJSONCompactEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows);
/// Parse JSON from string and convert its type to ClickHouse type. Make the result type always Nullable.
/// JSON array with different nested types is treated as Tuple.

View File

@ -314,13 +314,16 @@ void registerInputFormatCSV(FormatFactory & factory)
registerWithNamesAndTypes("CSV", register_func);
}
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows)
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows)
{
char * pos = in.position();
bool quotes = false;
bool need_more_data = true;
size_t number_of_rows = 0;
if (max_rows && (max_rows < min_rows))
max_rows = min_rows;
while (loadAtPosition(in, memory, pos) && need_more_data)
{
if (quotes)
@ -346,30 +349,30 @@ std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memor
throw Exception("Position in buffer is out of bounds. There must be a bug.", ErrorCodes::LOGICAL_ERROR);
else if (pos == in.buffer().end())
continue;
else if (*pos == '"')
if (*pos == '"')
{
quotes = true;
++pos;
continue;
}
else if (*pos == '\n')
++number_of_rows;
if ((number_of_rows >= min_rows)
&& ((memory.size() + static_cast<size_t>(pos - in.position()) >= min_bytes) || (number_of_rows == max_rows)))
need_more_data = false;
if (*pos == '\n')
{
++number_of_rows;
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size && number_of_rows >= min_rows)
need_more_data = false;
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\r')
++pos;
}
else if (*pos == '\r')
{
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size && number_of_rows >= min_rows)
need_more_data = false;
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\n')
{
++pos;
++number_of_rows;
}
}
}
}
@ -383,9 +386,9 @@ void registerFileSegmentationEngineCSV(FormatFactory & factory)
auto register_func = [&](const String & format_name, bool with_names, bool with_types)
{
size_t min_rows = 1 + int(with_names) + int(with_types);
factory.registerFileSegmentationEngine(format_name, [min_rows](ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
factory.registerFileSegmentationEngine(format_name, [min_rows](ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
return fileSegmentationEngineCSVImpl(in, memory, min_chunk_size, min_rows);
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, min_rows, max_rows);
});
};

View File

@ -83,6 +83,6 @@ private:
CSVFormatReader reader;
};
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, size_t min_rows);
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows);
}

View File

@ -66,8 +66,8 @@ void registerFileSegmentationEngineHiveText(FormatFactory & factory)
{
factory.registerFileSegmentationEngine(
"HiveText",
[](ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size) -> std::pair<bool, size_t> {
return fileSegmentationEngineCSVImpl(in, memory, min_chunk_size, 0);
[](ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows) -> std::pair<bool, size_t> {
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, 0, max_rows);
});
}

View File

@ -265,9 +265,9 @@ void registerFileSegmentationEngineJSONCompactEachRow(FormatFactory & factory)
/// the minimum of rows for segmentation engine according to
/// parameters with_names and with_types.
size_t min_rows = 1 + int(with_names) + int(with_types);
factory.registerFileSegmentationEngine(format_name, [min_rows](ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
factory.registerFileSegmentationEngine(format_name, [min_rows](ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
return JSONUtils::fileSegmentationEngineJSONCompactEachRow(in, memory, min_chunk_size, min_rows);
return JSONUtils::fileSegmentationEngineJSONCompactEachRow(in, memory, min_bytes, min_rows, max_rows);
});
};

View File

@ -33,7 +33,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr
// Segmenting the original input.
unit.segment.resize(0);
auto [have_more_data, currently_read_rows] = file_segmentation_engine(*in, unit.segment, min_chunk_bytes);
auto [have_more_data, currently_read_rows] = file_segmentation_engine(*in, unit.segment, min_chunk_bytes, max_block_size);
unit.offset = successfully_read_rows_count;
successfully_read_rows_count += currently_read_rows;

View File

@ -82,6 +82,7 @@ public:
String format_name;
size_t max_threads;
size_t min_chunk_bytes;
size_t max_block_size;
bool is_server;
};
@ -91,6 +92,7 @@ public:
, file_segmentation_engine(params.file_segmentation_engine)
, format_name(params.format_name)
, min_chunk_bytes(params.min_chunk_bytes)
, max_block_size(params.max_block_size)
, is_server(params.is_server)
, pool(params.max_threads)
{
@ -188,6 +190,7 @@ private:
FormatFactory::FileSegmentationEngine file_segmentation_engine;
const String format_name;
const size_t min_chunk_bytes;
const size_t max_block_size;
BlockMissingValues last_block_missing_values;

View File

@ -174,7 +174,7 @@ void registerInputFormatRegexp(FormatFactory & factory)
});
}
static std::pair<bool, size_t> fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
static std::pair<bool, size_t> fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
char * pos = in.position();
bool need_more_data = true;
@ -182,17 +182,28 @@ static std::pair<bool, size_t> fileSegmentationEngineRegexpImpl(ReadBuffer & in,
while (loadAtPosition(in, memory, pos) && need_more_data)
{
pos = find_first_symbols<'\n'>(pos, in.buffer().end());
pos = find_first_symbols<'\r', '\n'>(pos, in.buffer().end());
if (pos > in.buffer().end())
throw Exception("Position in buffer is out of bounds. There must be a bug.", ErrorCodes::LOGICAL_ERROR);
else if (pos == in.buffer().end())
continue;
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
++number_of_rows;
if ((memory.size() + static_cast<size_t>(pos - in.position()) >= min_bytes) || (number_of_rows == max_rows))
need_more_data = false;
++pos;
++number_of_rows;
if (*pos == '\n')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\r')
++pos;
}
else if (*pos == '\r')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\n')
++pos;
}
}
saveUpToPosition(in, memory, pos);

View File

@ -320,12 +320,15 @@ void registerTSVSchemaReader(FormatFactory & factory)
}
}
static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size, bool is_raw, size_t min_rows)
static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, bool is_raw, size_t min_bytes, size_t min_rows, size_t max_rows)
{
bool need_more_data = true;
char * pos = in.position();
size_t number_of_rows = 0;
if (max_rows && (max_rows < min_rows))
max_rows = min_rows;
while (loadAtPosition(in, memory, pos) && need_more_data)
{
if (is_raw)
@ -335,8 +338,7 @@ static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer
if (pos > in.buffer().end())
throw Exception("Position in buffer is out of bounds. There must be a bug.", ErrorCodes::LOGICAL_ERROR);
if (pos == in.buffer().end())
else if (pos == in.buffer().end())
continue;
if (!is_raw && *pos == '\\')
@ -344,15 +346,25 @@ static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer
++pos;
if (loadAtPosition(in, memory, pos))
++pos;
continue;
}
else if (*pos == '\n' || *pos == '\r')
{
if (*pos == '\n')
++number_of_rows;
if ((memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size) && number_of_rows >= min_rows)
need_more_data = false;
++number_of_rows;
if ((number_of_rows >= min_rows)
&& ((memory.size() + static_cast<size_t>(pos - in.position()) >= min_bytes) || (number_of_rows == max_rows)))
need_more_data = false;
if (*pos == '\n')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\r')
++pos;
}
else if (*pos == '\r')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\n')
++pos;
}
}
@ -368,9 +380,9 @@ void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
auto register_func = [&](const String & format_name, bool with_names, bool with_types)
{
size_t min_rows = 1 + static_cast<int>(with_names) + static_cast<int>(with_types);
factory.registerFileSegmentationEngine(format_name, [is_raw, min_rows](ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
factory.registerFileSegmentationEngine(format_name, [is_raw, min_rows](ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
return fileSegmentationEngineTabSeparatedImpl(in, memory, min_chunk_size, is_raw, min_rows);
return fileSegmentationEngineTabSeparatedImpl(in, memory, is_raw, min_bytes, min_rows, max_rows);
});
};
@ -381,12 +393,9 @@ void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
}
// We can use the same segmentation engine for TSKV.
factory.registerFileSegmentationEngine("TSKV", [](
ReadBuffer & in,
DB::Memory<> & memory,
size_t min_chunk_size)
factory.registerFileSegmentationEngine("TSKV", [](ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
return fileSegmentationEngineTabSeparatedImpl(in, memory, min_chunk_size, false, 1);
return fileSegmentationEngineTabSeparatedImpl(in, memory, false, min_bytes, 1, max_rows);
});
}

View File

@ -5,6 +5,7 @@
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
namespace DB
{