Recursive CTE distributed fix

This commit is contained in:
Maksim Kita 2024-05-16 12:07:01 +03:00
parent 4b44df59db
commit 7ee64e55ed
5 changed files with 106 additions and 10 deletions

View File

@ -1607,6 +1607,21 @@ Tables Context::getExternalTables() const
void Context::addExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
{
addExternalTable(table_name, std::make_shared<TemporaryTableHolder>(std::move(temporary_table)));
}
void Context::updateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
{
updateExternalTable(table_name, std::make_shared<TemporaryTableHolder>(std::move(temporary_table)));
}
void Context::addOrUpdateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
{
addOrUpdateExternalTable(table_name, std::make_shared<TemporaryTableHolder>(std::move(temporary_table)));
}
void Context::addExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
@ -1614,34 +1629,32 @@ void Context::addExternalTable(const String & table_name, TemporaryTableHolder &
std::lock_guard lock(mutex);
if (external_tables_mapping.end() != external_tables_mapping.find(table_name))
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Temporary table {} already exists", backQuoteIfNeed(table_name));
external_tables_mapping.emplace(table_name, std::make_shared<TemporaryTableHolder>(std::move(temporary_table)));
external_tables_mapping.emplace(table_name, std::move(temporary_table));
}
void Context::updateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
void Context::updateExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
auto temporary_table_ptr = std::make_shared<TemporaryTableHolder>(std::move(temporary_table));
std::lock_guard lock(mutex);
auto it = external_tables_mapping.find(table_name);
if (it == external_tables_mapping.end())
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Temporary table {} does not exists", backQuoteIfNeed(table_name));
it->second = std::move(temporary_table_ptr);
it->second = std::move(temporary_table);
}
void Context::addOrUpdateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
void Context::addOrUpdateExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
auto temporary_table_ptr = std::make_shared<TemporaryTableHolder>(std::move(temporary_table));
std::lock_guard lock(mutex);
auto [it, inserted] = external_tables_mapping.emplace(table_name, temporary_table_ptr);
auto [it, inserted] = external_tables_mapping.emplace(table_name, temporary_table);
if (!inserted)
it->second = std::move(temporary_table_ptr);
it->second = std::move(temporary_table);
}
std::shared_ptr<TemporaryTableHolder> Context::findExternalTable(const String & table_name) const

View File

@ -685,6 +685,9 @@ public:
void addExternalTable(const String & table_name, TemporaryTableHolder && temporary_table);
void updateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table);
void addOrUpdateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table);
void addExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table);
void updateExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table);
void addOrUpdateExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table);
std::shared_ptr<TemporaryTableHolder> findExternalTable(const String & table_name) const;
std::shared_ptr<TemporaryTableHolder> removeExternalTable(const String & table_name);

View File

@ -102,6 +102,7 @@ public:
"Recursive CTE subquery {}. Expected projection columns to have same size in recursive and non recursive subquery.",
recursive_cte_union_node->formatASTForErrorMessage());
working_temporary_table_holder = recursive_cte_table->holder;
working_temporary_table_storage = recursive_cte_table->storage;
intermediate_temporary_table_holder = std::make_shared<TemporaryTableHolder>(
@ -147,6 +148,7 @@ public:
truncateTemporaryTable(working_temporary_table_storage);
std::swap(intermediate_temporary_table_holder, working_temporary_table_holder);
std::swap(intermediate_temporary_table_storage, working_temporary_table_storage);
}
@ -172,6 +174,9 @@ private:
SelectQueryOptions select_query_options;
select_query_options.merge_tree_enable_remove_parts_from_snapshot_optimization = false;
const auto & recursive_table_name = recursive_cte_union_node->as<UnionNode &>().getCTEName();
recursive_query_context->addOrUpdateExternalTable(recursive_table_name, working_temporary_table_holder);
auto interpreter = std::make_unique<InterpreterSelectQueryAnalyzer>(query_to_execute, recursive_query_context, select_query_options);
auto pipeline_builder = interpreter->buildQueryPipeline();
@ -225,6 +230,7 @@ private:
QueryTreeNodePtr recursive_query;
ContextMutablePtr recursive_query_context;
TemporaryTableHolderPtr working_temporary_table_holder;
StoragePtr working_temporary_table_storage;
TemporaryTableHolderPtr intermediate_temporary_table_holder;

View File

@ -0,0 +1,26 @@
a [''] 0
b a ['a'] 0
c a ['a'] 0
b a ['','b'] 1
c a ['','c'] 1
--
a [''] 0
b a ['a'] 0
c a ['a'] 0
b a ['','b'] 1
c a ['','c'] 1
--
a [''] 0
a [''] 0
b a ['a'] 0
b a ['a'] 0
c a ['a'] 0
c a ['a'] 0
b a ['','b'] 1
b a ['','b'] 1
b a ['','b'] 1
b a ['','b'] 1
c a ['','c'] 1
c a ['','c'] 1
c a ['','c'] 1
c a ['','c'] 1

View File

@ -0,0 +1,48 @@
-- Tags: shard
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table
(
id String,
parent_id String
)
ENGINE = MergeTree ORDER BY id;
INSERT INTO test_table VALUES ('a', '');
INSERT INTO test_table VALUES ('b', 'a');
INSERT INTO test_table VALUES ('c', 'a');
WITH RECURSIVE search_tree AS (
SELECT id, parent_id, [parent_id] AS path, toUInt64(0) AS depth
FROM test_table
UNION ALL
SELECT t.id, t.parent_id, arrayConcat(path, [t.id]) as path, depth + 1
FROM test_table t, search_tree st
WHERE t.parent_id = st.id)
SELECT * FROM search_tree ORDER BY depth, id, parent_id;
SELECT '--';
WITH RECURSIVE search_tree AS (
SELECT id, parent_id, [parent_id] AS path, toUInt64(0) AS depth
FROM remote('127.0.0.1', currentDatabase(), test_table)
UNION ALL
SELECT t.id, t.parent_id, arrayConcat(path, [t.id]) as path, depth + 1
FROM remote('127.0.0.1', currentDatabase(), test_table) t, search_tree st
WHERE t.parent_id = st.id)
SELECT * FROM search_tree ORDER BY depth, id, parent_id;
SELECT '--';
WITH RECURSIVE search_tree AS (
SELECT id, parent_id, [parent_id] AS path, toUInt64(0) AS depth
FROM remote('127.0.0.{1,2}', currentDatabase(), test_table)
UNION ALL
SELECT t.id, t.parent_id, arrayConcat(path, [t.id]) as path, depth + 1
FROM remote('127.0.0.{1,2}', currentDatabase(), test_table) t, search_tree st
WHERE t.parent_id = st.id)
SELECT * FROM search_tree ORDER BY depth, id, parent_id;;
DROP TABLE test_table;