diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index f12e19bda99..1ed72319fb8 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -35,13 +35,18 @@ namespace DB struct ViewsData { - std::vector views; + std::list views; ContextPtr context; /// In case of exception happened while inserting into main table, it is pushed to pipeline. /// Remember the first one, we should keep them after view processing. std::atomic_bool has_exception = false; std::exception_ptr first_exception; + + ViewsData(std::list views_, ContextPtr context_) + : views(std::move(views_)), context(std::move(context_)) + { + } }; using ViewsDataPtr = std::shared_ptr; @@ -113,7 +118,7 @@ private: ViewsDataPtr views_data; }; -static void logQueryViews(std::vector & views, ContextPtr context); +static void logQueryViews(std::list & views, ContextPtr context); class FinalizingViewsTransform final : public IProcessor { @@ -192,11 +197,12 @@ public: void work() override { - size_t num_views = statuses.size(); - for (size_t i = 0; i < num_views; ++i) + size_t i = 0; + for (auto & view : views_data->views) { - auto & view = views_data->views[i]; auto & status = statuses[i]; + ++i; + if (status.exception) { if (!any_exception) @@ -268,6 +274,11 @@ Drain buildPushingToViewsDrainImpl( { checkStackSize(); + /// If we don't write directly to the destination + /// then expect that we're inserting with precalculated virtual columns + auto storage_header = no_destination ? metadata_snapshot->getSampleBlockWithVirtuals(storage->getVirtuals()) + : metadata_snapshot->getSampleBlock(); + /** TODO This is a very important line. At any insertion into the table one of streams should own lock. * Although now any insertion into the table is done via PushingToViewsBlockOutputStream, * but it's clear that here is not the best place for this functionality. 
@@ -304,7 +315,8 @@ Drain buildPushingToViewsDrainImpl( insert_context->setSetting("min_insert_block_size_bytes", insert_settings.min_insert_block_size_bytes_for_materialized_views.value); } - std::vector views; + std::list views; + std::vector drains; for (const auto & database_table : dependencies) { @@ -349,7 +361,7 @@ Drain buildPushingToViewsDrainImpl( ASTPtr insert_query_ptr(insert.release()); InterpreterInsertQuery interpreter(insert_query_ptr, insert_context); BlockIO io = interpreter.execute(); - out = io.out; + out = std::move(io.out); } else if (const auto * live_view = dynamic_cast(dependent_table.get())) { @@ -393,8 +405,21 @@ Drain buildPushingToViewsDrainImpl( 0, std::chrono::system_clock::now(), QueryViewsLogElement::ViewStatus::EXCEPTION_BEFORE_START}; - views.emplace_back(ViewRuntimeData{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)}); + views.emplace_back(ViewRuntimeData{ + std::move(query), + out.getHeader(), + database_table, + dependent_table, + dependent_metadata_snapshot, + select_context, + nullptr, + std::move(runtime_stats)}); + + auto executing_inner_query = std::make_shared(storage_header, views.back()); + out.addTransform(std::move(executing_inner_query)); + + drains.emplace_back(std::move(out)); /// Add the view to the query access info so it can appear in system.query_log if (!no_destination) @@ -404,14 +429,24 @@ Drain buildPushingToViewsDrainImpl( } } + size_t num_views = views.size(); + if (num_views != 0) + { + auto views_data = std::make_shared(std::move(views), context); + auto copying_data = std::make_shared(storage_header, std::move(views_data)); + auto it = copying_data->getOutputs().begin(); + for (auto & drain : drains) + connect(*it++, drain.getPort()); /* advance to the next output port so the drains are not all connected to the same one */ + + + } + /// Do not push to destination table if the flag is set if (!no_destination) { auto sink = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), context); 
metadata_snapshot->check(sink->getHeader().getColumnsWithTypeAndName()); - - auto replicated_output = dynamic_cast(sink.get()); } } @@ -632,8 +667,6 @@ void PushingToViewsBlockOutputStream::flush() static void process(Block & block, ViewRuntimeData & view) { - const auto & storage = view.storage; - const auto & metadata_snapshot = view.metadata_snapshot; const auto & context = view.context; /// We create a table with the same name as original table and the same alias columns, @@ -641,10 +674,10 @@ static void process(Block & block, ViewRuntimeData & view) /// InterpreterSelectQuery will do processing of alias columns. auto local_context = Context::createCopy(context); local_context->addViewSource(StorageValues::create( - storage->getStorageID(), - metadata_snapshot->getColumns(), + view.table_id, + view.metadata_snapshot->getColumns(), block, - storage->getVirtuals())); + view.storage->getVirtuals())); /// We need keep InterpreterSelectQuery, until the processing will be finished, since: /// @@ -716,7 +749,7 @@ void PushingToViewsBlockOutputStream::checkExceptionsInViews() } } -static void logQueryViews(std::vector & views, ContextPtr context) +static void logQueryViews(std::list & views, ContextPtr context) { const auto & settings = context->getSettingsRef(); const UInt64 min_query_duration = settings.log_queries_min_query_duration_ms.totalMilliseconds(); diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.h b/src/DataStreams/PushingToViewsBlockOutputStream.h index daaace1ee8b..e7f800406d7 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.h +++ b/src/DataStreams/PushingToViewsBlockOutputStream.h @@ -81,7 +81,7 @@ private: class ExecutingInnerQueryFromViewTransform final : public ExceptionKeepingTransform { public: - ExecutingInnerQueryFromViewTransform(const Block & header, ViewRuntimeData view_data) + ExecutingInnerQueryFromViewTransform(const Block & header, ViewRuntimeData & view_data) : ExceptionKeepingTransform(header, 
view_data.sample_block) - , view(std::move(view_data)) + , view(view_data) /* 'view' is a reference member: bind it to the caller's object; std::move() would yield an rvalue that cannot bind to ViewRuntimeData & */ { @@ -94,7 +94,7 @@ protected: void onFinish() override; private: - ViewRuntimeData view; + ViewRuntimeData & view; }; } diff --git a/src/Processors/Sinks/SinkToStorage.cpp b/src/Processors/Sinks/SinkToStorage.cpp index 97fdf45884a..8bf5fdf3fd2 100644 --- a/src/Processors/Sinks/SinkToStorage.cpp +++ b/src/Processors/Sinks/SinkToStorage.cpp @@ -1,6 +1,7 @@ #include #include -#include +#include namespace DB { @@ -40,7 +41,7 @@ IProcessor::Status ExceptionKeepingTransform::prepare() { if (input.isFinished()) { - if (!was_on_finish_called) + if (!was_on_finish_called && !has_exception) return Status::Ready; output.finish(); @@ -56,6 +57,7 @@ IProcessor::Status ExceptionKeepingTransform::prepare() if (data.exception) { + has_exception = true; output.pushData(std::move(data)); return Status::PortFull; } @@ -66,10 +68,46 @@ IProcessor::Status ExceptionKeepingTransform::prepare() return Status::Ready; } -static std::exception_ptr runStep(std::function func, ExceptionKeepingTransform::RuntimeData * runtime_data) +static std::exception_ptr runStep(std::function step, ExceptionKeepingTransform::RuntimeData * runtime_data) { auto * original_thread = current_thread; SCOPE_EXIT({ current_thread = original_thread; }); + + if (runtime_data && runtime_data->thread_status) + { + /// Change thread context to store individual metrics. Once the work is done, go back to the original thread + runtime_data->thread_status->resetPerformanceCountersLastUsage(); + current_thread = runtime_data->thread_status.get(); + } + + std::exception_ptr res; + Stopwatch watch; + + try + { + step(); + } + catch (Exception & exception) + { + if (runtime_data && !runtime_data->additional_exception_message.empty()) + exception.addMessage(runtime_data->additional_exception_message); + + res = std::current_exception(); + } + catch (...) 
+ { + res = std::current_exception(); + } + + if (runtime_data) + { + if (runtime_data->thread_status) + runtime_data->thread_status->updatePerformanceCounters(); + + runtime_data->elapsed_ms += watch.elapsedMilliseconds(); + } + + return res; } void ExceptionKeepingTransform::work() @@ -77,35 +115,37 @@ void ExceptionKeepingTransform::work() if (!was_on_start_called) { was_on_start_called = true; - onStart(); - } - if (ready_input) + if (auto exception = runStep([this] { onStart(); }, runtime_data.get())) + { + has_exception = true; + ready_output = true; + data.exception = std::move(exception); + } + } + else if (ready_input) { ready_input = false; - ready_output = true; - try - { - transform(data.chunk); - } - catch (...) + if (auto exception = runStep([this] { transform(data.chunk); }, runtime_data.get())) { + has_exception = true; data.chunk.clear(); - data.exception = std::current_exception(); + data.exception = std::move(exception); } + + if (data.chunk || data.exception) + ready_output = true; } else if (!was_on_finish_called) { was_on_finish_called = true; - try + + if (auto exception = runStep([this] { onFinish(); }, runtime_data.get())) { - onFinish(); - } - catch (...) - { - ready_input = true; - data.exception = std::current_exception(); + has_exception = true; + ready_output = true; + data.exception = std::move(exception); } } } @@ -115,6 +155,8 @@ SinkToStorage::SinkToStorage(const Block & header) : ExceptionKeepingTransform(h void SinkToStorage::transform(Chunk & chunk) { consume(chunk.clone()); + if (lastBlockIsDuplicate()) + chunk.clear(); } } diff --git a/src/Processors/Sinks/SinkToStorage.h b/src/Processors/Sinks/SinkToStorage.h index a5e8aab9226..d66391833ec 100644 --- a/src/Processors/Sinks/SinkToStorage.h +++ b/src/Processors/Sinks/SinkToStorage.h @@ -19,9 +19,7 @@ class ThreadStatus; /// It is expected that output port won't be closed from the other side before all data is processed. 
/// /// Method onStart() is called before reading any data. -/// Exception from it is not pushed into pipeline, but thrown immediately. -/// -/// Method onFinish() is called after all data from input is processed. +/// Method onFinish() is called after all data from input is processed, if no exception happened. /// In case of exception, it is additionally pushed into pipeline. class ExceptionKeepingTransform : public IProcessor { @@ -32,6 +30,7 @@ private: bool ready_input = false; bool ready_output = false; + bool has_exception = false; bool was_on_start_called = false; bool was_on_finish_called = false; @@ -76,6 +75,7 @@ public: protected: virtual void consume(Chunk chunk) = 0; + virtual bool lastBlockIsDuplicate() const { return false; } private: std::vector table_locks; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h index 2a6702736df..75f002c6b42 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h @@ -44,8 +44,12 @@ public: void writeExistingPart(MergeTreeData::MutableDataPartPtr & part); /// For proper deduplication in MaterializedViews - bool lastBlockIsDuplicate() const + bool lastBlockIsDuplicate() const override { + /// If MV is responsible for deduplication, block is not considered duplicating. + if (context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views) + return false; + return last_block_is_duplicate; }