Support schema for postgres database engine

This commit is contained in:
kssenii 2021-08-04 20:42:29 +00:00
parent d63a5e1c96
commit f06703a7c9
9 changed files with 112 additions and 48 deletions

View File

@ -15,7 +15,7 @@ Supports table structure modifications (`ALTER TABLE ... ADD|DROP COLUMN`). If `
``` sql
CREATE DATABASE test_database
ENGINE = PostgreSQL('host:port', 'database', 'user', 'password'[, `use_table_cache`]);
ENGINE = PostgreSQL('host:port', 'database', 'user', 'password'[, `schema`, `use_table_cache`]);
```
**Engine Parameters**
@ -24,6 +24,7 @@ ENGINE = PostgreSQL('host:port', 'database', 'user', 'password'[, `use_table_cac
- `database` — Remote database name.
- `user` — PostgreSQL user.
- `password` — User password.
- `schema` — PostgreSQL schema.
- `use_table_cache` — Defines if the database table structure is cached or not. Optional. Default value: `0`.
## Data Types Support {#data_types-support}

View File

@ -103,9 +103,11 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const String & engine_name = engine_define->engine->name;
const UUID & uuid = create.uuid;
bool engine_may_have_arguments = engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "MaterializedMySQL" ||
engine_name == "Lazy" || engine_name == "Replicated" || engine_name == "PostgreSQL" ||
engine_name == "MaterializedPostgreSQL" || engine_name == "SQLite";
static const std::unordered_set<String> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL",
"Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite"};
bool engine_may_have_arguments = engines_with_arguments.contains(engine_name);
if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
@ -113,6 +115,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
engine_define->primary_key || engine_define->order_by ||
engine_define->sample_by;
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL";
if (has_unexpected_element || (!may_have_settings && engine_define->settings))
throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings",
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
@ -234,10 +237,9 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() < 4 || engine->arguments->children.size() > 5)
throw Exception(fmt::format(
"{} Database require host:port, database_name, username, password arguments "
"[, use_table_cache = 0].", engine_name),
ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{} Database require `host:port`, `database_name`, `username`, `password` [, `schema` = "", `use_table_cache` = 0].",
engine_name);
ASTs & engine_args = engine->arguments->children;
@ -249,9 +251,13 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const auto & username = safeGetLiteralValue<String>(engine_args[2], engine_name);
const auto & password = safeGetLiteralValue<String>(engine_args[3], engine_name);
auto use_table_cache = 0;
if (engine->arguments->children.size() == 5)
use_table_cache = safeGetLiteralValue<UInt64>(engine_args[4], engine_name);
String schema;
if (engine->arguments->children.size() >= 5)
schema = safeGetLiteralValue<String>(engine_args[4], engine_name);
bool use_table_cache = 0;
if (engine->arguments->children.size() >= 6)
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name);
/// Split into replicas if needed.
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
@ -266,7 +272,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, postgres_database_name, connection_pool, use_table_cache);
context, metadata_path, engine_define, database_name, postgres_database_name, schema, connection_pool, use_table_cache);
}
else if (engine_name == "MaterializedPostgreSQL")
{
@ -274,9 +280,9 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (!engine->arguments || engine->arguments->children.size() != 4)
{
throw Exception(
fmt::format("{} Database require host:port, database_name, username, password arguments ", engine_name),
ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{} Database require `host:port`, `database_name`, `username`, `password`.",
engine_name);
}
ASTs & engine_args = engine->arguments->children;

View File

@ -39,14 +39,16 @@ DatabasePostgreSQL::DatabasePostgreSQL(
const String & metadata_path_,
const ASTStorage * database_engine_define_,
const String & dbname_,
const String & postgres_dbname,
const String & postgres_dbname_,
const String & postgres_schema_,
postgres::PoolWithFailoverPtr pool_,
bool cache_tables_)
: IDatabase(dbname_)
, WithContext(context_->getGlobalContext())
, metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone())
, dbname(postgres_dbname)
, postgres_dbname(postgres_dbname_)
, postgres_schema(postgres_schema_)
, pool(std::move(pool_))
, cache_tables(cache_tables_)
{
@ -55,12 +57,28 @@ DatabasePostgreSQL::DatabasePostgreSQL(
}
String DatabasePostgreSQL::getTableNameForLogs(const String & table_name) const
{
if (postgres_schema.empty())
return fmt::format("{}.{}", postgres_dbname, table_name);
return fmt::format("{}.{}.{}", postgres_dbname, postgres_schema, table_name);
}
String DatabasePostgreSQL::formatTableName(const String & table_name) const
{
if (postgres_schema.empty())
return doubleQuoteString(table_name);
return fmt::format("{}.{}", doubleQuoteString(postgres_schema), doubleQuoteString(table_name));
}
bool DatabasePostgreSQL::empty() const
{
std::lock_guard<std::mutex> lock(mutex);
auto connection_holder = pool->get();
auto tables_list = fetchPostgreSQLTablesList(connection_holder->get());
auto tables_list = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema);
for (const auto & table_name : tables_list)
if (!detached_or_dropped.count(table_name))
@ -76,7 +94,7 @@ DatabaseTablesIteratorPtr DatabasePostgreSQL::getTablesIterator(ContextPtr local
Tables tables;
auto connection_holder = pool->get();
auto table_names = fetchPostgreSQLTablesList(connection_holder->get());
auto table_names = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema);
for (const auto & table_name : table_names)
if (!detached_or_dropped.count(table_name))
@ -104,8 +122,11 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
pqxx::result result = tx.exec(fmt::format(
"SELECT '{}'::regclass, tablename "
"FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema' "
"AND tablename = '{}'", table_name, table_name));
"WHERE schemaname != 'pg_catalog' AND {} "
"AND tablename = '{}'",
formatTableName(table_name),
(postgres_schema.empty() ? "schemaname != 'information_schema'" : "schemaname = " + quoteString(postgres_schema)),
formatTableName(table_name)));
}
catch (pqxx::undefined_table const &)
{
@ -151,14 +172,14 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr
return StoragePtr{};
auto connection_holder = pool->get();
auto columns = fetchPostgreSQLTableStructure(connection_holder->get(), doubleQuoteString(table_name)).columns;
auto columns = fetchPostgreSQLTableStructure(connection_holder->get(), formatTableName(table_name)).columns;
if (!columns)
return StoragePtr{};
auto storage = StoragePostgreSQL::create(
StorageID(database_name, table_name), pool, table_name,
ColumnsDescription{*columns}, ConstraintsDescription{}, String{}, local_context);
ColumnsDescription{*columns}, ConstraintsDescription{}, String{}, local_context, postgres_schema);
if (cache_tables)
cached_tables[table_name] = storage;
@ -182,10 +203,12 @@ void DatabasePostgreSQL::attachTable(const String & table_name, const StoragePtr
std::lock_guard<std::mutex> lock{mutex};
if (!checkPostgresTable(table_name))
throw Exception(fmt::format("Cannot attach table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
throw Exception(fmt::format("Cannot attach PostgreSQL table {} because it does not exist in PostgreSQL",
getTableNameForLogs(table_name), database_name), ErrorCodes::UNKNOWN_TABLE);
if (!detached_or_dropped.count(table_name))
throw Exception(fmt::format("Cannot attach table {}.{}. It already exists", database_name, table_name), ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception(fmt::format("Cannot attach PostgreSQL table {} because it already exists",
getTableNameForLogs(table_name), database_name), ErrorCodes::TABLE_ALREADY_EXISTS);
if (cache_tables)
cached_tables[table_name] = storage;
@ -203,10 +226,10 @@ StoragePtr DatabasePostgreSQL::detachTable(const String & table_name)
std::lock_guard<std::mutex> lock{mutex};
if (detached_or_dropped.count(table_name))
throw Exception(fmt::format("Cannot detach table {}.{}. It is already dropped/detached", database_name, table_name), ErrorCodes::TABLE_IS_DROPPED);
throw Exception(fmt::format("Cannot detach table {}. It is already dropped/detached", getTableNameForLogs(table_name)), ErrorCodes::TABLE_IS_DROPPED);
if (!checkPostgresTable(table_name))
throw Exception(fmt::format("Cannot detach table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
throw Exception(fmt::format("Cannot detach table {}, because it does not exist", getTableNameForLogs(table_name)), ErrorCodes::UNKNOWN_TABLE);
if (cache_tables)
cached_tables.erase(table_name);
@ -234,10 +257,10 @@ void DatabasePostgreSQL::dropTable(ContextPtr, const String & table_name, bool /
std::lock_guard<std::mutex> lock{mutex};
if (!checkPostgresTable(table_name))
throw Exception(fmt::format("Cannot drop table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
throw Exception(fmt::format("Cannot drop table {} because it does not exist", getTableNameForLogs(table_name)), ErrorCodes::UNKNOWN_TABLE);
if (detached_or_dropped.count(table_name))
throw Exception(fmt::format("Table {}.{} is already dropped/detached", database_name, table_name), ErrorCodes::TABLE_IS_DROPPED);
throw Exception(fmt::format("Table {} is already dropped/detached", getTableNameForLogs(table_name)), ErrorCodes::TABLE_IS_DROPPED);
fs::path mark_table_removed = fs::path(getMetadataPath()) / (escapeForFileName(table_name) + suffix);
FS::createFile(mark_table_removed);
@ -281,7 +304,7 @@ void DatabasePostgreSQL::removeOutdatedTables()
{
std::lock_guard<std::mutex> lock{mutex};
auto connection_holder = pool->get();
auto actual_tables = fetchPostgreSQLTablesList(connection_holder->get());
auto actual_tables = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema);
if (cache_tables)
{
@ -334,7 +357,7 @@ ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, Co
if (!storage)
{
if (throw_on_error)
throw Exception(fmt::format("PostgreSQL table {}.{} does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
throw Exception(fmt::format("PostgreSQL table {} does not exist", getTableNameForLogs(table_name)), ErrorCodes::UNKNOWN_TABLE);
return nullptr;
}
@ -367,9 +390,9 @@ ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, Co
ASTs storage_children = ast_storage->children;
auto storage_engine_arguments = ast_storage->engine->arguments;
/// Remove extra engine argument (`use_table_cache`)
if (storage_engine_arguments->children.size() > 4)
storage_engine_arguments->children.resize(storage_engine_arguments->children.size() - 1);
/// Remove extra engine argument (`schema` and `use_table_cache`)
if (storage_engine_arguments->children.size() >= 5)
storage_engine_arguments->children.resize(4);
/// Add table_name to engine arguments
assert(storage_engine_arguments->children.size() >= 2);

View File

@ -32,7 +32,8 @@ public:
const String & metadata_path_,
const ASTStorage * database_engine_define,
const String & dbname_,
const String & postgres_dbname,
const String & postgres_dbname_,
const String & postgres_schema_,
postgres::PoolWithFailoverPtr pool_,
bool cache_tables_);
@ -69,7 +70,8 @@ protected:
private:
String metadata_path;
ASTPtr database_engine_define;
String dbname;
String postgres_dbname;
String postgres_schema;
postgres::PoolWithFailoverPtr pool;
const bool cache_tables;
@ -77,6 +79,10 @@ private:
std::unordered_set<std::string> detached_or_dropped;
BackgroundSchedulePool::TaskHolder cleaner_task;
String getTableNameForLogs(const String & table_name) const;
String formatTableName(const String & table_name) const;
bool checkPostgresTable(const String & table_name) const;
StoragePtr fetchTable(const String & table_name, ContextPtr context, const bool table_checked) const;

View File

@ -27,11 +27,12 @@ namespace ErrorCodes
template<typename T>
std::unordered_set<std::string> fetchPostgreSQLTablesList(T & tx)
std::unordered_set<std::string> fetchPostgreSQLTablesList(T & tx, const String & postgres_schema)
{
std::unordered_set<std::string> tables;
std::string query = "SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'";
std::string query = fmt::format("SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND {}",
postgres_schema.empty() ? "schemaname != 'information_schema'" : "schemaname = " + quoteString(postgres_schema));
for (auto table_name : tx.template stream<std::string>(query))
tables.insert(std::get<0>(table_name));
@ -270,10 +271,10 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(pqxx::connection & connec
}
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection)
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection, const String & postgres_schema)
{
pqxx::ReadTransaction tx(connection);
auto result = fetchPostgreSQLTablesList(tx);
auto result = fetchPostgreSQLTablesList(tx, postgres_schema);
tx.commit();
return result;
}
@ -290,10 +291,10 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
bool with_primary_key, bool with_replica_identity_index);
template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::work & tx);
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema);
template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::ReadTransaction & tx);
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::ReadTransaction & tx, const String & postgres_schema);
}

View File

@ -21,7 +21,7 @@ struct PostgreSQLTableStructure
using PostgreSQLTableStructurePtr = std::unique_ptr<PostgreSQLTableStructure>;
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection);
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection, const String & postgres_schema);
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::connection & connection, const String & postgres_table_name, bool use_nulls = true);
@ -32,7 +32,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
bool with_primary_key = false, bool with_replica_identity_index = false);
template<typename T>
std::unordered_set<std::string> fetchPostgreSQLTablesList(T & tx);
std::unordered_set<std::string> fetchPostgreSQLTablesList(T & tx, const String & postgres_schema);
}

