This commit is contained in:
Nikita Mikhaylov 2019-11-01 21:59:54 +03:00
parent 0e04d14b7d
commit 3c57b8e9c2
4 changed files with 19 additions and 7 deletions

View File

@ -114,7 +114,7 @@ void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_n
Block ParallelParsingBlockInputStream::readImpl() Block ParallelParsingBlockInputStream::readImpl()
{ {
Block res; Block res;
if (isCancelledOrThrowIfKilled()) if (isCancelledOrThrowIfKilled() || executed)
return res; return res;
std::unique_lock lock(mutex); std::unique_lock lock(mutex);
@ -137,9 +137,8 @@ Block ParallelParsingBlockInputStream::readImpl()
{ {
if (is_last[current_number]) if (is_last[current_number])
{ {
LOG_TRACE(&Poco::Logger::get("ParallelParsingBLockInputStream::readImpl()"), "Last unit. Will cancel the query."); //In case that all data was read we don't need to cancel.
lock.unlock(); executed= true;
cancel(false);
return res; return res;
} }

View File

@ -88,7 +88,6 @@ public:
~ParallelParsingBlockInputStream() override ~ParallelParsingBlockInputStream() override
{ {
waitForAllThreads(); waitForAllThreads();
LOG_TRACE(&Poco::Logger::get("~ParallelParsingBLockInputStream()"), "All threads are killed.");
} }
void cancel(bool kill) override void cancel(bool kill) override
@ -114,7 +113,12 @@ public:
} }
protected: protected:
void readPrefix() override {} //void readPrefix() override {}
void readSuffix() override {
readers[segmentator_ticket_number % max_threads_to_use]->readPrefix();
LOG_TRACE(&Poco::Logger::get("ParallelParsingBLockInputStream::readSuffix()"), "ReadSuffix");
}
//Reader routine //Reader routine
Block readImpl() override; Block readImpl() override;
@ -135,6 +139,7 @@ private:
const size_t min_chunk_size; const size_t min_chunk_size;
std::atomic<bool> is_exception_occured{false}; std::atomic<bool> is_exception_occured{false};
std::atomic<bool> executed{false};
BlockMissingValues last_block_missing_values; BlockMissingValues last_block_missing_values;

View File

@ -129,6 +129,8 @@ BlockInputStreamPtr FormatFactory::getInput(
const size_t max_threads_for_parallel_parsing = settings.max_threads_for_parallel_parsing; const size_t max_threads_for_parallel_parsing = settings.max_threads_for_parallel_parsing;
const size_t max_threads_to_use = max_threads_for_parallel_parsing == 0 ? global_max_threads : std::min(max_threads_for_parallel_parsing, global_max_threads); const size_t max_threads_to_use = max_threads_for_parallel_parsing == 0 ? global_max_threads : std::min(max_threads_for_parallel_parsing, global_max_threads);
LOG_TRACE(&Poco::Logger::get("FormatFactory::getInput()"), "Will use " << max_threads_to_use << " threads for parallel parsing.");
auto params = ParallelParsingBlockInputStream::InputCreatorParams{sample, context, row_input_format_params, format_settings}; auto params = ParallelParsingBlockInputStream::InputCreatorParams{sample, context, row_input_format_params, format_settings};
ParallelParsingBlockInputStream::Builder builder{buf, input_getter, params, file_segmentation_engine, max_threads_to_use, settings.min_chunk_size_for_parallel_parsing}; ParallelParsingBlockInputStream::Builder builder{buf, input_getter, params, file_segmentation_engine, max_threads_to_use, settings.min_chunk_size_for_parallel_parsing};
return std::make_shared<ParallelParsingBlockInputStream>(builder); return std::make_shared<ParallelParsingBlockInputStream>(builder);

View File

@ -8,7 +8,6 @@
#include <Common/PODArray.h> #include <Common/PODArray.h>
#include <Common/UTF8Helpers.h> #include <Common/UTF8Helpers.h>
namespace DB namespace DB
{ {
@ -201,8 +200,15 @@ void PrettyBlockOutputFormat::writeValueWithPadding(
{ {
auto writePadding = [&]() auto writePadding = [&]()
{ {
// if (pad_to_width < value_width)
// return;
if (pad_to_width < value_width) if (pad_to_width < value_width)
{
std::cout << "pad_to_width and value_width " << pad_to_width << " " << value_width << std::endl;
std::cout << StackTrace().toString() << std::endl;
return; return;
}
for (size_t k = 0; k < pad_to_width - value_width; ++k) for (size_t k = 0; k < pad_to_width - value_width; ++k)
writeChar(' ', out); writeChar(' ', out);
}; };