diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 7c591af8537..9fba199d0c1 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -150,6 +150,7 @@ InputFormatPtr FormatFactory::getInput( if (!getCreators(name).input_processor_creator) { + // const auto & input_getter = getCreators(name).input_creator; // if (!input_getter) // throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT); @@ -194,26 +195,14 @@ InputFormatPtr FormatFactory::getInput( row_input_format_params.max_execution_time = settings.max_execution_time; row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode; + /// Const reference is copied to lambda. + auto parser_creator = [input_getter, sample, row_input_format_params, format_settings] + (ReadBuffer & input) -> InputFormatPtr + { return input_getter(input, sample, row_input_format_params, format_settings); }; - auto parser_creator = std::bind( - input_getter.target(), - std::placeholders::_1, sample, row_input_format_params, format_settings); -// auto anime = parser_creator(buf) - auto boruto = input_getter(buf, sample, row_input_format_params, format_settings); -// auto naruto = input_getter.target()(buf, sample, row_input_format_params, format_settings); - - auto naruto = - [sample, row_input_format_params, format_settings, input_getter] - (ReadBuffer & input) - {return input_getter(input, sample, row_input_format_params, format_settings);}; - - auto aaa = naruto(buf); - - ParallelParsingBlockInputFormat::Params params{buf, sample, - naruto, file_segmentation_engine, - settings.max_threads, - settings.min_chunk_bytes_for_parallel_parsing}; + ParallelParsingBlockInputFormat::Params params{ + buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing}; return std::make_shared(params); } diff --git a/src/Formats/JSONEachRowUtils.cpp b/src/Formats/JSONEachRowUtils.cpp index a1d9b4a5fff..44472a36e16 100644 --- a/src/Formats/JSONEachRowUtils.cpp +++ b/src/Formats/JSONEachRowUtils.cpp @@ -9,6 +9,14 @@ bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memor skipWhitespaceIfAny(in); char * pos = in.position(); + + /// In case that independent JSONs are splitted by comma we skip that comma. + if (pos && *pos == ',') + { + ++in.position(); + ++pos; + } + size_t balance = 0; bool quotes = false; @@ -61,6 +69,7 @@ bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memor } saveUpToPosition(in, memory, pos); + assert(*memory.data() == '{'); return loadAtPosition(in, memory, pos); } diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp index e7911f9b367..517e07a3ba4 100644 --- a/src/Processors/Executors/PipelineExecutor.cpp +++ b/src/Processors/Executors/PipelineExecutor.cpp @@ -464,9 +464,6 @@ void PipelineExecutor::finalizeExecution() if (!all_processors_finished) throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR); - - WriteBufferFromOStream out(std::cout); - printPipeline(processors, out); } void PipelineExecutor::wakeUpExecutor(size_t thread_num) diff --git a/src/Processors/Formats/Impl/IReadBufferPrepareAndEndUp.h b/src/Processors/Formats/Impl/IReadBufferPrepareAndEndUp.h new file mode 100644 index 00000000000..33d8bbf9cd6 --- /dev/null +++ b/src/Processors/Formats/Impl/IReadBufferPrepareAndEndUp.h @@ -0,0 +1,56 @@ +#pragma once + +#include + +namespace DB +{ + +class IReadBufferPrepareAndEndUp +{ +public: + virtual ~IReadBufferPrepareAndEndUp() {} + + virtual void prepareReadBuffer(ReadBuffer & buffer) = 0; + + virtual void endUpReadBuffer(ReadBuffer & buffer) = 0; +}; + +using IReadBufferPrepareAndEndUpPtr = std::shared_ptr; + +class JSONEachRowPrepareAndEndUp : public IReadBufferPrepareAndEndUp +{ +public: + void prepareReadBuffer(ReadBuffer & buffer) override + { + /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. + skipBOMIfExists(in); + + skipWhitespaceIfAny(in); + if (!in.eof() && *in.position() == '[') + { + ++in.position(); + data_in_square_brackets = true; + } + } + + void endUpReadBuffer(ReadBuffer & buffer) override + { + skipWhitespaceIfAny(buffer); + if (data_in_square_brackets) + { + assertChar(']', buffer); + skipWhitespaceIfAny(buffer); + } + if (!buffer.eof() && *buffer.position() == ';') + { + ++buffer.position(); + skipWhitespaceIfAny(buffer); + } + assertEOF(buffer); + } + +private: + bool data_in_square_brackets{false}; +}; + +} diff --git a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp index 234839b41f5..dfb5eb79b4b 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp @@ -303,31 +303,12 @@ void JSONEachRowRowInputFormat::resetParser() void JSONEachRowRowInputFormat::readPrefix() { - /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. - skipBOMIfExists(in); - - skipWhitespaceIfAny(in); - if (!in.eof() && *in.position() == '[') - { - ++in.position(); - data_in_square_brackets = true; - } + prepare_and_end_up.prepareReadBuffer(in); } void JSONEachRowRowInputFormat::readSuffix() { - skipWhitespaceIfAny(in); - if (data_in_square_brackets) - { - assertChar(']', in); - skipWhitespaceIfAny(in); - } - if (!in.eof() && *in.position() == ';') - { - ++in.position(); - skipWhitespaceIfAny(in); - } - assertEOF(in); + prepare_and_end_up.endUpReadBuffer(in); } diff --git a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h index 29a6ce6ecb8..e121d77d7a6 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h +++ b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h @@ -4,7 +4,7 @@ #include #include #include - +#include "IReadBufferPrepareAndEndUp.h" namespace DB { @@ -81,7 +81,11 @@ private: bool allow_new_rows = true; + bool yield_strings; + + /// Used when readSuffix() or readPrefix() are called. + JSONEachRowPrepareAndEndUp prepare_and_end_up; }; } diff --git a/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.cpp index 6f63504d227..ad7a160ce11 100644 --- a/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.cpp @@ -1,4 +1,5 @@ #include +#include namespace DB { @@ -64,6 +65,10 @@ void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket * can use it from multiple threads simultaneously. */ ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0); + + if (current_ticket_number == 0) + prepareReadBuffer(read_buffer); + InputFormatPtr input_format = internal_parser_creator(read_buffer); InternalParser parser(input_format); @@ -83,6 +88,9 @@ void ParallelParsingBlockInputFormat::parserThreadFunction(size_t current_ticket // except at the end of file. Also see a matching assert in readImpl(). assert(unit.is_last || !unit.chunk_ext.chunk.empty()); + if (unit.is_last) + endUpReadBuffer(read_buffer); + std::lock_guard lock(mutex); unit.status = READY_TO_READ; reader_condvar.notify_all(); @@ -198,5 +206,17 @@ Chunk ParallelParsingBlockInputFormat::generate() return res; } +void ParallelParsingBlockInputFormat::prepareReadBuffer(ReadBuffer & buffer) +{ + if (prepare_and_end_up_ptr) + prepare_and_end_up_ptr->prepareReadBuffer(buffer); +} + + +void ParallelParsingBlockInputFormat::endUpReadBuffer(ReadBuffer & buffer) +{ + if (prepare_and_end_up_ptr) + prepare_and_end_up_ptr->endUpReadBuffer(buffer); +} } diff --git a/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.h b/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.h index 3900880ae1b..1a0cd2c537a 100644 --- a/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParallelParsingBlockInputFormat.h @@ -10,6 +10,7 @@ #include #include #include +#include "IReadBufferPrepareAndEndUp.h" namespace DB { @@ -61,6 +62,7 @@ public: Block header; InternalParserCreator internal_parser_creator; FormatFactory::FileSegmentationEngine file_segmentation_engine; + String format_name; size_t max_threads; size_t min_chunk_bytes; }; @@ -69,6 +71,7 @@ public: : IInputFormat(std::move(params.header), params.in) , internal_parser_creator(params.internal_parser_creator) , file_segmentation_engine(params.file_segmentation_engine) + , format_name(params.format_name) , min_chunk_bytes(params.min_chunk_bytes) // Subtract one thread that we use for segmentation and one for // reading. After that, must have at least two threads left for @@ -80,6 +83,10 @@ public: // bump into reader thread on wraparound. processing_units.resize(params.max_threads + 2); + /// To skip '[' and ']'. + if (format_name == "JSONEachRow") + prepare_and_end_up_ptr = std::make_shared(); + segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); }); } @@ -165,6 +172,7 @@ private: const InternalParserCreator internal_parser_creator; // Function to segment the file. Then "parsers" will parse that segments. FormatFactory::FileSegmentationEngine file_segmentation_engine; + const String format_name; const size_t min_chunk_bytes; BlockMissingValues last_block_missing_values; @@ -178,7 +186,7 @@ private: std::condition_variable reader_condvar; std::condition_variable segmentator_condvar; - std::atomic parsing_finished; + std::atomic parsing_finished{false}; // There are multiple "parsers", that's why we use thread pool. ThreadPool pool; @@ -254,6 +262,12 @@ private: // readImpl() is called from the main thread, so the exception handling // is different. void onBackgroundException(); + + IReadBufferPrepareAndEndUpPtr prepare_and_end_up_ptr; + + void prepareReadBuffer(ReadBuffer & buffer); + + void endUpReadBuffer(ReadBuffer & buffer); }; } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 4acfe34d47f..9804abb2247 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -60,7 +60,7 @@ #include #include #include - +#include namespace ProfileEvents { @@ -2904,8 +2904,8 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context ReadBufferFromMemory right_paren_buf(")", 1); ConcatReadBuffer buf({&left_paren_buf, &fields_buf, &right_paren_buf}); - auto format = FormatFactory::instance().getInput("Values", buf, partition_key_sample, context, context.getSettingsRef().max_block_size); - auto input_stream = std::make_shared(format); + auto input_format = FormatFactory::instance().getInput("Values", buf, getPartitionKey().sample_block, context, context.getSettingsRef().max_block_size); + auto input_stream = std::make_shared(input_format); auto block = input_stream->read(); if (!block || !block.rows()) diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 477c557bbd0..736089b666f 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -291,7 +291,6 @@ public: Chunk generate() override { - std::cout << StackTrace().toString() << std::endl; while (!finished_generate) { /// Open file lazily on first read. This is needed to avoid too many open files from different streams.