Some review fixes

This commit is contained in:
kssenii 2021-12-16 00:38:46 +03:00
parent dc1f0c58fa
commit 7583c8007e
8 changed files with 55 additions and 31 deletions

View File

@ -0,0 +1,27 @@
#include "getTableOverride.h"
#include <Interpreters/DatabaseCatalog.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTTableOverrides.h>
#include <Databases/IDatabase.h>
namespace DB
{
ASTPtr tryGetTableOverride(const String & mapped_database, const String & table)
{
if (auto database_ptr = DatabaseCatalog::instance().tryGetDatabase(mapped_database))
{
auto create_query = database_ptr->getCreateDatabaseQuery();
if (auto create_database_query = create_query->as<ASTCreateQuery>())
{
if (create_database_query->table_overrides)
{
return create_database_query->table_overrides->tryGetTableOverride(table);
}
}
}
return nullptr;
}
}

View File

@ -0,0 +1,8 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Core/Types.h>
namespace DB
{
ASTPtr tryGetTableOverride(const String & mapped_database, const String & table);
}

View File

@ -20,6 +20,7 @@
#include <Parsers/MySQL/ASTDeclareIndex.h> #include <Parsers/MySQL/ASTDeclareIndex.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <Common/getTableOverride.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h> #include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/ExpressionAnalyzer.h> #include <Interpreters/ExpressionAnalyzer.h>
@ -519,7 +520,7 @@ ASTs InterpreterCreateImpl::getRewrittenQueries(
rewritten_query->set(rewritten_query->storage, storage); rewritten_query->set(rewritten_query->storage, storage);
rewritten_query->set(rewritten_query->columns_list, columns); rewritten_query->set(rewritten_query->columns_list, columns);
if (auto table_override = ASTTableOverride::tryGetTableOverride(mapped_to_database, create_query.table)) if (auto table_override = tryGetTableOverride(mapped_to_database, create_query.table))
{ {
auto * override_ast = table_override->as<ASTTableOverride>(); auto * override_ast = table_override->as<ASTTableOverride>();
override_ast->applyToCreateTableQuery(rewritten_query.get()); override_ast->applyToCreateTableQuery(rewritten_query.get());

View File

@ -6,8 +6,6 @@
#include <Parsers/ASTIndexDeclaration.h> #include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTProjectionDeclaration.h> #include <Parsers/ASTProjectionDeclaration.h>
#include <Parsers/ASTTableOverrides.h> #include <Parsers/ASTTableOverrides.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Databases/IDatabase.h>
namespace DB namespace DB
{ {
@ -73,22 +71,6 @@ void ASTTableOverride::formatImpl(const FormatSettings & settings_, FormatState
settings.ostr << nl_or_nothing << ')'; settings.ostr << nl_or_nothing << ')';
} }
ASTPtr ASTTableOverride::tryGetTableOverride(const String & mapped_database, const String & table)
{
if (auto database_ptr = DatabaseCatalog::instance().tryGetDatabase(mapped_database))
{
auto create_query = database_ptr->getCreateDatabaseQuery();
if (auto create_database_query = create_query->as<ASTCreateQuery>())
{
if (create_database_query->table_overrides)
{
return create_database_query->table_overrides->tryGetTableOverride(table);
}
}
}
return nullptr;
}
void ASTTableOverride::applyToCreateTableQuery(ASTCreateQuery * create_query) const void ASTTableOverride::applyToCreateTableQuery(ASTCreateQuery * create_query) const
{ {
if (columns) if (columns)

View File

@ -27,8 +27,6 @@ public:
ASTPtr clone() const override; ASTPtr clone() const override;
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
void applyToCreateTableQuery(ASTCreateQuery * create_query) const; void applyToCreateTableQuery(ASTCreateQuery * create_query) const;
static ASTPtr tryGetTableOverride(const String & mapped_database, const String & table);
}; };
/// List of table overrides, for example: /// List of table overrides, for example:

View File

@ -1,5 +1,7 @@
#include "PostgreSQLReplicationHandler.h" #include "PostgreSQLReplicationHandler.h"
#include <Common/setThreadName.h>
#include <Common/getTableOverride.h>
#include <Parsers/ASTTableOverrides.h> #include <Parsers/ASTTableOverrides.h>
#include <Processors/Transforms/PostgreSQLSource.h> #include <Processors/Transforms/PostgreSQLSource.h>
#include <Processors/Executors/CompletedPipelineExecutor.h> #include <Processors/Executors/CompletedPipelineExecutor.h>
@ -8,7 +10,6 @@
#include <Interpreters/InterpreterDropQuery.h> #include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterInsertQuery.h> #include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterRenameQuery.h> #include <Interpreters/InterpreterRenameQuery.h>
#include <Common/setThreadName.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Databases/DatabaseOnDisk.h> #include <Databases/DatabaseOnDisk.h>
#include <boost/algorithm/string/trim.hpp> #include <boost/algorithm/string/trim.hpp>
@ -281,8 +282,8 @@ ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializ
if (!table_structure) if (!table_structure)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to get PostgreSQL table structure"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to get PostgreSQL table structure");
auto table_override = ASTTableOverride::tryGetTableOverride(current_database_name, table_name); auto table_override = tryGetTableOverride(current_database_name, table_name);
return storage->getCreateNestedTableQuery(std::move(table_structure), table_override->as<ASTTableOverride>()); return storage->getCreateNestedTableQuery(std::move(table_structure), table_override ? table_override->as<ASTTableOverride>() : nullptr);
} }
@ -300,8 +301,8 @@ StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection &
query_str = fmt::format("SELECT * FROM {}", quoted_name); query_str = fmt::format("SELECT * FROM {}", quoted_name);
LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name); LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name);
auto table_override = ASTTableOverride::tryGetTableOverride(current_database_name, table_name); auto table_override = tryGetTableOverride(current_database_name, table_name);
materialized_storage->createNestedIfNeeded(fetchTableStructure(*tx, table_name), table_override->as<ASTTableOverride>()); materialized_storage->createNestedIfNeeded(fetchTableStructure(*tx, table_name), table_override ? table_override->as<ASTTableOverride>() : nullptr);
auto nested_storage = materialized_storage->getNested(); auto nested_storage = materialized_storage->getNested();
auto insert = std::make_shared<ASTInsertQuery>(); auto insert = std::make_shared<ASTInsertQuery>();

