Rewrite PushingToViewsBlockOutputStream part 2.

This commit is contained in:
Nikolai Kochetov 2021-08-30 14:03:39 +03:00
parent 4d821efa15
commit eccee47e72
3 changed files with 30 additions and 0 deletions

View File

@ -36,6 +36,7 @@ namespace DB
struct ViewsData
{
std::vector<ViewRuntimeData> views;
ContextPtr context;
/// In case of exception happened while inserting into main table, it is pushed to pipeline.
/// Remember the first one, we should keep them after view processing.
@ -112,6 +113,8 @@ private:
ViewsDataPtr views_data;
};
static void logQueryViews(std::vector<ViewRuntimeData> & views, ContextPtr context);
class FinalizingViewsTransform final : public IProcessor
{
struct ExceptionStatus
@ -202,6 +205,9 @@ public:
view.setException(std::move(status.exception));
}
}
logQueryViews(views_data->views, views_data->context);
statuses.clear();
}

View File

@ -1,4 +1,6 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <Common/ThreadStatus.h>
#include <Common/Scope
namespace DB
{
@ -64,6 +66,12 @@ IProcessor::Status ExceptionKeepingTransform::prepare()
return Status::Ready;
}
static std::exception_ptr runStep(std::function<void()> func, ExceptionKeepingTransform::RuntimeData * runtime_data)
{
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
}
void ExceptionKeepingTransform::work()
{
if (!was_on_start_called)

View File

@ -5,6 +5,8 @@
namespace DB
{
class ThreadStatus;
/// Has one input and one output.
/// Works similarly to ISimpleTransform, but with much care about exceptions.
///
@ -46,6 +48,20 @@ public:
InputPort & getInputPort() { return input; }
OutputPort & getOutputPort() { return output; }
struct RuntimeData
{
std::unique_ptr<ThreadStatus> thread_status = nullptr;
UInt64 elapsed_ms = 0;
std::string additional_exception_message;
};
using RuntimeDataPtr = std::shared_ptr<RuntimeData>;
void setRuntimeData(RuntimeDataPtr runtime_data_) { runtime_data = std::move(runtime_data_); }
private:
RuntimeDataPtr runtime_data;
};