From eccee47e72df27bc1ef201b69ef85a7ae49bfff6 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 30 Aug 2021 14:03:39 +0300 Subject: [PATCH] Rewrite PushingToViewsBlockOutputStream part 2. --- .../PushingToViewsBlockOutputStream.cpp | 6 ++++++ src/Processors/Sinks/SinkToStorage.cpp | 8 ++++++++ src/Processors/Sinks/SinkToStorage.h | 16 ++++++++++++++++ 3 files changed, 30 insertions(+) diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 9f9ed68851b..f12e19bda99 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -36,6 +36,7 @@ namespace DB struct ViewsData { std::vector views; + ContextPtr context; /// In case of exception happened while inserting into main table, it is pushed to pipeline. /// Remember the first one, we should keep them after view processing. @@ -112,6 +113,8 @@ private: ViewsDataPtr views_data; }; +static void logQueryViews(std::vector & views, ContextPtr context); + class FinalizingViewsTransform final : public IProcessor { struct ExceptionStatus @@ -202,6 +205,9 @@ public: view.setException(std::move(status.exception)); } } + + logQueryViews(views_data->views, views_data->context); + statuses.clear(); } diff --git a/src/Processors/Sinks/SinkToStorage.cpp b/src/Processors/Sinks/SinkToStorage.cpp index f538bb3c8f0..97fdf45884a 100644 --- a/src/Processors/Sinks/SinkToStorage.cpp +++ b/src/Processors/Sinks/SinkToStorage.cpp @@ -1,4 +1,6 @@ #include +#include +#include func, ExceptionKeepingTransform::RuntimeData * runtime_data) +{ + auto * original_thread = current_thread; + SCOPE_EXIT({ current_thread = original_thread; }); +} + void ExceptionKeepingTransform::work() { if (!was_on_start_called) diff --git a/src/Processors/Sinks/SinkToStorage.h b/src/Processors/Sinks/SinkToStorage.h index 3629b8714da..a5e8aab9226 100644 --- a/src/Processors/Sinks/SinkToStorage.h +++ b/src/Processors/Sinks/SinkToStorage.h @@ -5,6 +5,8 @@ namespace DB { +class ThreadStatus; + /// Has one input and one output. /// Works similarly to ISimpleTransform, but with much care about exceptions. /// @@ -46,6 +48,20 @@ public: InputPort & getInputPort() { return input; } OutputPort & getOutputPort() { return output; } + + struct RuntimeData + { + std::unique_ptr thread_status = nullptr; + UInt64 elapsed_ms = 0; + std::string additional_exception_message; + }; + + using RuntimeDataPtr = std::shared_ptr; + + void setRuntimeData(RuntimeDataPtr runtime_data_) { runtime_data = std::move(runtime_data_); } + +private: + RuntimeDataPtr runtime_data; };