Merge pull request #32749 from kssenii/table-override-pg

Support Table Override clause for MaterializedPostgreSQL too
This commit is contained in:
Kseniia Sumarokova 2021-12-20 13:11:47 +03:00 committed by GitHub
commit 967738ff8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 191 additions and 74 deletions

View File

@ -85,7 +85,7 @@ void insertPostgreSQLValue(
assert_cast<ColumnString &>(column).insertData(value.data(), value.size());
break;
case ExternalResultDescription::ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
assert_cast<ColumnUUID &>(column).insertValue(parse<UUID>(value.data(), value.size()));
break;
case ExternalResultDescription::ValueType::vtDate:
assert_cast<ColumnUInt16 &>(column).insertValue(UInt16{LocalDate{std::string(value)}.getDayNum()});

View File

@ -119,7 +119,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL",
"Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite"};
static const std::unordered_set<std::string_view> engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL"};
static const std::unordered_set<std::string_view> engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"};
bool engine_may_have_arguments = engines_with_arguments.contains(engine_name);
if (engine_define->engine->arguments && !engine_may_have_arguments)

View File

@ -20,6 +20,7 @@
#include <Parsers/MySQL/ASTDeclareIndex.h>
#include <Common/quoteString.h>
#include <Common/assert_cast.h>
#include <Interpreters/getTableOverride.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/ExpressionAnalyzer.h>
@ -435,22 +436,6 @@ void InterpreterCreateImpl::validate(const InterpreterCreateImpl::TQuery & creat
}
}
/// Fetches the override declared for `table` in the CREATE DATABASE query of `mapped_database`.
/// Yields nullptr when the database is not found in the catalog, when its create query
/// is not an ASTCreateQuery, or when that query carries no table overrides.
static ASTPtr tryGetTableOverride(const String & mapped_database, const String & table)
{
    auto database = DatabaseCatalog::instance().tryGetDatabase(mapped_database);
    if (database)
    {
        auto database_ast = database->getCreateDatabaseQuery();
        auto * database_create = database_ast->as<ASTCreateQuery>();
        if (database_create && database_create->table_overrides)
            return database_create->table_overrides->tryGetTableOverride(table);
    }
    return nullptr;
}
ASTs InterpreterCreateImpl::getRewrittenQueries(
const TQuery & create_query, ContextPtr context, const String & mapped_to_database, const String & mysql_database)
{

View File

@ -0,0 +1,27 @@
#include "getTableOverride.h"
#include <Interpreters/DatabaseCatalog.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTTableOverrides.h>
#include <Databases/IDatabase.h>
namespace DB
{
/// Looks up the TABLE OVERRIDE entry for `table` in the CREATE DATABASE query of `mapped_database`.
/// Returns nullptr if the catalog has no such database, if the database's create query is not
/// an ASTCreateQuery, or if the query has no table overrides; otherwise returns the result of
/// the overrides list's own tryGetTableOverride lookup.
ASTPtr tryGetTableOverride(const String & mapped_database, const String & table)
{
    auto database = DatabaseCatalog::instance().tryGetDatabase(mapped_database);
    if (!database)
        return nullptr;

    auto database_ast = database->getCreateDatabaseQuery();
    auto create = database_ast->as<ASTCreateQuery>();
    if (!create || !create->table_overrides)
        return nullptr;

    return create->table_overrides->tryGetTableOverride(table);
}
}

View File

@ -0,0 +1,8 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Core/Types.h>
namespace DB
{
/// Returns the table override declared for `table` in the CREATE DATABASE query of
/// database `mapped_database`, or nullptr if the database is not found or has no overrides.
ASTPtr tryGetTableOverride(const String & mapped_database, const String & table);
}

View File

@ -1,13 +1,15 @@
#include "PostgreSQLReplicationHandler.h"
#include <Common/setThreadName.h>
#include <Parsers/ASTTableOverrides.h>
#include <Processors/Transforms/PostgreSQLSource.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
#include <Interpreters/getTableOverride.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Common/setThreadName.h>
#include <Interpreters/Context.h>
#include <Databases/DatabaseOnDisk.h>
#include <boost/algorithm/string/trim.hpp>
@ -279,7 +281,9 @@ ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializ
auto table_structure = std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, table_name, postgres_schema, true, true, true));
if (!table_structure)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to get PostgreSQL table structure");
return storage->getCreateNestedTableQuery(std::move(table_structure));
auto table_override = tryGetTableOverride(current_database_name, table_name);
return storage->getCreateNestedTableQuery(std::move(table_structure), table_override ? table_override->as<ASTTableOverride>() : nullptr);
}
@ -297,7 +301,8 @@ StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection &
query_str = fmt::format("SELECT * FROM {}", quoted_name);
LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name);
materialized_storage->createNestedIfNeeded(fetchTableStructure(*tx, table_name));
auto table_override = tryGetTableOverride(current_database_name, table_name);
materialized_storage->createNestedIfNeeded(fetchTableStructure(*tx, table_name), table_override ? table_override->as<ASTTableOverride>() : nullptr);
auto nested_storage = materialized_storage->getNested();
auto insert = std::make_shared<ASTInsertQuery>();

