Better way to distinguish between user query and replication thread query

This commit is contained in:
kssenii 2021-05-16 21:35:37 +00:00
parent 8a4711b716
commit 1ee77eae89
4 changed files with 12 additions and 14 deletions

View File

@ -118,8 +118,7 @@ StoragePtr DatabaseMaterializePostgreSQL::tryGetTable(const String & name, Conte
/// to its nested ReplacingMergeTree table (in all other cases), the context of a query os modified.
/// Also if materialzied_tables set is empty - it means all access is done to ReplacingMergeTree tables - it is a case after
/// replication_handler was shutdown.
if ((local_context->hasQueryContext() && local_context->getQueryContext()->getQueryFactoriesInfo().storages.count("ReplacingMergeTree"))
|| materialized_tables.empty())
if (local_context->isInternalQuery() || materialized_tables.empty())
{
return DatabaseAtomic::tryGetTable(name, local_context);
}
@ -143,14 +142,10 @@ StoragePtr DatabaseMaterializePostgreSQL::tryGetTable(const String & name, Conte
void DatabaseMaterializePostgreSQL::createTable(ContextPtr local_context, const String & table_name, const StoragePtr & table, const ASTPtr & query)
{
/// Create table query can only be called from replication thread.
if (local_context->hasQueryContext())
if (local_context->isInternalQuery())
{
auto storage_set = local_context->getQueryContext()->getQueryFactoriesInfo().storages;
if (storage_set.find("ReplacingMergeTree") != storage_set.end())
{
DatabaseAtomic::createTable(local_context, table_name, table, query);
return;
}
DatabaseAtomic::createTable(local_context, table_name, table, query);
return;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED,

View File

@ -259,6 +259,9 @@ private:
/// XXX: move this stuff to shared part instead.
ContextPtr buffer_context; /// Buffer context. Could be equal to this.
/// A flag, used to distinquish between user query and internal query to a database engine (MaterializePostgreSQL).
bool is_internal_query = false;
public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for a query context.
OpenTelemetryTraceContext query_trace_context;
@ -728,6 +731,9 @@ public:
void shutdown();
bool isInternalQuery() const { return is_internal_query; }
void setInternalQuery(bool internal) { is_internal_query = internal; }
ActionLocksManagerPtr getActionLocksManager();
enum class ApplicationType

View File

@ -475,8 +475,7 @@ void MaterializePostgreSQLConsumer::syncTables(std::shared_ptr<pqxx::nontransact
insert->columns = buffer.columnsAST;
auto insert_context = Context::createCopy(context);
insert_context->makeQueryContext();
insert_context->addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");
insert_context->setInternalQuery(true);
InterpreterInsertQuery interpreter(insert, insert_context, true);
auto block_io = interpreter.execute();

View File

@ -173,9 +173,7 @@ void StorageMaterializePostgreSQL::createNestedIfNeeded(PostgreSQLTableStructure
std::shared_ptr<Context> StorageMaterializePostgreSQL::makeNestedTableContext(ContextPtr from_context)
{
auto new_context = Context::createCopy(from_context);
new_context->makeQueryContext();
new_context->addQueryFactoriesInfo(Context::QueryLogFactories::Storage, "ReplacingMergeTree");
new_context->setInternalQuery(true);
return new_context;
}