diff --git a/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp index 3f6ddbd7a15..3669e77080b 100644 --- a/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp @@ -34,7 +34,7 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction() unit.is_last = !have_more_data; unit.status = READY_TO_PARSE; - scheduleParserThreadForUnitWithNumber(current_unit_number); + scheduleParserThreadForUnitWithNumber(segmentator_ticket_number); ++segmentator_ticket_number; if (!have_more_data) @@ -49,12 +49,13 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction() } } -void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_number) +void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_ticket_number) { try { setThreadName("ChunkParser"); + const auto current_unit_number = current_ticket_number % processing_units.size(); auto & unit = processing_units[current_unit_number]; /* @@ -64,9 +65,9 @@ void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_n * can use it from multiple threads simultaneously. */ ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0); - auto parser = std::make_unique( - input_processor_creator(read_buffer, header, - row_input_format_params, format_settings)); + auto format = input_processor_creator(read_buffer, header, row_input_format_params, format_settings); + format->setCurrentUnitNumber(current_ticket_number); + auto parser = std::make_unique(std::move(format)); unit.block_ext.block.clear(); unit.block_ext.block_missing_values.clear(); diff --git a/dbms/src/DataStreams/ParallelParsingBlockInputStream.h b/dbms/src/DataStreams/ParallelParsingBlockInputStream.h index 89a9d7c8926..1b2bfbd52e2 100644 --- a/dbms/src/DataStreams/ParallelParsingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelParsingBlockInputStream.h @@ -213,9 +213,9 @@ private: std::deque processing_units; - void scheduleParserThreadForUnitWithNumber(size_t unit_number) + void scheduleParserThreadForUnitWithNumber(size_t ticket_number) { - pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, unit_number)); + pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, ticket_number)); } void finishAndWait() diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index b67c659ee89..e6c01d81b81 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -144,9 +144,19 @@ BlockInputStreamPtr FormatFactory::getInput( // Doesn't make sense to use parallel parsing with less than four threads // (segmentator + two parsers + reader). - if (settings.input_format_parallel_parsing - && file_segmentation_engine - && settings.max_threads >= 4) + bool parallel_parsing = settings.input_format_parallel_parsing && file_segmentation_engine && settings.max_threads >= 4; + + if (parallel_parsing && name == "JSONEachRow") + { + /// FIXME ParallelParsingBlockInputStream doesn't support formats with non-trivial readPrefix() and readSuffix() + + /// For JSONEachRow we can safely skip whitespace characters + skipWhitespaceIfAny(buf); + if (buf.eof() || *buf.position() == '[') + parallel_parsing = false; /// Disable it for JSONEachRow if data is in square brackets (see JSONEachRowRowInputFormat) + } + + if (parallel_parsing) { const auto & input_getter = getCreators(name).input_processor_creator; if (!input_getter) diff --git a/dbms/src/Processors/Formats/IInputFormat.h b/dbms/src/Processors/Formats/IInputFormat.h index 00cb38405cf..e1537aff6c5 100644 --- a/dbms/src/Processors/Formats/IInputFormat.h +++ b/dbms/src/Processors/Formats/IInputFormat.h @@ -38,6 +38,13 @@ public: static const BlockMissingValues none; return none; } + + size_t getCurrentUnitNumber() const { return current_unit_number; } + void setCurrentUnitNumber(size_t current_unit_number_) { current_unit_number = current_unit_number_; } + +private: + /// Number of currently parsed chunk (if parallel parsing is enabled) + size_t current_unit_number = 0; }; } diff --git a/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp b/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp index 8f89d732766..238dbb79818 100644 --- a/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp @@ -226,9 +226,11 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi /// then seeking to next ;, or \n would trigger reading of an extra row at the end. /// Semicolon is added for convenience as it could be used at end of INSERT query. - if (getTotalRows() && !in.eof()) + bool is_first_row = getCurrentUnitNumber() == 0 && getTotalRows() == 1; + if (!in.eof()) { - if (*in.position() == ',') + /// There may be optional ',' (but not before the first row) + if (!is_first_row && *in.position() == ',') ++in.position(); else if (!data_in_square_brackets && *in.position() == ';') { diff --git a/dbms/tests/queries/0_stateless/01072_json_each_row_data_in_square_brackets.sql b/dbms/tests/queries/0_stateless/01072_json_each_row_data_in_square_brackets.sql index bd83373c570..0e4e031a985 100644 --- a/dbms/tests/queries/0_stateless/01072_json_each_row_data_in_square_brackets.sql +++ b/dbms/tests/queries/0_stateless/01072_json_each_row_data_in_square_brackets.sql @@ -1,6 +1,9 @@ DROP TABLE IF EXISTS json_square_brackets; CREATE TABLE json_square_brackets (id UInt32, name String) ENGINE = Memory; -INSERT INTO json_square_brackets FORMAT JSONEachRow [{"id": 1, "name": "name1"}, {"id": 2, "name": "name2"}] +INSERT INTO json_square_brackets FORMAT JSONEachRow [{"id": 1, "name": "name1"}, {"id": 2, "name": "name2"}]; +INSERT INTO json_square_brackets FORMAT JSONEachRow[]; +INSERT INTO json_square_brackets FORMAT JSONEachRow [ ] ; +INSERT INTO json_square_brackets FORMAT JSONEachRow ; SELECT * FROM json_square_brackets ORDER BY id; DROP TABLE IF EXISTS json_square_brackets;