correct column list for rewritten INSERT query into Distributed [#CLICKHOUSE-4161]

This commit is contained in:
Alexey Zatelepin 2018-11-23 20:39:16 +03:00
parent ebfdd0e7f9
commit 01501fa8db

View File

@ -23,6 +23,7 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterAlterQuery.h>
@ -75,25 +76,22 @@ ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, co
return modified_query_ast;
}
/// insert query has database and table names as bare strings
/// If the query is null, it creates a insert query with the database and tables
/// Or it creates a copy of query, changes the database and table names.
ASTPtr rewriteInsertQuery(const ASTPtr & query, const std::string & database, const std::string & table)
/// The columns list in the original INSERT query is incorrect because inserted blocks are transformed
/// to the form of the sample block of the Distributed table. So we rewrite it and add all columns from
/// the sample block instead.
ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Block & sample_block)
{
ASTPtr modified_query_ast = nullptr;
if (query == nullptr)
modified_query_ast = std::make_shared<ASTInsertQuery>();
else
modified_query_ast = query->clone();
auto query = std::make_shared<ASTInsertQuery>();
query->database = database;
query->table = table;
auto & actual_query = typeid_cast<ASTInsertQuery &>(*modified_query_ast);
actual_query.database = database;
actual_query.table = table;
actual_query.table_function = nullptr;
/// make sure query is not INSERT SELECT
actual_query.select = nullptr;
auto columns = std::make_shared<ASTExpressionList>();
query->columns = columns;
query->children.push_back(columns);
for (const auto & col : sample_block)
columns->children.push_back(std::make_shared<ASTIdentifier>(col.name));
return modified_query_ast;
return query;
}
/// Calculate maximum number in file names in directory and all subdirectories.
@ -274,7 +272,7 @@ BlockInputStreams StorageDistributed::read(
}
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Settings & settings)
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Settings & settings)
{
auto cluster = getCluster();
@ -298,7 +296,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Setti
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
return std::make_shared<DistributedBlockOutputStream>(
*this, rewriteInsertQuery(query, remote_database, remote_table), cluster, settings, insert_sync, timeout);
*this, createInsertToRemoteTableQuery(remote_database, remote_table, getSampleBlock()), cluster, settings, insert_sync, timeout);
}