View File

@ -479,7 +479,7 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
"Publication {} already exists and tables list is empty. Assuming publication is correct.",
publication_name);
result_tables = fetchPostgreSQLTablesList(tx);
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
}
/// Check tables list from publication is the same as expected tables list.
/// If not - drop publication and return expected tables list.
@ -521,7 +521,7 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
/// Fetch all tables list from database. Publication does not exist yet, which means
/// that no replication took place. Publication will be created in
/// startSynchronization method.
result_tables = fetchPostgreSQLTablesList(tx);
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
}
}

View File

@ -124,6 +124,8 @@ private:
MaterializedStorages materialized_storages;
UInt64 milliseconds_to_wait;
String postgres_schema;
};
}

View File

@ -151,7 +151,7 @@ def test_postgresql_database_engine_table_cache(started_cluster):
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', 1)")
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', "", 1)")
create_postgres_table(cursor, 'test_table')
assert node1.query('DESCRIBE TABLE test_database.test_table').rstrip() == 'id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)'
@ -183,6 +183,31 @@ def test_postgresql_database_engine_table_cache(started_cluster):
assert 'test_database' not in node1.query('SHOW DATABASES')
def test_postgresql_database_with_schema(started_cluster):
conn = get_postgres_conn(started_cluster, True)
cursor = conn.cursor()
cursor.execute('DROP SCHEMA IF EXISTS test_schema CASCADE')
cursor.execute('DROP SCHEMA IF EXISTS "test.nice.schema" CASCADE')
cursor.execute('CREATE SCHEMA test_schema')
cursor.execute('CREATE TABLE test_schema.table1 (a integer)')
cursor.execute('CREATE TABLE test_schema.table2 (a integer)')
cursor.execute('CREATE TABLE table3 (a integer)')
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', 'test_schema')")
assert(node1.query('SHOW TABLES FROM test_database') == 'table1\ntable2\n')
node1.query("INSERT INTO test_database.table1 SELECT number from numbers(10000)")
assert node1.query("SELECT count() FROM test_database.table1").rstrip() == '10000'
node1.query("DETACH TABLE test_database.table1")
node1.query("ATTACH TABLE test_database.table1")
assert node1.query("SELECT count() FROM test_database.table1").rstrip() == '10000'
node1.query("DROP DATABASE test_database")
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")