mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 09:22:05 +00:00
Merge pull request #17640 from ClickHouse/fix_ddl_worker_is_circular
Fix race on DDLWorker::is_circular_replicated
This commit is contained in:
commit
a908b2d653
@ -186,6 +186,7 @@ struct DDLTask
|
|||||||
Cluster::Address address_in_cluster;
|
Cluster::Address address_in_cluster;
|
||||||
size_t host_shard_num;
|
size_t host_shard_num;
|
||||||
size_t host_replica_num;
|
size_t host_replica_num;
|
||||||
|
bool is_circular_replicated = false;
|
||||||
|
|
||||||
/// Stage 3.3: execute query
|
/// Stage 3.3: execute query
|
||||||
ExecutionStatus execution_status;
|
ExecutionStatus execution_status;
|
||||||
@ -594,7 +595,7 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
|
|||||||
* To distinguish one replica from another on the same node,
|
* To distinguish one replica from another on the same node,
|
||||||
* every shard is placed into separate database.
|
* every shard is placed into separate database.
|
||||||
* */
|
* */
|
||||||
is_circular_replicated = true;
|
task.is_circular_replicated = true;
|
||||||
auto * query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(task.query.get());
|
auto * query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(task.query.get());
|
||||||
if (!query_with_table || query_with_table->database.empty())
|
if (!query_with_table || query_with_table->database.empty())
|
||||||
{
|
{
|
||||||
@ -770,7 +771,6 @@ void DDLWorker::processTask(DDLTask & task)
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
is_circular_replicated = false;
|
|
||||||
parseQueryAndResolveHost(task);
|
parseQueryAndResolveHost(task);
|
||||||
|
|
||||||
ASTPtr rewritten_ast = task.query_on_cluster->getRewrittenASTWithoutOnCluster(task.address_in_cluster.default_database);
|
ASTPtr rewritten_ast = task.query_on_cluster->getRewrittenASTWithoutOnCluster(task.address_in_cluster.default_database);
|
||||||
@ -787,7 +787,7 @@ void DDLWorker::processTask(DDLTask & task)
|
|||||||
storage = DatabaseCatalog::instance().tryGetTable(table_id, context);
|
storage = DatabaseCatalog::instance().tryGetTable(table_id, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (storage && taskShouldBeExecutedOnLeader(rewritten_ast, storage) && !is_circular_replicated)
|
if (storage && taskShouldBeExecutedOnLeader(rewritten_ast, storage) && !task.is_circular_replicated)
|
||||||
tryExecuteQueryOnLeaderReplica(task, storage, rewritten_query, task.entry_path, zookeeper);
|
tryExecuteQueryOnLeaderReplica(task, storage, rewritten_query, task.entry_path, zookeeper);
|
||||||
else
|
else
|
||||||
tryExecuteQuery(rewritten_query, task, task.execution_status);
|
tryExecuteQuery(rewritten_query, task, task.execution_status);
|
||||||
|
@ -104,7 +104,6 @@ private:
|
|||||||
void attachToThreadGroup();
|
void attachToThreadGroup();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::atomic<bool> is_circular_replicated = false;
|
|
||||||
Context context;
|
Context context;
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user