Restore InterpreterInsertQuery

This commit is contained in:
Nikolai Kochetov 2021-09-17 15:59:40 +03:00
parent a8c3b02598
commit 618d4d863e
2 changed files with 22 additions and 63 deletions

View File

@ -54,6 +54,7 @@ InterpreterInsertQuery::InterpreterInsertQuery(
checkStackSize(); checkStackSize();
} }
StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query) StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
{ {
if (query.table_function) if (query.table_function)
@ -238,20 +239,26 @@ Chain InterpreterInsertQuery::buildChainImpl(
return out; return out;
} }
std::pair<BlockIO, BlockOutputStreams> InterpreterInsertQuery::executeImpl( BlockIO InterpreterInsertQuery::execute()
const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, Block & sample_block)
{ {
const auto & settings = getContext()->getSettingsRef(); const Settings & settings = getContext()->getSettingsRef();
const auto & query = query_ptr->as<const ASTInsertQuery &>(); auto & query = query_ptr->as<ASTInsertQuery &>();
QueryPipelineBuilder pipeline;
StoragePtr table = getTable(query);
if (query.partition_by && !table->supportsPartitionBy()) if (query.partition_by && !table->supportsPartitionBy())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PARTITION BY clause is not supported by storage"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PARTITION BY clause is not supported by storage");
BlockIO res; auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings.lock_acquire_timeout);
QueryPipelineBuilder pipeline; auto metadata_snapshot = table->getInMemoryMetadataPtr();
std::vector<Chain> out_chains;
auto query_sample_block = getSampleBlock(query, table, metadata_snapshot);
if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
bool is_distributed_insert_select = false; bool is_distributed_insert_select = false;
if (query.select && table->isRemote() && settings.parallel_distributed_insert_select) if (query.select && table->isRemote() && settings.parallel_distributed_insert_select)
{ {
// Distributed INSERT SELECT // Distributed INSERT SELECT
@ -262,6 +269,7 @@ std::pair<BlockIO, BlockOutputStreams> InterpreterInsertQuery::executeImpl(
} }
} }
std::vector<Chain> out_chains;
if (!is_distributed_insert_select || query.watch) if (!is_distributed_insert_select || query.watch)
{ {
size_t out_streams_size = 1; size_t out_streams_size = 1;
@ -330,7 +338,7 @@ std::pair<BlockIO, BlockOutputStreams> InterpreterInsertQuery::executeImpl(
if (getContext()->getSettingsRef().insert_null_as_default) if (getContext()->getSettingsRef().insert_null_as_default)
{ {
const auto & input_columns = pipeline.getHeader().getColumnsWithTypeAndName(); const auto & input_columns = pipeline.getHeader().getColumnsWithTypeAndName();
const auto & query_columns = sample_block.getColumnsWithTypeAndName(); const auto & query_columns = query_sample_block.getColumnsWithTypeAndName();
const auto & output_columns = metadata_snapshot->getColumns(); const auto & output_columns = metadata_snapshot->getColumns();
if (input_columns.size() == query_columns.size()) if (input_columns.size() == query_columns.size())
@ -340,7 +348,7 @@ std::pair<BlockIO, BlockOutputStreams> InterpreterInsertQuery::executeImpl(
/// Change query sample block columns to Nullable to allow inserting nullable columns, where NULL values will be substituted with /// Change query sample block columns to Nullable to allow inserting nullable columns, where NULL values will be substituted with
/// default column values (in AddingDefaultBlockOutputStream), so all values will be cast correctly. /// default column values (in AddingDefaultBlockOutputStream), so all values will be cast correctly.
if (input_columns[col_idx].type->isNullable() && !query_columns[col_idx].type->isNullable() && output_columns.hasDefault(query_columns[col_idx].name)) if (input_columns[col_idx].type->isNullable() && !query_columns[col_idx].type->isNullable() && output_columns.hasDefault(query_columns[col_idx].name))
sample_block.setColumn(col_idx, ColumnWithTypeAndName(makeNullable(query_columns[col_idx].column), makeNullable(query_columns[col_idx].type), query_columns[col_idx].name)); query_sample_block.setColumn(col_idx, ColumnWithTypeAndName(makeNullable(query_columns[col_idx].column), makeNullable(query_columns[col_idx].type), query_columns[col_idx].name));
} }
} }
} }
@ -367,28 +375,8 @@ std::pair<BlockIO, BlockOutputStreams> InterpreterInsertQuery::executeImpl(
BlockIO res; BlockIO res;
return {std::move(res), std::move(out_streams)};
}
BlockIO InterpreterInsertQuery::execute()
{
const auto & settings = getContext()->getSettingsRef();
auto & query = query_ptr->as<ASTInsertQuery &>();
auto table = getTable(query);
auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings.lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto sample_block = getSampleBlock(query, table, metadata_snapshot);
if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames());
BlockIO res;
BlockOutputStreams out_streams;
std::tie(res, out_streams) = executeImpl(table, metadata_snapshot, sample_block);
/// What type of query: INSERT or INSERT SELECT or INSERT WATCH? /// What type of query: INSERT or INSERT SELECT or INSERT WATCH?
if (out_streams.empty()) if (is_distributed_insert_select)
{ {
res.pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline)); res.pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline));
} }
@ -436,7 +424,7 @@ BlockIO InterpreterInsertQuery::execute()
if (query.hasInlinedData()) if (query.hasInlinedData())
{ {
/// can execute without additional data /// can execute without additional data
auto pipe = getSourceFromFromASTInsertQuery(query_ptr, nullptr, query_sample_block, getContext(), nullptr); auto pipe = getSourceFromASTInsertQuery(query_ptr, false, query_sample_block, getContext(), nullptr);
res.pipeline.complete(std::move(pipe)); res.pipeline.complete(std::move(pipe));
} }
} }
@ -444,28 +432,6 @@ BlockIO InterpreterInsertQuery::execute()
return res; return res;
} }
Processors InterpreterInsertQuery::getSinks()
{
const auto & settings = getContext()->getSettingsRef();
auto & query = query_ptr->as<ASTInsertQuery &>();
auto table = getTable(query);
auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings.lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto sample_block = getSampleBlock(query, table, metadata_snapshot);
if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, sample_block.getNames());
auto out_streams = executeImpl(table, metadata_snapshot, sample_block).second;
Processors sinks;
sinks.reserve(out_streams.size());
for (const auto & out : out_streams)
sinks.emplace_back(std::make_shared<SinkToOutputStream>(out));
return sinks;
}
StorageID InterpreterInsertQuery::getDatabaseTable() const StorageID InterpreterInsertQuery::getDatabaseTable() const
{ {

View File

@ -5,7 +5,6 @@
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreter.h>
#include <Parsers/ASTInsertQuery.h> #include <Parsers/ASTInsertQuery.h>
#include <Storages/StorageInMemoryMetadata.h> #include <Storages/StorageInMemoryMetadata.h>
#include <IO/ReadBuffer.h>
namespace DB namespace DB
{ {
@ -33,10 +32,6 @@ public:
* Or nothing if the request INSERT SELECT (self-sufficient query - does not accept the input data, does not return the result). * Or nothing if the request INSERT SELECT (self-sufficient query - does not accept the input data, does not return the result).
*/ */
BlockIO execute() override; BlockIO execute() override;
Chain buildChain();
/// Returns only sinks, without input sources.
Processors getSinks();
StorageID getDatabaseTable() const; StorageID getDatabaseTable() const;
@ -47,13 +42,11 @@ public:
ExceptionKeepingTransformRuntimeDataPtr runtime_data = nullptr); ExceptionKeepingTransformRuntimeDataPtr runtime_data = nullptr);
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context_) const override; void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context_) const override;
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
Block getSampleBlock(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
StoragePtr getTable(ASTInsertQuery & query);
private: private:
std::pair<BlockIO, BlockOutputStreams> executeImpl( StoragePtr getTable(ASTInsertQuery & query);
const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, Block & sample_block); Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
Block getSampleBlock(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
ASTPtr query_ptr; ASTPtr query_ptr;
const bool allow_materialized; const bool allow_materialized;