PullingAsyncPipelineExecutor cleanup

lazy_format is used always
This commit is contained in:
Igor Nikonov 2024-02-28 20:50:56 +00:00
parent 4023e67ac5
commit 7077499064
2 changed files with 6 additions and 32 deletions

View File

@ -23,7 +23,6 @@ struct PullingAsyncPipelineExecutor::Data
std::atomic_bool is_finished = false;
std::atomic_bool has_exception = false;
ThreadFromGlobalPool thread;
Poco::Event finish_event;
~Data()
{
@ -89,12 +88,10 @@ static void threadFunction(
data.has_exception = true;
/// Finish lazy format in case of exception. Otherwise thread.join() may hung.
if (data.lazy_format)
data.lazy_format->finalize();
data.lazy_format->finalize();
}
data.is_finished = true;
data.finish_event.set();
}
@ -129,20 +126,8 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
return false;
}
if (lazy_format)
{
chunk = lazy_format->getChunk(milliseconds);
data->rethrowExceptionIfHas();
return true;
}
chunk.clear();
if (milliseconds)
data->finish_event.tryWait(milliseconds);
else
data->finish_event.wait();
chunk = lazy_format->getChunk(milliseconds);
data->rethrowExceptionIfHas();
return true;
}
@ -230,14 +215,12 @@ void PullingAsyncPipelineExecutor::cancelWithExceptionHandling(CancelFunc && can
Chunk PullingAsyncPipelineExecutor::getTotals()
{
return lazy_format ? lazy_format->getTotals()
: Chunk();
return lazy_format->getTotals();
}
Chunk PullingAsyncPipelineExecutor::getExtremes()
{
return lazy_format ? lazy_format->getExtremes()
: Chunk();
return lazy_format->getExtremes();
}
Block PullingAsyncPipelineExecutor::getTotalsBlock()
@ -264,15 +247,7 @@ Block PullingAsyncPipelineExecutor::getExtremesBlock()
ProfileInfo & PullingAsyncPipelineExecutor::getProfileInfo()
{
if (lazy_format)
return lazy_format->getProfileInfo();
static ProfileInfo profile_info;
static std::once_flag flag;
/// Calculate rows before limit here to avoid race.
std::call_once(flag, []() { profile_info.getRowsBeforeLimit(); });
return profile_info;
return lazy_format->getProfileInfo();
}
}

View File

@ -1,6 +1,5 @@
#pragma once
#include <memory>
#include <atomic>
#include <vector>
namespace DB