diff --git a/src/DataStreams/AddingDefaultBlockOutputStream.cpp b/src/DataStreams/AddingDefaultBlockOutputStream.cpp index 6f7975d492d..118926cfdd7 100644 --- a/src/DataStreams/AddingDefaultBlockOutputStream.cpp +++ b/src/DataStreams/AddingDefaultBlockOutputStream.cpp @@ -7,14 +7,14 @@ namespace DB { AddingDefaultBlockOutputStream::AddingDefaultBlockOutputStream( - const BlockOutputStreamPtr & output_, - const Block & header_, + const Block & in_header, + const Block & out_header, const ColumnsDescription & columns_, ContextPtr context_, bool null_as_default_) : output(output_), header(header_) { - auto dag = addMissingDefaults(header_, output->getHeader().getNamesAndTypesList(), columns_, context_, null_as_default_); + auto dag = addMissingDefaults(in_header, output_header.getNamesAndTypesList(), columns_, context_, null_as_default_); adding_defaults_actions = std::make_shared(std::move(dag), ExpressionActionsSettings::fromContext(context_, CompileExpressions::yes)); } @@ -25,19 +25,6 @@ void AddingDefaultBlockOutputStream::write(const Block & block) output->write(copy); } -void AddingDefaultBlockOutputStream::flush() -{ - output->flush(); -} -void AddingDefaultBlockOutputStream::writePrefix() -{ - output->writePrefix(); -} - -void AddingDefaultBlockOutputStream::writeSuffix() -{ - output->writeSuffix(); -} } diff --git a/src/DataStreams/AddingDefaultBlockOutputStream.h b/src/DataStreams/AddingDefaultBlockOutputStream.h index 45ff30a3daa..237bd994613 100644 --- a/src/DataStreams/AddingDefaultBlockOutputStream.h +++ b/src/DataStreams/AddingDefaultBlockOutputStream.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -20,18 +20,17 @@ class Context; * Also the stream can substitute NULL into DEFAULT value in case of INSERT SELECT query (null_as_default) if according setting is 1. * All three types of columns are materialized (not constants). 
*/ -class AddingDefaultBlockOutputStream : public IBlockOutputStream +class AddingMissingDefaultsTransform : public ISimpleTransform { public: - AddingDefaultBlockOutputStream( - const BlockOutputStreamPtr & output_, - const Block & header_, + AddingMissingDefaultsTransform( + const Block & in_header, + const Block & out_header, const ColumnsDescription & columns_, ContextPtr context_, bool null_as_default_ = false); - Block getHeader() const override { return header; } - void write(const Block & block) override; + void transform(Chunk & chunk) override; void flush() override; @@ -39,8 +38,6 @@ public: void writeSuffix() override; private: - BlockOutputStreamPtr output; - const Block header; ExpressionActionsPtr adding_defaults_actions; }; diff --git a/src/DataStreams/BlockIO.h b/src/DataStreams/BlockIO.h index 31a0e1020d2..7c8edc3318a 100644 --- a/src/DataStreams/BlockIO.h +++ b/src/DataStreams/BlockIO.h @@ -5,6 +5,7 @@ #include #include +#include namespace DB @@ -25,7 +26,7 @@ struct BlockIO std::shared_ptr process_list_entry; - BlockOutputStreamPtr out; + Chain out; BlockInputStreamPtr in; QueryPipeline pipeline; diff --git a/src/DataStreams/CheckConstraintsBlockOutputStream.cpp b/src/DataStreams/CheckConstraintsBlockOutputStream.cpp index fbf4a777032..9dace69d38f 100644 --- a/src/DataStreams/CheckConstraintsBlockOutputStream.cpp +++ b/src/DataStreams/CheckConstraintsBlockOutputStream.cpp @@ -22,26 +22,24 @@ namespace ErrorCodes } -CheckConstraintsBlockOutputStream::CheckConstraintsBlockOutputStream( +CheckConstraintsTransform::CheckConstraintsTransform( const StorageID & table_id_, - const BlockOutputStreamPtr & output_, - const Block & header_, + const Block & header, const ConstraintsDescription & constraints_, ContextPtr context_) - : table_id(table_id_), - output(output_), - header(header_), - constraints(constraints_), - expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList())) + : ISimpleTransform(header, header, false) + , 
table_id(table_id_) + , constraints(constraints_) + , expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList())) { } -void CheckConstraintsBlockOutputStream::write(const Block & block) +void CheckConstraintsTransform::transform(Chunk & chunk) { - if (block.rows() > 0) + if (chunk.getNumRows() > 0) { - Block block_to_calculate = block; + Block block_to_calculate = getInputPort().getHeader().cloneWithColumns(chunk.getColumns()); for (size_t i = 0; i < expressions.size(); ++i) { auto constraint_expr = expressions[i]; @@ -101,7 +99,7 @@ void CheckConstraintsBlockOutputStream::write(const Block & block) column_values_msg.reserve(approx_bytes_for_col * related_columns.size()); for (const auto & name : related_columns) { - const IColumn & column = *block.getByName(name).column; + const IColumn & column = *chunk.getColumns()[getInputPort().getHeader().getPositionByName(name)]; assert(row_idx < column.size()); if (!first) @@ -124,23 +122,7 @@ void CheckConstraintsBlockOutputStream::write(const Block & block) } } - output->write(block); - rows_written += block.rows(); -} - -void CheckConstraintsBlockOutputStream::flush() -{ - output->flush(); -} - -void CheckConstraintsBlockOutputStream::writePrefix() -{ - output->writePrefix(); -} - -void CheckConstraintsBlockOutputStream::writeSuffix() -{ - output->writeSuffix(); + rows_written += chunk.getNumRows(); } } diff --git a/src/DataStreams/CheckConstraintsBlockOutputStream.h b/src/DataStreams/CheckConstraintsBlockOutputStream.h index 0f115550eb8..aa6d96aa5d3 100644 --- a/src/DataStreams/CheckConstraintsBlockOutputStream.h +++ b/src/DataStreams/CheckConstraintsBlockOutputStream.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -12,28 +12,21 @@ namespace DB * Otherwise just pass block to output unchanged. 
*/ -class CheckConstraintsBlockOutputStream : public IBlockOutputStream +class CheckConstraintsTransform final : public ISimpleTransform { public: - CheckConstraintsBlockOutputStream( + CheckConstraintsTransform( const StorageID & table_, - const BlockOutputStreamPtr & output_, - const Block & header_, + const Block & header, const ConstraintsDescription & constraints_, ContextPtr context_); - Block getHeader() const override { return header; } - void write(const Block & block) override; + String getName() const override { return "CheckConstraintsTransform"; } - void flush() override; - - void writePrefix() override; - void writeSuffix() override; + void transform(Chunk & chunk) override; private: StorageID table_id; - BlockOutputStreamPtr output; - Block header; const ConstraintsDescription constraints; const ConstraintsExpressions expressions; size_t rows_written = 0; diff --git a/src/DataStreams/CountingBlockOutputStream.cpp b/src/DataStreams/CountingBlockOutputStream.cpp index 6594b3b2ce1..e17dc80ca13 100644 --- a/src/DataStreams/CountingBlockOutputStream.cpp +++ b/src/DataStreams/CountingBlockOutputStream.cpp @@ -12,11 +12,9 @@ namespace ProfileEvents namespace DB { -void CountingBlockOutputStream::write(const Block & block) +void CountingTransform::transform(Chunk & chunk) { - stream->write(block); - - Progress local_progress(block.rows(), block.bytes(), 0); + Progress local_progress(chunk.getNumRows(), chunk.bytes(), 0); progress.incrementPiecewiseAtomically(local_progress); ProfileEvents::increment(ProfileEvents::InsertedRows, local_progress.read_rows); diff --git a/src/DataStreams/CountingBlockOutputStream.h b/src/DataStreams/CountingBlockOutputStream.h index c7247b39945..702a9f190f8 100644 --- a/src/DataStreams/CountingBlockOutputStream.h +++ b/src/DataStreams/CountingBlockOutputStream.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include @@ -9,11 +9,12 @@ namespace DB /// Proxy class which counts number of written block, rows, bytes -class 
CountingBlockOutputStream : public IBlockOutputStream +class CountingTransform final : public ISimpleTransform { public: - CountingBlockOutputStream(const BlockOutputStreamPtr & stream_) - : stream(stream_) {} + explicit CountingTransform(const Block & header) : ISimpleTransform(header, header, false) {} + + String getName() const override { return "CountingTransform"; } void setProgressCallback(const ProgressCallback & callback) { @@ -30,17 +31,9 @@ public: return progress; } - Block getHeader() const override { return stream->getHeader(); } - void write(const Block & block) override; - - void writePrefix() override { stream->writePrefix(); } - void writeSuffix() override { stream->writeSuffix(); } - void flush() override { stream->flush(); } - void onProgress(const Progress & current_progress) override { stream->onProgress(current_progress); } - String getContentType() const override { return stream->getContentType(); } + void transform(Chunk & chunk) override; protected: - BlockOutputStreamPtr stream; Progress progress; ProgressCallback progress_callback; QueryStatus * process_elem = nullptr; diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 1ed72319fb8..4ad64206442 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -1,10 +1,4 @@ -#include -#include -#include -#include #include -#include -#include #include #include #include @@ -19,12 +13,9 @@ #include #include #include -#include #include #include #include -#include -#include #include #include @@ -128,9 +119,18 @@ class FinalizingViewsTransform final : public IProcessor bool is_first = false; }; + static InputPorts initPorts(std::vector headers) + { + InputPorts res; + for (auto & header : headers) + res.emplace_back(std::move(header)); + + return res; + } + public: - FinalizingViewsTransform(const Block & header, ViewsDataPtr data) - : 
IProcessor(InputPorts(data->views.size(), header), {header}) + FinalizingViewsTransform(std::vector headers, ViewsDataPtr data) + : IProcessor(initPorts(std::move(headers)), {Block()}) , output(outputs.front()) , views_data(std::move(data)) { @@ -224,53 +224,54 @@ private: std::exception_ptr any_exception; }; -class ExceptionHandlingSink : public IProcessor -{ -public: - explicit ExceptionHandlingSink(Block header) - : IProcessor({std::move(header)}, {}) - , input(inputs.front()) - { - } +//class ExceptionHandlingSink : public IProcessor +//{ +//public: +// explicit ExceptionHandlingSink(Block header) +// : IProcessor({std::move(header)}, {}) +// , input(inputs.front()) +// { +// } +// +// Status prepare() override +// { +// while (!input.isFinished()) +// { +// input.setNeeded(); +// if (!input.hasData()) +// return Status::NeedData; +// +// auto data = input.pullData(); +// if (data.exception) +// exceptions.emplace_back(std::move(data.exception)); +// } +// +// if (!exceptions.empty()) +// return Status::Ready; +// +// return Status::Finished; +// } +// +// void work() override +// { +// auto exception = std::move(exceptions.at(0)); +// exceptions.clear(); +// std::rethrow_exception(std::move(exception)); +// } +// +//private: +// InputPort & input; +// std::vector exceptions; +//}; - Status prepare() override - { - while (!input.isFinished()) - { - input.setNeeded(); - if (!input.hasData()) - return Status::NeedData; - - auto data = input.pullData(); - if (data.exception) - exceptions.emplace_back(std::move(data.exception)); - } - - if (!exceptions.empty()) - return Status::Ready; - - return Status::Finished; - } - - void work() override - { - auto exception = std::move(exceptions.at(0)); - exceptions.clear(); - std::rethrow_exception(std::move(exception)); - } - -private: - InputPort & input; - std::vector exceptions; -}; - -Drain buildPushingToViewsDrainImpl( +Chain buildPushingToViewsDrain( const StoragePtr & storage, const StorageMetadataPtr & 
metadata_snapshot, ContextPtr context, const ASTPtr & query_ptr, bool no_destination, - std::vector & locks) + std::vector & locks, + ExceptionKeepingTransformRuntimeDataPtr runtime_data) { checkStackSize(); @@ -316,7 +317,7 @@ Drain buildPushingToViewsDrainImpl( } std::list views; - std::vector drains; + std::vector chains; for (const auto & database_table : dependencies) { @@ -324,10 +325,38 @@ Drain buildPushingToViewsDrainImpl( auto dependent_metadata_snapshot = dependent_table->getInMemoryMetadataPtr(); ASTPtr query; - Drain out; + Chain out; QueryViewsLogElement::ViewType type = QueryViewsLogElement::ViewType::DEFAULT; String target_name = database_table.getFullTableName(); + /// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or + /// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point on collecting metrics + std::unique_ptr thread_status = nullptr; + + ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup() + ? current_thread->getThreadGroup() + : MainThreadStatus::getInstance().getThreadGroup(); + if (running_group) + { + /// We are creating a ThreadStatus per view to store its metrics individually + /// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls + /// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work, + /// and switch back to the original thread_status. 
+ auto * original_thread = current_thread; + SCOPE_EXIT({ current_thread = original_thread; }); + + thread_status = std::make_unique(); + /// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one + /// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means + /// N times more interruptions + thread_status->disableProfiling(); + thread_status->attachQuery(running_group); + } + + auto view_runtime_data = std::make_shared( + std::move(thread_status), + database_table.getNameForLogs()); + if (auto * materialized_view = dynamic_cast(dependent_table.get())) { type = QueryViewsLogElement::ViewType::MATERIALIZED; @@ -359,7 +388,7 @@ Drain buildPushingToViewsDrainImpl( insert->columns = std::move(list); ASTPtr insert_query_ptr(insert.release()); - InterpreterInsertQuery interpreter(insert_query_ptr, insert_context); + InterpreterInsertQuery interpreter(insert_query_ptr, insert_context, false, false, false, runtime_data); BlockIO io = interpreter.execute(); out = std::move(io.out); } @@ -367,36 +396,12 @@ Drain buildPushingToViewsDrainImpl( { type = QueryViewsLogElement::ViewType::LIVE; query = live_view->getInnerQuery(); // Used only to log in system.query_views_log - out = buildPushingToViewsDrainImpl( - dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, locks); + out = buildPushingToViewsDrain( + dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, locks, view_runtime_data); } else - out = buildPushingToViewsDrainImpl( - dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), false, locks); - - /// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or - /// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point on collecting metrics - std::unique_ptr thread_status = nullptr; - - ThreadGroupStatusPtr 
running_group = current_thread && current_thread->getThreadGroup() - ? current_thread->getThreadGroup() - : MainThreadStatus::getInstance().getThreadGroup(); - if (running_group) - { - /// We are creating a ThreadStatus per view to store its metrics individually - /// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls - /// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work, - /// and switch back to the original thread_status. - auto * original_thread = current_thread; - SCOPE_EXIT({ current_thread = original_thread; }); - - thread_status = std::make_unique(); - /// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one - /// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means - /// N times more interruptions - thread_status->disableProfiling(); - thread_status->attachQuery(running_group); - } + out = buildPushingToViewsDrain( + dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), false, locks, view_runtime_data); QueryViewsLogElement::ViewRuntimeStats runtime_stats{ target_name, @@ -408,7 +413,7 @@ Drain buildPushingToViewsDrainImpl( views.emplace_back(ViewRuntimeData{ std::move(query), - out.getHeader(), + out.getInputHeader(), database_table, dependent_table, dependent_metadata_snapshot, @@ -417,9 +422,10 @@ Drain buildPushingToViewsDrainImpl( std::move(runtime_stats)}); auto executing_inner_query = std::make_shared(storage_header, views.back()); - out.addTransform(std::move(executing_inner_query)); + executing_inner_query->setRuntimeData(view_runtime_data); - drains.emplace_back(std::move(out)); + out.addSource(std::move(executing_inner_query)); + chains.emplace_back(std::move(out)); /// Add the view to the query access info so it can appear in system.query_log if (!no_destination) @@ -429,240 +435,46 @@ Drain buildPushingToViewsDrainImpl( } } + 
Chain result_chain; + size_t num_views = views.size(); if (num_views != 0) { + std::vector headers; + headers.reserve(num_views); + for (const auto & chain : chains) + headers.push_back(chain.getOutputHeader()); + auto views_data = std::make_shared(std::move(views), context); - auto copying_data = std::make_shared(storage_header, std::move(views_data)); - auto it = copying_data->getOutputs().begin(); - for (auto & drain : drains) - connect(*it, drain.getPort()); + auto copying_data = std::make_shared(storage_header, views_data); + auto finalizing_views = std::make_shared(std::move(headers), views_data); + auto out = copying_data->getOutputs().begin(); + auto in = finalizing_views->getInputs().begin(); + std::list processors; + for (auto & chain : chains) + { + connect(*out, chain.getInputPort()); + connect(chain.getOutputPort(), *in); + processors.splice(processors.end(), Chain::getProcessors(std::move(chain))); + } + + processors.emplace_front(std::move(copying_data)); + processors.emplace_back(std::move(finalizing_views)); + result_chain = Chain(std::move(processors)); } /// Do not push to destination table if the flag is set if (!no_destination) { auto sink = storage->write(query_ptr, storage->getInMemoryMetadataPtr(), context); - + sink->setRuntimeData(runtime_data); metadata_snapshot->check(sink->getHeader().getColumnsWithTypeAndName()); - } -} - -Block PushingToViewsBlockOutputStream::getHeader() const -{ - /// If we don't write directly to the destination - /// then expect that we're inserting with precalculated virtual columns - if (output) - return metadata_snapshot->getSampleBlock(); - else - return metadata_snapshot->getSampleBlockWithVirtuals(storage->getVirtuals()); -} - -/// Auxiliary function to do the setup and teardown to run a view individually and collect its metrics inside the view ThreadStatus -void inline runViewStage(ViewRuntimeData & view, const std::string & action, std::function stage) -{ - Stopwatch watch; - - auto * original_thread = 
current_thread; - SCOPE_EXIT({ current_thread = original_thread; }); - - if (view.runtime_stats.thread_status) - { - /// Change thread context to store individual metrics per view. Once the work in done, go back to the original thread - view.runtime_stats.thread_status->resetPerformanceCountersLastUsage(); - current_thread = view.runtime_stats.thread_status.get(); + result_chain.addSource(std::move(sink)); } - try - { - stage(); - } - catch (Exception & ex) - { - ex.addMessage(action + " " + view.table_id.getNameForLogs()); - view.setException(std::current_exception()); - } - catch (...) - { - view.setException(std::current_exception()); - } - - if (view.runtime_stats.thread_status) - view.runtime_stats.thread_status->updatePerformanceCounters(); - view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds(); -} - -void PushingToViewsBlockOutputStream::write(const Block & block) -{ - /** Throw an exception if the sizes of arrays - elements of nested data structures doesn't match. - * We have to make this assertion before writing to table, because storage engine may assume that they have equal sizes. - * NOTE It'd better to do this check in serialization of nested structures (in place when this assumption is required), - * but currently we don't have methods for serialization of nested structures "as a whole". - */ - Nested::validateArraySizes(block); - - if (auto * live_view = dynamic_cast(storage.get())) - { - StorageLiveView::writeIntoLiveView(*live_view, block, getContext()); - } - else - { - if (output) - /// TODO: to support virtual and alias columns inside MVs, we should return here the inserted block extended - /// with additional columns directly from storage and pass it to MVs instead of raw block. 
- output->write(block); - } - - if (views.empty()) - return; - - /// Don't process materialized views if this block is duplicate - const Settings & settings = getContext()->getSettingsRef(); - if (!settings.deduplicate_blocks_in_dependent_materialized_views && replicated_output && replicated_output->lastBlockIsDuplicate()) - return; - - size_t max_threads = 1; - if (settings.parallel_view_processing) - max_threads = settings.max_threads ? std::min(static_cast(settings.max_threads), views.size()) : views.size(); - if (max_threads > 1) - { - ThreadPool pool(max_threads); - for (auto & view : views) - { - pool.scheduleOrThrowOnError([&] { - setThreadName("PushingToViews"); - runViewStage(view, "while pushing to view", [&]() { process(block, view); }); - }); - } - pool.wait(); - } - else - { - for (auto & view : views) - { - runViewStage(view, "while pushing to view", [&]() { process(block, view); }); - } - } -} - -void PushingToViewsBlockOutputStream::writePrefix() -{ - if (output) - output->writePrefix(); - - for (auto & view : views) - { - runViewStage(view, "while writing prefix to view", [&] { view.out->writePrefix(); }); - if (view.exception) - { - logQueryViews(); - std::rethrow_exception(view.exception); - } - } -} - -void PushingToViewsBlockOutputStream::writeSuffix() -{ - if (output) - output->writeSuffix(); - - if (views.empty()) - return; - - auto process_suffix = [](ViewRuntimeData & view) - { - view.out->writeSuffix(); - view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::QUERY_FINISH); - }; - static std::string stage_step = "while writing suffix to view"; - - /// Run writeSuffix() for views in separate thread pool. - /// In could have been done in PushingToViewsBlockOutputStream::process, however - /// it is not good if insert into main table fail but into view succeed. - const Settings & settings = getContext()->getSettingsRef(); - size_t max_threads = 1; - if (settings.parallel_view_processing) - max_threads = settings.max_threads ? 
std::min(static_cast(settings.max_threads), views.size()) : views.size(); - bool exception_happened = false; - if (max_threads > 1) - { - ThreadPool pool(max_threads); - std::atomic_uint8_t exception_count = 0; - for (auto & view : views) - { - if (view.exception) - { - exception_happened = true; - continue; - } - pool.scheduleOrThrowOnError([&] { - setThreadName("PushingToViews"); - - runViewStage(view, stage_step, [&] { process_suffix(view); }); - if (view.exception) - { - exception_count.fetch_add(1, std::memory_order_relaxed); - } - else - { - LOG_TRACE( - log, - "Pushing (parallel {}) from {} to {} took {} ms.", - max_threads, - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - view.runtime_stats.elapsed_ms); - } - }); - } - pool.wait(); - exception_happened |= exception_count.load(std::memory_order_relaxed) != 0; - } - else - { - for (auto & view : views) - { - if (view.exception) - { - exception_happened = true; - continue; - } - runViewStage(view, stage_step, [&] { process_suffix(view); }); - if (view.exception) - { - exception_happened = true; - } - else - { - LOG_TRACE( - log, - "Pushing (sequentially) from {} to {} took {} ms.", - storage->getStorageID().getNameForLogs(), - view.table_id.getNameForLogs(), - view.runtime_stats.elapsed_ms); - } - } - } - if (exception_happened) - checkExceptionsInViews(); - - if (views.size() > 1) - { - UInt64 milliseconds = main_watch.elapsedMilliseconds(); - LOG_DEBUG(log, "Pushing from {} to {} views took {} ms.", storage->getStorageID().getNameForLogs(), views.size(), milliseconds); - } - logQueryViews(); -} - -void PushingToViewsBlockOutputStream::flush() -{ - if (output) - output->flush(); - - for (auto & view : views) - view.out->flush(); + return result_chain; } static void process(Block & block, ViewRuntimeData & view) @@ -729,24 +541,9 @@ static void process(Block & block, ViewRuntimeData & view) void ExecutingInnerQueryFromViewTransform::transform(Chunk & chunk) { - runViewStage(view, 
"while pushing to view", [&] - { - auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns()); - process(block, view); - chunk.setColumns(block.getColumns(), block.rows()); - }); -} - -void PushingToViewsBlockOutputStream::checkExceptionsInViews() -{ - for (auto & view : views) - { - if (view.exception) - { - logQueryViews(); - std::rethrow_exception(view.exception); - } - } + auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns()); + process(block, view); + chunk.setColumns(block.getColumns(), block.rows()); } static void logQueryViews(std::list & views, ContextPtr context) diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.h b/src/DataStreams/PushingToViewsBlockOutputStream.h index e7f800406d7..dfa7a9593d7 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.h +++ b/src/DataStreams/PushingToViewsBlockOutputStream.h @@ -3,10 +3,11 @@ #include #include #include -#include -#include -#include +#include #include +#include +#include +#include namespace Poco { @@ -18,6 +19,9 @@ namespace DB class ReplicatedMergeTreeSink; +struct ExceptionKeepingTransformRuntimeData; +using ExceptionKeepingTransformRuntimeDataPtr = std::shared_ptr; + struct ViewRuntimeData { const ASTPtr query; @@ -41,49 +45,22 @@ struct ViewRuntimeData /** Writes data to the specified table and to all dependent materialized views. 
*/ -class PushingToViewsBlockOutputStream : public IBlockOutputStream, WithContext -{ -public: - PushingToViewsBlockOutputStream( - const StoragePtr & storage_, - const StorageMetadataPtr & metadata_snapshot_, - ContextPtr context_, - const ASTPtr & query_ptr_, - bool no_destination = false); +Chain buildPushingToViewsDrain( + const StoragePtr & storage, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + const ASTPtr & query_ptr, + bool no_destination, + std::vector & locks, + ExceptionKeepingTransformRuntimeDataPtr runtime_data); - Block getHeader() const override; - void write(const Block & block) override; - - void flush() override; - void writePrefix() override; - void writeSuffix() override; - void onProgress(const Progress & progress) override; - -private: - StoragePtr storage; - StorageMetadataPtr metadata_snapshot; - BlockOutputStreamPtr output; - ReplicatedMergeTreeSink * replicated_output = nullptr; - Poco::Logger * log; - - ASTPtr query_ptr; - Stopwatch main_watch; - - std::vector views; - ContextMutablePtr select_context; - ContextMutablePtr insert_context; - - void process(const Block & block, ViewRuntimeData & view); - void checkExceptionsInViews(); - void logQueryViews(); -}; class ExecutingInnerQueryFromViewTransform final : public ExceptionKeepingTransform { public: ExecutingInnerQueryFromViewTransform(const Block & header, ViewRuntimeData & view_data) : ExceptionKeepingTransform(header, view_data.sample_block) - , view(std::move(view_data)) + , view(view_data) { } @@ -91,7 +68,6 @@ public: protected: void transform(Chunk & chunk) override; - void onFinish() override; private: ViewRuntimeData & view; diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 3589176f231..b150b959f87 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -19,7 +19,9 @@ #include #include #include +#include #include +#include #include #include 
#include @@ -28,6 +30,7 @@ #include #include #include +#include #include #include @@ -43,12 +46,14 @@ namespace ErrorCodes } InterpreterInsertQuery::InterpreterInsertQuery( - const ASTPtr & query_ptr_, ContextPtr context_, bool allow_materialized_, bool no_squash_, bool no_destination_) + const ASTPtr & query_ptr_, ContextPtr context_, bool allow_materialized_, bool no_squash_, bool no_destination_, + ExceptionKeepingTransformRuntimeDataPtr runtime_data_) : WithContext(context_) , query_ptr(query_ptr_) , allow_materialized(allow_materialized_) , no_squash(no_squash_) , no_destination(no_destination_) + , runtime_data(runtime_data_) { checkStackSize(); } @@ -174,7 +179,7 @@ BlockIO InterpreterInsertQuery::execute() } } - BlockOutputStreams out_streams; + std::vector out_chains; if (!is_distributed_insert_select || query.watch) { size_t out_streams_size = 1; @@ -266,28 +271,45 @@ BlockIO InterpreterInsertQuery::execute() for (size_t i = 0; i < out_streams_size; i++) { /// We create a pipeline of several streams, into which we will write data. - BlockOutputStreamPtr out; + Chain out; /// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage. /// Otherwise we'll get duplicates when MV reads same rows again from Kafka. if (table->noPushingToViews() && !no_destination) - out = std::make_shared(table->write(query_ptr, metadata_snapshot, getContext())); + { + auto sink = table->write(query_ptr, metadata_snapshot, getContext()); + sink->setRuntimeData(runtime_data); + out.addSource(std::move(sink)); + } else - out = std::make_shared(table, metadata_snapshot, getContext(), query_ptr, no_destination); + { + std::vector locks; + out = buildPushingToViewsDrain(table, metadata_snapshot, getContext(), query_ptr, no_destination, locks, runtime_data); + for (auto & lock : locks) + res.pipeline.addTableLock(std::move(lock)); + } /// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order. 
/// Checking constraints. It must be done after calculation of all defaults, so we can check them on calculated columns. if (const auto & constraints = metadata_snapshot->getConstraints(); !constraints.empty()) - out = std::make_shared( - query.table_id, out, out->getHeader(), metadata_snapshot->getConstraints(), getContext()); + out.addSource(std::make_shared( + query.table_id, out.getInputHeader(), metadata_snapshot->getConstraints(), getContext())); bool null_as_default = query.select && getContext()->getSettingsRef().insert_null_as_default; + auto adding_missing_defaults_dag = addMissingDefaults( + query_sample_block, + out.getInputHeader().getNamesAndTypesList(), + metadata_snapshot->getColumns(), + getContext(), + null_as_default); + + auto adding_missing_defaults_actions = std::make_shared(adding_missing_defaults_dag); + /// Actually we don't know structure of input blocks from query/table, /// because some clients break insertion protocol (columns != header) - out = std::make_shared( - out, query_sample_block, metadata_snapshot->getColumns(), getContext(), null_as_default); + out.addSource(std::make_shared(query_sample_block, adding_missing_defaults_actions)); /// It's important to squash blocks as early as possible (before other transforms), /// because other transforms may work inefficient if block size is small. @@ -298,16 +320,17 @@ BlockIO InterpreterInsertQuery::execute() { bool table_prefers_large_blocks = table->prefersLargeBlocks(); - out = std::make_shared( - out, - out->getHeader(), + out.addSource(std::make_shared( + out.getInputHeader(), table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, - table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0); + table_prefers_large_blocks ? 
settings.min_insert_block_size_bytes : 0)); } - auto out_wrapper = std::make_shared(out); - out_wrapper->setProcessListElement(getContext()->getProcessListElement()); - out_streams.emplace_back(std::move(out_wrapper)); + auto counting = std::make_shared(out.getInputHeader()); + counting->setProcessListElement(getContext()->getProcessListElement()); + out.addSource(std::move(counting)); + + out_chains.emplace_back(std::move(out)); } } @@ -318,7 +341,7 @@ BlockIO InterpreterInsertQuery::execute() } else if (query.select || query.watch) { - const auto & header = out_streams.at(0)->getHeader(); + const auto & header = out_chains.at(0).getInputHeader(); auto actions_dag = ActionsDAG::makeConvertingActions( res.pipeline.getHeader().getColumnsWithTypeAndName(), header.getColumnsWithTypeAndName(), @@ -330,15 +353,11 @@ BlockIO InterpreterInsertQuery::execute() return std::make_shared(in_header, actions); }); - res.pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr + res.pipeline.addChains(std::move(out_chains)); + + res.pipeline.setSinks([&](const Block & cur_header, QueryPipeline::StreamType) -> ProcessorPtr { - if (type != QueryPipeline::StreamType::Main) - return nullptr; - - auto stream = std::move(out_streams.back()); - out_streams.pop_back(); - - return std::make_shared(std::move(stream)); + return std::make_shared(cur_header); }); if (!allow_materialized) @@ -353,13 +372,14 @@ BlockIO InterpreterInsertQuery::execute() auto pipe = getSourceFromFromASTInsertQuery(query_ptr, nullptr, query_sample_block, getContext(), nullptr); res.pipeline.init(std::move(pipe)); res.pipeline.resize(1); - res.pipeline.setSinks([&](const Block &, Pipe::StreamType) + res.pipeline.addChains(std::move(out_chains)); + res.pipeline.setSinks([&](const Block & cur_header, Pipe::StreamType) { - return std::make_shared(out_streams.at(0)); + return std::make_shared(cur_header); }); } else - res.out = std::move(out_streams.at(0)); + res.out = 
std::move(out_chains.at(0)); res.pipeline.addStorageHolder(table); if (const auto * mv = dynamic_cast(table.get())) diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h index 71b8c827702..380d7550ed8 100644 --- a/src/Interpreters/InterpreterInsertQuery.h +++ b/src/Interpreters/InterpreterInsertQuery.h @@ -9,6 +9,9 @@ namespace DB { +struct ExceptionKeepingTransformRuntimeData; +using ExceptionKeepingTransformRuntimeDataPtr = std::shared_ptr; + /** Interprets the INSERT query. */ class InterpreterInsertQuery : public IInterpreter, WithContext @@ -19,7 +22,8 @@ public: ContextPtr context_, bool allow_materialized_ = false, bool no_squash_ = false, - bool no_destination_ = false); + bool no_destination_ = false, + ExceptionKeepingTransformRuntimeDataPtr runtime_data = nullptr); /** Prepare a request for execution. Return block streams * - the stream into which you can write data to execute the query, if INSERT; @@ -40,6 +44,7 @@ private: const bool allow_materialized; const bool no_squash; const bool no_destination; + ExceptionKeepingTransformRuntimeDataPtr runtime_data; }; diff --git a/src/Processors/Chain.cpp b/src/Processors/Chain.cpp new file mode 100644 index 00000000000..6f0320c86ac --- /dev/null +++ b/src/Processors/Chain.cpp @@ -0,0 +1,108 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +static void checkSingleInput(const IProcessor & transform) +{ + if (transform.getInputs().size() != 1) + throw Exception("Transform for chain should have single input, " + "but " + transform.getName() + " has " + + toString(transform.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR); + + if (transform.getInputs().front().isConnected()) + throw Exception("Transform for chain has connected input.", ErrorCodes::LOGICAL_ERROR); +} + +static void checkSingleOutput(const IProcessor & transform) +{ + if (transform.getOutputs().size() != 1) + throw Exception("Transform 
for chain should have single output, " + "but " + transform.getName() + " has " + + toString(transform.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR); + + if (transform.getOutputs().front().isConnected()) + throw Exception("Transform for chain has connected output.", ErrorCodes::LOGICAL_ERROR); +} + +static void checkTransform(const IProcessor & transform) +{ + checkSingleInput(transform); + checkSingleOutput(transform); +} + +static void checkInitialized(const std::list & processors) +{ + if (processors.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Chain is not initialized"); +} + +Chain::Chain(ProcessorPtr processor) +{ + checkTransform(*processor); + processors.emplace_back(std::move(processor)); +} + +Chain::Chain(std::list processors_) : processors(std::move(processors_)) +{ + if (processors.empty()) + return; + + checkSingleInput(*processors.front()); + checkSingleOutput(*processors.back()); + + for (const auto & processor : processors) + { + for (const auto & input : processor->getInputs()) + if (&input != &getInputPort() && !input.isConnected()) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot initialize chain because there is a not connected input for {}", + processor->getName()); + + for (const auto & output : processor->getOutputs()) + if (&output != &getOutputPort() && !output.isConnected()) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot initialize chain because there is a not connected output for {}", + processor->getName()); + } +} + +void Chain::addSource(ProcessorPtr processor) +{ + checkTransform(*processor); + + if (!processors.empty()) + connect(processor->getOutputs().front(), getInputPort()); + + processors.emplace_front(std::move(processor)); +} + +void Chain::addSink(ProcessorPtr processor) +{ + checkTransform(*processor); + + if (!processors.empty()) + connect(getOutputPort(), processor->getInputs().front()); + + processors.emplace_back(std::move(processor)); +} + +InputPort & Chain::getInputPort()
const +{ + checkInitialized(processors); + return processors.front()->getInputs().front(); +} + +OutputPort & Chain::getOutputPort() const +{ + checkInitialized(processors); + return processors.back()->getOutputs().front(); +} + +} diff --git a/src/Processors/Chain.h b/src/Processors/Chain.h new file mode 100644 index 00000000000..459fd9fbc7e --- /dev/null +++ b/src/Processors/Chain.h @@ -0,0 +1,41 @@ +#pragma once + +#include + +namespace DB +{ + +class Chain +{ +public: + Chain() = default; + Chain(Chain &&) = default; + Chain(const Chain &) = delete; + + Chain & operator=(Chain &&) = default; + Chain & operator=(const Chain &) = delete; + + explicit Chain(ProcessorPtr processor); + explicit Chain(std::list processors); + + bool empty() const { return processors.empty(); } + + void addSource(ProcessorPtr processor); + void addSink(ProcessorPtr processor); + + InputPort & getInputPort() const; + OutputPort & getOutputPort() const; + + const Block & getInputHeader() const { return getInputPort().getHeader(); } + const Block & getOutputHeader() const { return getOutputPort().getHeader(); } + + static std::list getProcessors(Chain chain) { return std::move(chain.processors); } + +private: + /// -> source -> transform -> ... 
-> transform -> sink -> + /// ^ -> -> -> -> ^ + /// input port output port + std::list processors; +}; + +} diff --git a/src/Processors/Drain.cpp b/src/Processors/Drain.cpp deleted file mode 100644 index a6a010f0a42..00000000000 --- a/src/Processors/Drain.cpp +++ /dev/null @@ -1,75 +0,0 @@ -#include -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - -static void checkSink(const IProcessor & sink) -{ - if (!sink.getOutputs().empty()) - throw Exception("Sink for drain shouldn't have any output, but " + sink.getName() + " has " + - toString(sink.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR); - - if (sink.getInputs().empty()) - throw Exception("Sink for drain should have single input, but it doesn't have any", - ErrorCodes::LOGICAL_ERROR); - - if (sink.getInputs().size() > 1) - throw Exception("Sink for drain should have single input, but " + sink.getName() + " has " + - toString(sink.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR); - - if (sink.getInputs().front().isConnected()) - throw Exception("Sink for drain has connected input.", ErrorCodes::LOGICAL_ERROR); -} - -static void checkTransform(const IProcessor & transform) -{ - if (transform.getInputs().size() != 1) - throw Exception("Transform for drain should have single input, " - "but " + transform.getName() + " has " + - toString(transform.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR); - - if (transform.getOutputs().size() != 1) - throw Exception("Transform for drain should have single output, " - "but " + transform.getName() + " has " + - toString(transform.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR); - - if (transform.getInputs().front().isConnected()) - throw Exception("Transform for drain has connected input.", ErrorCodes::LOGICAL_ERROR); - - if (transform.getOutputs().front().isConnected()) - throw Exception("Transform for drain has connected input.", ErrorCodes::LOGICAL_ERROR); -} - -void 
checkInitialized(const Processors & processors) -{ - if (processors.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Drain is not initialized"); -} - -Drain::Drain(ProcessorPtr processor) -{ - checkSink(*processor); - processors.emplace_back(std::move(processor)); -} - -void Drain::addTransform(ProcessorPtr processor) -{ - checkInitialized(processors); - checkTransform(*processor); - connect(processor->getOutputs().front(), processors.back()->getInputs().front()); - processors.emplace_back(std::move(processor)); -} - -InputPort & Drain::getPort() const -{ - checkInitialized(processors); - return processors.back()->getInputs().front(); -} - -} diff --git a/src/Processors/Drain.h b/src/Processors/Drain.h deleted file mode 100644 index 866d8a0fc52..00000000000 --- a/src/Processors/Drain.h +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#include - -namespace DB -{ - -class Drain -{ -public: - Drain() = default; - explicit Drain(ProcessorPtr processor); - - void addTransform(ProcessorPtr processor); - - InputPort & getPort() const; - const Block & getHeader() const { return getPort().getHeader(); } - -private: - Processors processors; -}; - -} diff --git a/src/Processors/Pipe.cpp b/src/Processors/Pipe.cpp index e0da79f148d..d2aecd10f3f 100644 --- a/src/Processors/Pipe.cpp +++ b/src/Processors/Pipe.cpp @@ -667,6 +667,41 @@ void Pipe::addSimpleTransform(const ProcessorGetter & getter) addSimpleTransform([&](const Block & stream_header, StreamType) { return getter(stream_header); }); } +void Pipe::addChains(std::vector chains) +{ + if (output_ports.size() != chains.size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot add chains to Pipe because " + "number of output ports ({}) is not equal to the number of chains ({})", + output_ports.size(), chains.size()); + + dropTotals(); + dropExtremes(); + + Block new_header; + for (size_t i = 0; i < output_ports.size(); ++i) + { + if (i == 0) + new_header = chains[i].getOutputHeader(); + else + 
assertBlocksHaveEqualStructure(new_header, chains[i].getOutputHeader(), "QueryPipeline"); + + connect(*output_ports[i], chains[i].getInputPort()); + output_ports[i] = &chains[i].getOutputPort(); + + auto added_processors = Chain::getProcessors(std::move(chains[i])); + for (auto & transform : added_processors) + { + if (collected_processors) + collected_processors->emplace_back(transform); + + processors.emplace_back(std::move(transform)); + } + } + + header = std::move(new_header); +} + void Pipe::resize(size_t num_streams, bool force, bool strict) { if (output_ports.empty()) diff --git a/src/Processors/Pipe.h b/src/Processors/Pipe.h index dc3be3289fc..0b430631d71 100644 --- a/src/Processors/Pipe.h +++ b/src/Processors/Pipe.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -86,6 +87,9 @@ public: void addSimpleTransform(const ProcessorGetter & getter); void addSimpleTransform(const ProcessorGetterWithStreamKind & getter); + /// Add chain to every output port. + void addChains(std::vector chains); + /// Changes the number of output ports if needed. Adds ResizeTransform. 
void resize(size_t num_streams, bool force = false, bool strict = false); diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp index 2b882ee93ab..c03aacf2e1b 100644 --- a/src/Processors/QueryPipeline.cpp +++ b/src/Processors/QueryPipeline.cpp @@ -103,6 +103,12 @@ void QueryPipeline::addTransform(ProcessorPtr transform, InputPort * totals, Inp pipe.addTransform(std::move(transform), totals, extremes); } +void QueryPipeline::addChains(std::vector chains) +{ + checkInitializedAndNotCompleted(); + pipe.addChains(std::move(chains)); +} + void QueryPipeline::transform(const Transformer & transformer) { checkInitializedAndNotCompleted(); diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index 358d31a6dff..d382bf21ebe 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -56,6 +56,8 @@ public: void addTransform(ProcessorPtr transform); void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes); + void addChains(std::vector chains); + using Transformer = std::function; /// Transform pipeline in general way. 
void transform(const Transformer & transformer); diff --git a/src/Processors/Sinks/SinkToStorage.cpp b/src/Processors/Sinks/SinkToStorage.cpp index 8bf5fdf3fd2..7fb6f3d6248 100644 --- a/src/Processors/Sinks/SinkToStorage.cpp +++ b/src/Processors/Sinks/SinkToStorage.cpp @@ -6,6 +6,13 @@ namespace DB { +ExceptionKeepingTransformRuntimeData::ExceptionKeepingTransformRuntimeData( + std::unique_ptr thread_status_, + std::string additional_exception_message_) + : thread_status(std::move(thread_status_)) + , additional_exception_message(std::move(additional_exception_message_)) +{ +} ExceptionKeepingTransform::ExceptionKeepingTransform(const Block & in_header, const Block & out_header) : IProcessor({in_header}, {out_header}) @@ -68,7 +75,7 @@ IProcessor::Status ExceptionKeepingTransform::prepare() return Status::Ready; } -static std::exception_ptr runStep(std::function step, ExceptionKeepingTransform::RuntimeData * runtime_data) +static std::exception_ptr runStep(std::function step, ExceptionKeepingTransformRuntimeData * runtime_data) { auto * original_thread = current_thread; SCOPE_EXIT({ current_thread = original_thread; }); diff --git a/src/Processors/Sinks/SinkToStorage.h b/src/Processors/Sinks/SinkToStorage.h index d66391833ec..380696ed059 100644 --- a/src/Processors/Sinks/SinkToStorage.h +++ b/src/Processors/Sinks/SinkToStorage.h @@ -7,6 +7,20 @@ namespace DB class ThreadStatus; + +struct ExceptionKeepingTransformRuntimeData +{ + std::unique_ptr thread_status = nullptr; + UInt64 elapsed_ms = 0; + std::string additional_exception_message; + + ExceptionKeepingTransformRuntimeData( + std::unique_ptr thread_status_, + std::string additional_exception_message_); +}; + +using ExceptionKeepingTransformRuntimeDataPtr = std::shared_ptr; + /// Has one input and one output. /// Works similarly to ISimpleTransform, but with much care about exceptions. 
/// @@ -48,19 +62,10 @@ public: InputPort & getInputPort() { return input; } OutputPort & getOutputPort() { return output; } - struct RuntimeData - { - std::unique_ptr thread_status = nullptr; - UInt64 elapsed_ms = 0; - std::string additional_exception_message; - }; - - using RuntimeDataPtr = std::shared_ptr; - - void setRuntimeData(RuntimeDataPtr runtime_data_) { runtime_data = std::move(runtime_data_); } + void setRuntimeData(ExceptionKeepingTransformRuntimeDataPtr runtime_data_) { runtime_data = std::move(runtime_data_); } private: - RuntimeDataPtr runtime_data; + ExceptionKeepingTransformRuntimeDataPtr runtime_data; };