Disable parallel parsing for JSONEachRow data enclosed in square brackets

This commit is contained in:
Alexander Tokmakov 2020-02-07 16:16:51 +03:00
parent 5956f7400f
commit 47ad022a1f
6 changed files with 36 additions and 13 deletions

View File

@ -34,7 +34,7 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
unit.is_last = !have_more_data; unit.is_last = !have_more_data;
unit.status = READY_TO_PARSE; unit.status = READY_TO_PARSE;
scheduleParserThreadForUnitWithNumber(current_unit_number); scheduleParserThreadForUnitWithNumber(segmentator_ticket_number);
++segmentator_ticket_number; ++segmentator_ticket_number;
if (!have_more_data) if (!have_more_data)
@ -49,12 +49,13 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction()
} }
} }
void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_number) void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_ticket_number)
{ {
try try
{ {
setThreadName("ChunkParser"); setThreadName("ChunkParser");
const auto current_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number]; auto & unit = processing_units[current_unit_number];
/* /*
@ -64,9 +65,9 @@ void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_n
* can use it from multiple threads simultaneously. * can use it from multiple threads simultaneously.
*/ */
ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0); ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
auto parser = std::make_unique<InputStreamFromInputFormat>( auto format = input_processor_creator(read_buffer, header, row_input_format_params, format_settings);
input_processor_creator(read_buffer, header, format->setCurrentUnitNumber(current_ticket_number);
row_input_format_params, format_settings)); auto parser = std::make_unique<InputStreamFromInputFormat>(std::move(format));
unit.block_ext.block.clear(); unit.block_ext.block.clear();
unit.block_ext.block_missing_values.clear(); unit.block_ext.block_missing_values.clear();

View File

@ -213,9 +213,9 @@ private:
std::deque<ProcessingUnit> processing_units; std::deque<ProcessingUnit> processing_units;
void scheduleParserThreadForUnitWithNumber(size_t unit_number) void scheduleParserThreadForUnitWithNumber(size_t ticket_number)
{ {
pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, unit_number)); pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, ticket_number));
} }
void finishAndWait() void finishAndWait()

View File

@ -144,9 +144,19 @@ BlockInputStreamPtr FormatFactory::getInput(
// Doesn't make sense to use parallel parsing with less than four threads // Doesn't make sense to use parallel parsing with less than four threads
// (segmentator + two parsers + reader). // (segmentator + two parsers + reader).
if (settings.input_format_parallel_parsing bool parallel_parsing = settings.input_format_parallel_parsing && file_segmentation_engine && settings.max_threads >= 4;
&& file_segmentation_engine
&& settings.max_threads >= 4) if (parallel_parsing && name == "JSONEachRow")
{
/// FIXME ParallelParsingBlockInputStream doesn't support formats with non-trivial readPrefix() and readSuffix()
/// For JSONEachRow we can safely skip whitespace characters
skipWhitespaceIfAny(buf);
if (buf.eof() || *buf.position() == '[')
parallel_parsing = false; /// Disable it for JSONEachRow if data is in square brackets (see JSONEachRowRowInputFormat)
}
if (parallel_parsing)
{ {
const auto & input_getter = getCreators(name).input_processor_creator; const auto & input_getter = getCreators(name).input_processor_creator;
if (!input_getter) if (!input_getter)

View File

@ -38,6 +38,13 @@ public:
static const BlockMissingValues none; static const BlockMissingValues none;
return none; return none;
} }
/// Returns the chunk number most recently stored via setCurrentUnitNumber().
/// The backing field is initialised to 0, so 0 is returned if it was never set.
size_t getCurrentUnitNumber() const
{
    return current_unit_number;
}
/// Stores the number of the chunk this format instance is about to parse.
/// ParallelParsingBlockInputStream::parserThreadFunction passes the ticket number
/// of the chunk here, i.e. the value before it is reduced modulo the number of
/// processing units.
void setCurrentUnitNumber(size_t current_unit_number_)
{
    current_unit_number = current_unit_number_;
}
private:
/// Number of the chunk currently being parsed (meaningful only if parallel parsing is enabled).
/// Defaults to 0 and is assigned via setCurrentUnitNumber().
/// JSONEachRowRowInputFormat::readRow compares it with 0 when deciding whether a row is the very first one.
size_t current_unit_number = 0;
}; };
} }

View File

@ -226,9 +226,11 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
/// then seeking to next ;, or \n would trigger reading of an extra row at the end. /// then seeking to next ;, or \n would trigger reading of an extra row at the end.
/// Semicolon is added for convenience as it could be used at end of INSERT query. /// Semicolon is added for convenience as it could be used at end of INSERT query.
if (getTotalRows() && !in.eof()) bool is_first_row = getCurrentUnitNumber() == 0 && getTotalRows() == 1;
if (!in.eof())
{ {
if (*in.position() == ',') /// There may be optional ',' (but not before the first row)
if (!is_first_row && *in.position() == ',')
++in.position(); ++in.position();
else if (!data_in_square_brackets && *in.position() == ';') else if (!data_in_square_brackets && *in.position() == ';')
{ {

View File

@ -1,6 +1,9 @@
DROP TABLE IF EXISTS json_square_brackets; DROP TABLE IF EXISTS json_square_brackets;
CREATE TABLE json_square_brackets (id UInt32, name String) ENGINE = Memory; CREATE TABLE json_square_brackets (id UInt32, name String) ENGINE = Memory;
INSERT INTO json_square_brackets FORMAT JSONEachRow [{"id": 1, "name": "name1"}, {"id": 2, "name": "name2"}] INSERT INTO json_square_brackets FORMAT JSONEachRow [{"id": 1, "name": "name1"}, {"id": 2, "name": "name2"}];
INSERT INTO json_square_brackets FORMAT JSONEachRow[];
INSERT INTO json_square_brackets FORMAT JSONEachRow [ ] ;
INSERT INTO json_square_brackets FORMAT JSONEachRow ;
SELECT * FROM json_square_brackets ORDER BY id; SELECT * FROM json_square_brackets ORDER BY id;
DROP TABLE IF EXISTS json_square_brackets; DROP TABLE IF EXISTS json_square_brackets;