View File

@ -2,28 +2,37 @@
#if USE_LIBPQXX
#include <base/logger_useful.h>
#include <Common/Macros.h>
#include <Common/parseAddress.h>
#include <Common/assert_cast.h>
#include <Core/Settings.h>
#include <Core/PostgreSQL/Connection.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <QueryPipeline/Pipe.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Storages/StorageFactory.h>
#include <Storages/ReadFinalForExternalReplicaStorage.h>
#include <Storages/StoragePostgreSQL.h>
#include <Core/PostgreSQL/Connection.h>
#include <QueryPipeline/Pipe.h>
namespace DB
@ -181,18 +190,18 @@ StorageID StorageMaterializedPostgreSQL::getNestedStorageID() const
}
void StorageMaterializedPostgreSQL::createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure)
void StorageMaterializedPostgreSQL::createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure, const ASTTableOverride * table_override)
{
if (tryGetNested())
return;
const auto ast_create = getCreateNestedTableQuery(std::move(table_structure));
try
{
const auto ast_create = getCreateNestedTableQuery(std::move(table_structure), table_override);
auto table_id = getStorageID();
auto tmp_nested_table_id = StorageID(table_id.database_name, getNestedTableName());
LOG_DEBUG(log, "Creating clickhouse table for postgresql table {}", table_id.getNameForLogs());
try
{
InterpreterCreateQuery interpreter(ast_create, nested_context);
interpreter.execute();
@ -200,10 +209,10 @@ void StorageMaterializedPostgreSQL::createNestedIfNeeded(PostgreSQLTableStructur
/// Save storage_id with correct uuid.
nested_table_id = nested_storage->getStorageID();
}
catch (Exception & e)
catch (...)
{
e.addMessage("while creating nested table: {}", tmp_nested_table_id.getNameForLogs());
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
}
@ -362,12 +371,31 @@ ASTPtr StorageMaterializedPostgreSQL::getColumnDeclaration(const DataTypePtr & d
}
std::shared_ptr<ASTExpressionList> StorageMaterializedPostgreSQL::getColumnsExpressionList(const NamesAndTypesList & columns) const
{
auto columns_expression_list = std::make_shared<ASTExpressionList>();
for (const auto & [name, type] : columns)
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = name;
column_declaration->type = getColumnDeclaration(type);
columns_expression_list->children.emplace_back(column_declaration);
}
return columns_expression_list;
}
/// For single storage MaterializedPostgreSQL get columns and primary key columns from storage definition.
/// For database engine MaterializedPostgreSQL get columns and primary key columns by fetching from PostgreSQL, also using the same
/// transaction with snapshot, which is used for initial tables dump.
ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure)
ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(
PostgreSQLTableStructurePtr table_structure, const ASTTableOverride * table_override)
{
auto create_table_query = std::make_shared<ASTCreateQuery>();
if (table_override)
table_override->applyToCreateTableQuery(create_table_query.get());
auto table_id = getStorageID();
create_table_query->setTable(getNestedTableName());
@ -375,40 +403,85 @@ ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(PostgreSQLTableS
if (is_materialized_postgresql_database)
create_table_query->uuid = table_id.uuid;
auto storage = std::make_shared<ASTStorage>();
storage->set(storage->engine, makeASTFunction("ReplacingMergeTree", std::make_shared<ASTIdentifier>("_version")));
auto columns_declare_list = std::make_shared<ASTColumns>();
auto columns_expression_list = std::make_shared<ASTExpressionList>();
auto order_by_expression = std::make_shared<ASTFunction>();
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto & columns = metadata_snapshot->getColumns();
ConstraintsDescription constraints;
NamesAndTypesList ordinary_columns_and_types;
if (!is_materialized_postgresql_database)
if (is_materialized_postgresql_database)
{
ordinary_columns_and_types = columns.getOrdinary();
}
else
if (!table_structure && !table_override)
{
if (!table_structure)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"No table structure returned for table {}.{}", table_id.database_name, table_id.table_name);
throw Exception(ErrorCodes::LOGICAL_ERROR, "No table structure returned for table {}.{}",
table_id.database_name, table_id.table_name);
}
if (!table_structure->columns)
if (!table_structure->columns && (!table_override || !table_override->columns))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"No columns returned for table {}.{}", table_id.database_name, table_id.table_name);
throw Exception(ErrorCodes::LOGICAL_ERROR, "No columns returned for table {}.{}",
table_id.database_name, table_id.table_name);
}
ordinary_columns_and_types = *table_structure->columns;
bool has_order_by_override = table_override && table_override->storage && table_override->storage->order_by;
if (has_order_by_override && !table_structure->replica_identity_columns)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Having PRIMARY KEY OVERRIDE is allowed only if there is "
"replica identity index for PostgreSQL table. (table {}.{})",
table_id.database_name, table_id.table_name);
}
if (!table_structure->primary_key_columns && !table_structure->replica_identity_columns)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Table {}.{} has no primary key and no replica identity index", table_id.database_name, table_id.table_name);
"Table {}.{} has no primary key and no replica identity index",
table_id.database_name, table_id.table_name);
}
if (table_override && table_override->columns)
{
if (table_override->columns)
{
auto children = table_override->columns->children;
const auto & columns = children[0]->as<ASTExpressionList>();
if (columns)
{
for (const auto & child : columns->children)
{
const auto * column_declaration = child->as<ASTColumnDeclaration>();
auto type = DataTypeFactory::instance().get(column_declaration->type);
ordinary_columns_and_types.emplace_back(NameAndTypePair(column_declaration->name, type));
}
}
columns_declare_list->set(columns_declare_list->columns, children[0]);
}
else
{
ordinary_columns_and_types = *table_structure->columns;
columns_declare_list->set(columns_declare_list->columns, getColumnsExpressionList(ordinary_columns_and_types));
}
auto columns = table_override->columns;
if (columns && columns->constraints)
constraints = ConstraintsDescription(columns->constraints->children);
}
else
{
ordinary_columns_and_types = *table_structure->columns;
columns_declare_list->set(columns_declare_list->columns, getColumnsExpressionList(ordinary_columns_and_types));
}
if (ordinary_columns_and_types.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Table {}.{} has no columns", table_id.database_name, table_id.table_name);
NamesAndTypesList merging_columns;
if (table_structure->primary_key_columns)
merging_columns = *table_structure->primary_key_columns;
@ -417,39 +490,28 @@ ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(PostgreSQLTableS
order_by_expression->name = "tuple";
order_by_expression->arguments = std::make_shared<ASTExpressionList>();
for (const auto & column : merging_columns)
order_by_expression->arguments->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
}
for (const auto & [name, type] : ordinary_columns_and_types)
storage->set(storage->order_by, order_by_expression);
}
else
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
ordinary_columns_and_types = metadata_snapshot->getColumns().getOrdinary();
columns_declare_list->set(columns_declare_list->columns, getColumnsExpressionList(ordinary_columns_and_types));
column_declaration->name = name;
column_declaration->type = getColumnDeclaration(type);
auto primary_key_ast = metadata_snapshot->getPrimaryKeyAST();
if (!primary_key_ast)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Storage MaterializedPostgreSQL must have primary key");
storage->set(storage->order_by, primary_key_ast);
columns_expression_list->children.emplace_back(column_declaration);
constraints = metadata_snapshot->getConstraints();
}
columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
columns_declare_list->columns->children.emplace_back(getMaterializedColumnsDeclaration("_sign", "Int8", 1));
columns_declare_list->columns->children.emplace_back(getMaterializedColumnsDeclaration("_version", "UInt64", 1));
create_table_query->set(create_table_query->columns_list, columns_declare_list);
/// Not nullptr for single storage (because throws exception if not specified), nullptr otherwise.
auto primary_key_ast = getInMemoryMetadataPtr()->getPrimaryKeyAST();
auto storage = std::make_shared<ASTStorage>();
storage->set(storage->engine, makeASTFunction("ReplacingMergeTree", std::make_shared<ASTIdentifier>("_version")));
if (primary_key_ast)
storage->set(storage->order_by, primary_key_ast);
else
storage->set(storage->order_by, order_by_expression);
create_table_query->set(create_table_query->storage, storage);
/// Add columns _sign and _version, so that they can be accessed from nested ReplacingMergeTree table if needed.
@ -458,8 +520,7 @@ ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(PostgreSQLTableS
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(ColumnsDescription(ordinary_columns_and_types));
storage_metadata.setConstraints(metadata_snapshot->getConstraints());
storage_metadata.setConstraints(constraints);
setInMemoryMetadata(storage_metadata);
return create_table_query;

View File

@ -99,7 +99,11 @@ public:
/// only once - when nested table is successfully created and is never changed afterwards.
bool hasNested() { return has_nested.load(); }
void createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure);
void createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure, const ASTTableOverride * table_override);
ASTPtr getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure, const ASTTableOverride * table_override);
std::shared_ptr<ASTExpressionList> getColumnsExpressionList(const NamesAndTypesList & columns) const;
StoragePtr getNested() const;
@ -120,8 +124,6 @@ public:
bool supportsFinal() const override { return true; }
ASTPtr getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure);
protected:
StorageMaterializedPostgreSQL(
const StorageID & table_id_,

View File

@ -34,6 +34,10 @@ postgres_table_template_4 = """
CREATE TABLE IF NOT EXISTS "{}"."{}" (
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
# Template for a PostgreSQL table with an integer primary key `key` and a UUID column `value`.
# Takes a single format placeholder (the quoted table name).
postgres_table_template_5 = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value UUID, PRIMARY KEY(key))
"""
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
if database == True:
@ -93,7 +97,7 @@ def drop_clickhouse_postgres_db(name='postgres_database'):
def create_materialized_db(ip, port,
materialized_database='test_database',
postgres_database='postgres_database',
settings=[]):
settings=[], table_overrides=''):
instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
create_query = f"CREATE DATABASE {materialized_database} ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', 'postgres', 'mysecretpassword')"
if len(settings) > 0:
@ -102,6 +106,7 @@ def create_materialized_db(ip, port,
if i != 0:
create_query += ', '
create_query += settings[i]
create_query += table_overrides
instance.query(create_query)
assert materialized_database in instance.query('SHOW DATABASES')
@ -560,6 +565,30 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
drop_materialized_db()
def test_table_override(started_cluster):
    """Check that a TABLE OVERRIDE clause is applied by a MaterializedPostgreSQL database.

    Creates a PostgreSQL table with a UUID column, fills it through a ClickHouse
    PostgreSQL-engine table, creates the materialized database with a COLUMNS override,
    then verifies both the SHOW CREATE TABLE output of the nested table and that the
    replicated rows match the source rows.
    """
    conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
    cursor = conn.cursor()
    table_name = 'table_override'
    materialized_database = 'test_database'
    create_postgres_table(cursor, table_name, template=postgres_table_template_5)

    # The CREATE below has no IF NOT EXISTS, so remove a leftover from an earlier (possibly failed) run.
    instance.query(f"DROP TABLE IF EXISTS {table_name}")
    instance.query(f"create table {table_name}(key Int32, value UUID) engine = PostgreSQL (postgres1, table={table_name})")
    instance.query(f"insert into {table_name} select number, generateUUIDv4() from numbers(10)")

    table_overrides = f" TABLE OVERRIDE {table_name} (COLUMNS (key Int32, value UUID))"
    create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
                           settings=[f"materialized_postgresql_tables_list = '{table_name}'"], table_overrides=table_overrides)
    assert_nested_table_is_created(table_name, materialized_database)

    result = instance.query(f"show create table {materialized_database}.{table_name}")
    expected = "CREATE TABLE test_database.table_override\\n(\\n `key` Int32,\\n `value` UUID,\\n `_sign` Int8() MATERIALIZED 1,\\n `_version` UInt64() MATERIALIZED 1\\n)\\nENGINE = ReplacingMergeTree(_version)\\nORDER BY tuple(key)"
    assert result.strip() == expected

    # NOTE(review): presumably gives replication time to catch up before comparing; the retry below also waits.
    time.sleep(5)
    query = f"select * from {materialized_database}.{table_name} order by key"
    expected = instance.query(f"select * from {table_name} order by key")
    assert_eq_with_retry(instance, query, expected)

    drop_materialized_db()
    # Drop the ClickHouse-side PostgreSQL-engine table as well so the test leaves nothing behind.
    instance.query(f"DROP TABLE IF EXISTS {table_name}")
    drop_postgres_table(cursor, table_name)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")