Rewrite PushingToViewsBlockOutputStream part 2.

This commit is contained in:
Nikolai Kochetov 2021-08-26 11:01:26 +03:00
parent d7e78f3ea9
commit 4d821efa15
3 changed files with 130 additions and 50 deletions

View File

@ -226,6 +226,7 @@ public:
/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false);
void logToQueryViewsLog(const ViewRuntimeData & vinfo);
protected:
void applyQuerySettings();
@ -238,7 +239,6 @@ protected:
void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database, std::chrono::time_point<std::chrono::system_clock> now);
void logToQueryViewsLog(const ViewRuntimeData & vinfo);
void assertState(const std::initializer_list<int> & permitted_states, const char * description = nullptr) const;

View File

@ -33,25 +33,108 @@
namespace DB
{
class ExceptionCollectingTransform : public IProcessor
struct ViewsData
{
std::vector<ViewRuntimeData> views;
/// In case of exception happened while inserting into main table, it is pushed to pipeline.
/// Remember the first one, we should keep them after view processing.
std::atomic_bool has_exception = false;
std::exception_ptr first_exception;
};
using ViewsDataPtr = std::shared_ptr<ViewsData>;
class CopyingDataToViewsTransform final : public IProcessor
{
public:
ExceptionCollectingTransform(const Block & header, size_t num_inputs)
: IProcessor(InputPorts(num_inputs, header), {header})
, output(outputs.front())
CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data)
: IProcessor({header}, OutputPorts(data->views.size(), header))
, input(inputs.front())
, views_data(std::move(data))
{
has_exception.assign(num_inputs, false);
if (views_data->views.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "CopyingDataToViewsTransform cannot have zero outputs");
}
String getName() const override { return "CopyingDataToViewsTransform"; }
Status prepare() override
{
bool all_can_push = true;
for (auto & output : outputs)
{
if (output.isFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot push data to view because output port is finished");
if (!output.canPush())
all_can_push = false;
}
if (!all_can_push)
return Status::PortFull;
if (input.isFinished())
{
for (auto & output : outputs)
output.finish();
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
auto data = input.pullData();
if (data.exception)
{
if (!views_data->has_exception)
{
views_data->first_exception = data.exception;
views_data->has_exception = true;
}
for (auto & output : outputs)
output.pushException(data.exception);
}
else
{
for (auto & output : outputs)
output.push(data.chunk.clone());
}
}
InputPort & getInputPort() { return input; }
private:
InputPort & input;
ViewsDataPtr views_data;
};
class FinalizingViewsTransform final : public IProcessor
{
struct ExceptionStatus
{
std::exception_ptr exception;
bool is_first = false;
};
public:
FinalizingViewsTransform(const Block & header, ViewsDataPtr data)
: IProcessor(InputPorts(data->views.size(), header), {header})
, output(outputs.front())
, views_data(std::move(data))
{
statuses.resize(views_data->views.size());
}
String getName() const override { return "FinalizingViewsTransform"; }
Status prepare() override
{
if (output.isFinished())
{
for (auto & input : inputs)
input.close();
return Status::Finished;
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot finalize views because output port is finished");
if (!output.canPush())
return Status::PortFull;
@ -75,9 +158,13 @@ public:
auto data = input.pullData();
if (data.exception)
{
if (i == 0 || !has_exception[i])
if (views_data->has_exception && views_data->first_exception == data.exception)
statuses[i].is_first = true;
else
statuses[i].exception = data.exception;
if (i == 0 && statuses[0].is_first)
{
has_exception[i] = true;
output.pushData(std::move(data));
return Status::PortFull;
}
@ -90,6 +177,9 @@ public:
if (num_finished == inputs.size())
{
if (!statuses.empty())
return Status::Ready;
output.finish();
return Status::Finished;
}
@ -97,9 +187,29 @@ public:
return Status::NeedData;
}
void work() override
{
size_t num_views = statuses.size();
for (size_t i = 0; i < num_views; ++i)
{
auto & view = views_data->views[i];
auto & status = statuses[i];
if (status.exception)
{
if (!any_exception)
any_exception = status.exception;
view.setException(std::move(status.exception));
}
}
statuses.clear();
}
private:
OutputPort & output;
std::vector<bool> has_exception;
ViewsDataPtr views_data;
std::vector<ExceptionStatus> statuses;
std::exception_ptr any_exception;
};
class ExceptionHandlingSink : public IProcessor
@ -580,37 +690,12 @@ static void process(Block & block, ViewRuntimeData & view)
void ExecutingInnerQueryFromViewTransform::transform(Chunk & chunk)
{
Stopwatch watch;
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
if (view.runtime_stats.thread_status)
{
/// Change thread context to store individual metrics per view. Once the work in done, go back to the original thread
view.runtime_stats.thread_status->resetPerformanceCountersLastUsage();
current_thread = view.runtime_stats.thread_status.get();
}
try
runViewStage(view, "while pushing to view", [&]
{
auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
process(block, view);
chunk.setColumns(block.getColumns(), block.rows());
}
catch (Exception & ex)
{
ex.addMessage("while pushing to view " + view.table_id.getNameForLogs());
view.setException(std::current_exception());
}
catch (...)
{
view.setException(std::current_exception());
}
if (view.runtime_stats.thread_status)
view.runtime_stats.thread_status->updatePerformanceCounters();
view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds();
});
}
void PushingToViewsBlockOutputStream::checkExceptionsInViews()
@ -625,9 +710,9 @@ void PushingToViewsBlockOutputStream::checkExceptionsInViews()
}
}
void PushingToViewsBlockOutputStream::logQueryViews()
static void logQueryViews(std::vector<ViewRuntimeData> & views, ContextPtr context)
{
const auto & settings = getContext()->getSettingsRef();
const auto & settings = context->getSettingsRef();
const UInt64 min_query_duration = settings.log_queries_min_query_duration_ms.totalMilliseconds();
const QueryViewsLogElement::ViewStatus min_status = settings.log_queries_min_type;
if (views.empty() || !settings.log_queries || !settings.log_query_views)
@ -650,10 +735,4 @@ void PushingToViewsBlockOutputStream::logQueryViews()
}
}
void PushingToViewsBlockOutputStream::onProgress(const Progress & progress)
{
if (getContext()->getProgressCallback())
getContext()->getProgressCallback()(progress);
}
}

View File

@ -91,6 +91,7 @@ public:
protected:
void transform(Chunk & chunk) override;
void onFinish() override;
private:
ViewRuntimeData view;