Merge pull request #31392 from azat/mysql-sync-thread-fix-QueryScope

Fix QueryScope in MaterializedMySQLSyncThread
This commit is contained in:
alexey-milovidov 2021-11-14 01:03:15 +03:00 committed by GitHub
commit f1fd01a95d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -52,8 +52,6 @@ static ContextMutablePtr createQueryContext(ContextPtr context)
auto query_context = Context::createCopy(context);
query_context->setSettings(new_query_settings);
CurrentThread::QueryScope query_scope(query_context);
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->setCurrentQueryId(""); // generate random query_id
return query_context;
@ -273,6 +271,8 @@ static inline void cleanOutdatedTables(const String & database_name, ContextPtr
for (auto iterator = clean_database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
auto query_context = createQueryContext(context);
CurrentThread::QueryScope query_scope(query_context);
String comment = "Materialize MySQL step 1: execute MySQL DDL for dump data";
cleaning_table_name = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(iterator->name());
tryToExecuteQuery(" DROP TABLE " + cleaning_table_name, query_context, database_name, comment);
@ -324,6 +324,8 @@ static inline void dumpDataForTables(
{
const auto & table_name = iterator->first;
auto query_context = createQueryContext(context);
CurrentThread::QueryScope query_scope(query_context);
String comment = "Materialize MySQL step 1: execute MySQL DDL for dump data";
tryToExecuteQuery(query_prefix + " " + iterator->second, query_context, database_name, comment); /// create table.
@ -742,6 +744,8 @@ void MaterializedMySQLSyncThread::executeDDLAtomic(const QueryEvent & query_even
try
{
auto query_context = createQueryContext(getContext());
CurrentThread::QueryScope query_scope(query_context);
String comment = "Materialize MySQL step 2: execute MySQL DDL for sync data";
String event_database = query_event.schema == mysql_database_name ? database_name : "";
tryToExecuteQuery(query_prefix + query_event.query, query_context, event_database, comment);
@ -791,6 +795,8 @@ void MaterializedMySQLSyncThread::Buffers::commit(ContextPtr context)
for (auto & table_name_and_buffer : data)
{
auto query_context = createQueryContext(context);
CurrentThread::QueryScope query_scope(query_context);
auto input = std::make_shared<SourceFromSingleChunk>(table_name_and_buffer.second->first);
auto pipeline = getTableOutput(database, table_name_and_buffer.first, query_context, true);
pipeline.complete(Pipe(std::move(input)));