diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 5b58798693f..d1f1abb6871 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -54,6 +54,7 @@ InterpreterInsertQuery::InterpreterInsertQuery( checkStackSize(); } + StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query) { if (query.table_function) @@ -238,20 +239,26 @@ Chain InterpreterInsertQuery::buildChainImpl( return out; } -std::pair InterpreterInsertQuery::executeImpl( - const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, Block & sample_block) +BlockIO InterpreterInsertQuery::execute() { - const auto & settings = getContext()->getSettingsRef(); - const auto & query = query_ptr->as(); + const Settings & settings = getContext()->getSettingsRef(); + auto & query = query_ptr->as(); + QueryPipelineBuilder pipeline; + + StoragePtr table = getTable(query); if (query.partition_by && !table->supportsPartitionBy()) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PARTITION BY clause is not supported by storage"); - BlockIO res; - QueryPipelineBuilder pipeline; - std::vector out_chains; + auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings.lock_acquire_timeout); + auto metadata_snapshot = table->getInMemoryMetadataPtr(); + + auto query_sample_block = getSampleBlock(query, table, metadata_snapshot); + if (!query.table_function) + getContext()->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames()); bool is_distributed_insert_select = false; + if (query.select && table->isRemote() && settings.parallel_distributed_insert_select) { // Distributed INSERT SELECT @@ -262,6 +269,7 @@ std::pair InterpreterInsertQuery::executeImpl( } } + std::vector out_chains; if (!is_distributed_insert_select || query.watch) { size_t out_streams_size = 1; @@ -330,7 +338,7 @@ std::pair InterpreterInsertQuery::executeImpl( if (getContext()->getSettingsRef().insert_null_as_default) { const auto & input_columns = pipeline.getHeader().getColumnsWithTypeAndName(); - const auto & query_columns = sample_block.getColumnsWithTypeAndName(); + const auto & query_columns = query_sample_block.getColumnsWithTypeAndName(); const auto & output_columns = metadata_snapshot->getColumns(); if (input_columns.size() == query_columns.size()) @@ -340,7 +348,7 @@ std::pair InterpreterInsertQuery::executeImpl( /// Change query sample block columns to Nullable to allow inserting nullable columns, where NULL values will be substituted with /// default column values (in AddingDefaultBlockOutputStream), so all values will be cast correctly. if (input_columns[col_idx].type->isNullable() && !query_columns[col_idx].type->isNullable() && output_columns.hasDefault(query_columns[col_idx].name)) - sample_block.setColumn(col_idx, ColumnWithTypeAndName(makeNullable(query_columns[col_idx].column), makeNullable(query_columns[col_idx].type), query_columns[col_idx].name)); + query_sample_block.setColumn(col_idx, ColumnWithTypeAndName(makeNullable(query_columns[col_idx].column), makeNullable(query_columns[col_idx].type), query_columns[col_idx].name)); } } } @@ -367,28 +375,8 @@ std::pair InterpreterInsertQuery::executeImpl( BlockIO res; - return {std::move(res), std::move(out_streams)}; -} - -BlockIO InterpreterInsertQuery::execute() -{ - const auto & settings = getContext()->getSettingsRef(); - auto & query = query_ptr->as(); - - auto table = getTable(query); - auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings.lock_acquire_timeout); - auto metadata_snapshot = table->getInMemoryMetadataPtr(); - auto sample_block = getSampleBlock(query, table, metadata_snapshot); - - if (!query.table_function) - getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames()); - - BlockIO res; - BlockOutputStreams out_streams; - std::tie(res, out_streams) = executeImpl(table, metadata_snapshot, sample_block); - /// What type of query: INSERT or INSERT SELECT or INSERT WATCH? - if (out_streams.empty()) + if (is_distributed_insert_select) { res.pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline)); } @@ -436,7 +424,7 @@ BlockIO InterpreterInsertQuery::execute() if (query.hasInlinedData()) { /// can execute without additional data - auto pipe = getSourceFromFromASTInsertQuery(query_ptr, nullptr, query_sample_block, getContext(), nullptr); + auto pipe = getSourceFromASTInsertQuery(query_ptr, false, query_sample_block, getContext(), nullptr); res.pipeline.complete(std::move(pipe)); } } @@ -444,28 +432,6 @@ BlockIO InterpreterInsertQuery::execute() return res; } -Processors InterpreterInsertQuery::getSinks() -{ - const auto & settings = getContext()->getSettingsRef(); - auto & query = query_ptr->as(); - - auto table = getTable(query); - auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings.lock_acquire_timeout); - auto metadata_snapshot = table->getInMemoryMetadataPtr(); - auto sample_block = getSampleBlock(query, table, metadata_snapshot); - - if (!query.table_function) - getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames()); - - auto out_streams = executeImpl(table, metadata_snapshot, sample_block).second; - - Processors sinks; - sinks.reserve(out_streams.size()); - for (const auto & out : out_streams) - sinks.emplace_back(std::make_shared(out)); - - return sinks; -} StorageID InterpreterInsertQuery::getDatabaseTable() const { diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h index 38c15fb4898..378e80a46fe 100644 --- a/src/Interpreters/InterpreterInsertQuery.h +++ b/src/Interpreters/InterpreterInsertQuery.h @@ -5,7 +5,6 @@ #include #include #include -#include namespace DB { @@ -33,10 +32,6 @@ public: * Or nothing if the request INSERT SELECT (self-sufficient query - does not accept the input data, does not return the result). */ BlockIO execute() override; - Chain buildChain(); - - /// Returns only sinks, without input sources. - Processors getSinks(); StorageID getDatabaseTable() const; @@ -47,13 +42,11 @@ public: ExceptionKeepingTransformRuntimeDataPtr runtime_data = nullptr); void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context_) const override; - Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const; - Block getSampleBlock(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const; - StoragePtr getTable(ASTInsertQuery & query); private: - std::pair executeImpl( - const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, Block & sample_block); + StoragePtr getTable(ASTInsertQuery & query); + Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const; + Block getSampleBlock(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const; ASTPtr query_ptr; const bool allow_materialized;