diff --git a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp index 345bec395b2..d27002197d2 100644 --- a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp +++ b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp @@ -23,7 +23,6 @@ struct PullingAsyncPipelineExecutor::Data std::atomic_bool is_finished = false; std::atomic_bool has_exception = false; ThreadFromGlobalPool thread; - Poco::Event finish_event; ~Data() { @@ -89,12 +88,10 @@ static void threadFunction( data.has_exception = true; /// Finish lazy format in case of exception. Otherwise thread.join() may hung. - if (data.lazy_format) - data.lazy_format->finalize(); + data.lazy_format->finalize(); } data.is_finished = true; - data.finish_event.set(); } @@ -129,20 +126,8 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds) return false; } - if (lazy_format) - { - chunk = lazy_format->getChunk(milliseconds); - data->rethrowExceptionIfHas(); - return true; - } - - chunk.clear(); - - if (milliseconds) - data->finish_event.tryWait(milliseconds); - else - data->finish_event.wait(); - + chunk = lazy_format->getChunk(milliseconds); + data->rethrowExceptionIfHas(); return true; } @@ -230,14 +215,12 @@ void PullingAsyncPipelineExecutor::cancelWithExceptionHandling(CancelFunc && can Chunk PullingAsyncPipelineExecutor::getTotals() { - return lazy_format ? lazy_format->getTotals() - : Chunk(); + return lazy_format->getTotals(); } Chunk PullingAsyncPipelineExecutor::getExtremes() { - return lazy_format ? lazy_format->getExtremes() - : Chunk(); + return lazy_format->getExtremes(); } Block PullingAsyncPipelineExecutor::getTotalsBlock() @@ -264,15 +247,7 @@ Block PullingAsyncPipelineExecutor::getExtremesBlock() ProfileInfo & PullingAsyncPipelineExecutor::getProfileInfo() { - if (lazy_format) - return lazy_format->getProfileInfo(); - - static ProfileInfo profile_info; - static std::once_flag flag; - /// Calculate rows before limit here to avoid race. - std::call_once(flag, []() { profile_info.getRowsBeforeLimit(); }); - - return profile_info; + return lazy_format->getProfileInfo(); } } diff --git a/src/Processors/Executors/PushingAsyncPipelineExecutor.h b/src/Processors/Executors/PushingAsyncPipelineExecutor.h index 4b4b83a90b5..f976cd4c339 100644 --- a/src/Processors/Executors/PushingAsyncPipelineExecutor.h +++ b/src/Processors/Executors/PushingAsyncPipelineExecutor.h @@ -1,6 +1,5 @@ #pragma once #include -#include #include namespace DB