Merge pull request #8234 from ClickHouse/fix_insert_select_mysql

Fix insert select from mysql(...)
This commit is contained in:
alexey-milovidov 2019-12-16 20:42:02 +03:00 committed by GitHub
commit dd58ddbc9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 10 deletions

View File

@ -99,9 +99,7 @@ BlockIO InterpreterInsertQuery::execute()
const auto & query = query_ptr->as<ASTInsertQuery &>();
checkAccess(query);
BlockIO res;
StoragePtr table = getTable(query);
res.pipeline.addStorageHolder(table);
auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());
@ -137,7 +135,7 @@ BlockIO InterpreterInsertQuery::execute()
out_wrapper->setProcessListElement(context.getProcessListElement());
out = std::move(out_wrapper);
res.out = std::move(out);
BlockIO res;
/// What type of query: INSERT or INSERT SELECT?
if (query.select)
@ -145,13 +143,13 @@ BlockIO InterpreterInsertQuery::execute()
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res.in = interpreter_select.execute().in;
res.in = std::make_shared<ConvertingBlockInputStream>(context, res.in, res.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, res.out);
/// BlockIO may hold StoragePtrs to temporary tables
res = interpreter_select.execute();
res.out = nullptr;
res.in = std::make_shared<ConvertingBlockInputStream>(context, res.in, out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out);
if (!allow_materialized)
{
Block in_header = res.in->getHeader();
@ -163,9 +161,12 @@ BlockIO InterpreterInsertQuery::execute()
else if (query.data && !query.has_tail) /// can execute without additional data
{
res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, nullptr, query_sample_block, context, nullptr);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, res.out);
res.out = nullptr;
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out);
}
else
res.out = std::move(out);
res.pipeline.addStorageHolder(table);
return res;
}

View File

@ -109,6 +109,8 @@ def test_table_function(started_cluster):
" UNION ALL SELECT count() as c FROM {} WHERE id % 3 == 2)".format(table_function, table_function,
table_function)).rstrip() == '10000'
assert node1.query("SELECT sum(`money`) FROM {}".format(table_function)).rstrip() == '30000'
node1.query("INSERT INTO {} SELECT id + 100000, name, age, money FROM {}".format('TABLE FUNCTION ' + table_function, table_function))
assert node1.query("SELECT sum(`money`) FROM {}".format(table_function)).rstrip() == '60000'
conn.close()