View File

@ -2,29 +2,37 @@
#if USE_LIBPQXX #if USE_LIBPQXX
#include <base/logger_useful.h> #include <base/logger_useful.h>
#include <Common/Macros.h> #include <Common/Macros.h>
#include <Common/parseAddress.h> #include <Common/parseAddress.h>
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Core/PostgreSQL/Connection.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesDecimal.h> #include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h> #include <Formats/FormatSettings.h>
#include <Processors/Transforms/FilterTransform.h> #include <Processors/Transforms/FilterTransform.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTTablesInSelectQuery.h> #include <Parsers/ASTTablesInSelectQuery.h>
#include <QueryPipeline/Pipe.h>
#include <Interpreters/executeQuery.h> #include <Interpreters/executeQuery.h>
#include <Interpreters/InterpreterSelectQuery.h> #include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterDropQuery.h> #include <Interpreters/InterpreterDropQuery.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Storages/ReadFinalForExternalReplicaStorage.h> #include <Storages/ReadFinalForExternalReplicaStorage.h>
#include <Storages/StoragePostgreSQL.h> #include <Storages/StoragePostgreSQL.h>
#include <Core/PostgreSQL/Connection.h>
#include <QueryPipeline/Pipe.h>
namespace DB namespace DB
@ -438,7 +446,6 @@ ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(
if (table_override && table_override->columns) if (table_override && table_override->columns)
{ {
table_override->applyToCreateTableQuery(create_table_query.get());
if (table_override->columns) if (table_override->columns)
{ {
auto children = table_override->columns->children; auto children = table_override->columns->children;

View File

@ -582,9 +582,9 @@ def test_table_override(started_cluster):
expected = "CREATE TABLE test_database.table_override\\n(\\n `key` Int32,\\n `value` UUID,\\n `_sign` Int8() MATERIALIZED 1,\\n `_version` UInt64() MATERIALIZED 1\\n)\\nENGINE = ReplacingMergeTree(_version)\\nORDER BY tuple(key)" expected = "CREATE TABLE test_database.table_override\\n(\\n `key` Int32,\\n `value` UUID,\\n `_sign` Int8() MATERIALIZED 1,\\n `_version` UInt64() MATERIALIZED 1\\n)\\nENGINE = ReplacingMergeTree(_version)\\nORDER BY tuple(key)"
assert(result.strip() == expected) assert(result.strip() == expected)
time.sleep(5) time.sleep(5)
result = instance.query(f"select * from {materialized_database}.{table_name} order by key") query = f"select * from {materialized_database}.{table_name} order by key"
expected = instance.query(f"select * from {table_name} order by key") expected = instance.query(f"select * from {table_name} order by key")
assert(result == expected) assert_eq_with_retry(instance, query, expected)
drop_materialized_db() drop_materialized_db()
drop_postgres_table(cursor, table_name) drop_postgres_table(cursor, table_name)