diff --git a/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp index e16c72c7c67..02b0c9d47fa 100644 --- a/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp @@ -114,7 +114,7 @@ void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_n Block ParallelParsingBlockInputStream::readImpl() { Block res; - if (isCancelledOrThrowIfKilled()) + if (isCancelledOrThrowIfKilled() || executed) return res; std::unique_lock lock(mutex); @@ -137,9 +137,8 @@ Block ParallelParsingBlockInputStream::readImpl() { if (is_last[current_number]) { - LOG_TRACE(&Poco::Logger::get("ParallelParsingBLockInputStream::readImpl()"), "Last unit. Will cancel the query."); - lock.unlock(); - cancel(false); + //In case that all data was read we don't need to cancel. + executed= true; return res; } diff --git a/dbms/src/DataStreams/ParallelParsingBlockInputStream.h b/dbms/src/DataStreams/ParallelParsingBlockInputStream.h index c3d9ee41be9..1021f44fcc9 100644 --- a/dbms/src/DataStreams/ParallelParsingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelParsingBlockInputStream.h @@ -88,7 +88,6 @@ public: ~ParallelParsingBlockInputStream() override { waitForAllThreads(); - LOG_TRACE(&Poco::Logger::get("~ParallelParsingBLockInputStream()"), "All threads are killed."); } void cancel(bool kill) override @@ -114,7 +113,12 @@ public: } protected: - void readPrefix() override {} + //void readPrefix() override {} + + void readSuffix() override { + readers[segmentator_ticket_number % max_threads_to_use]->readPrefix(); + LOG_TRACE(&Poco::Logger::get("ParallelParsingBLockInputStream::readSuffix()"), "ReadSuffix"); + } //Reader routine Block readImpl() override; @@ -135,6 +139,7 @@ private: const size_t min_chunk_size; std::atomic is_exception_occured{false}; + std::atomic executed{false}; BlockMissingValues last_block_missing_values; diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index b883e42e8ad..9fa22ab8248 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -129,6 +129,8 @@ BlockInputStreamPtr FormatFactory::getInput( const size_t max_threads_for_parallel_parsing = settings.max_threads_for_parallel_parsing; const size_t max_threads_to_use = max_threads_for_parallel_parsing == 0 ? global_max_threads : std::min(max_threads_for_parallel_parsing, global_max_threads); + LOG_TRACE(&Poco::Logger::get("FormatFactory::getInput()"), "Will use " << max_threads_to_use << " threads for parallel parsing."); + auto params = ParallelParsingBlockInputStream::InputCreatorParams{sample, context, row_input_format_params, format_settings}; ParallelParsingBlockInputStream::Builder builder{buf, input_getter, params, file_segmentation_engine, max_threads_to_use, settings.min_chunk_size_for_parallel_parsing}; return std::make_shared(builder); diff --git a/dbms/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp b/dbms/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp index 55e65cf65db..35b39fc8a69 100644 --- a/dbms/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/PrettyBlockOutputFormat.cpp @@ -8,7 +8,6 @@ #include #include - namespace DB { @@ -201,8 +200,15 @@ void PrettyBlockOutputFormat::writeValueWithPadding( { auto writePadding = [&]() { +// if (pad_to_width < value_width) +// return; if (pad_to_width < value_width) + { + std::cout << "pad_to_width and value_width " << pad_to_width << " " << value_width << std::endl; + std::cout << StackTrace().toString() << std::endl; return; + } + for (size_t k = 0; k < pad_to_width - value_width; ++k) writeChar(' ', out); };