mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Merge pull request #27198 from kssenii/postgres-db-schema
Support schema for postgres database engine
This commit is contained in:
commit
5edd9e0513
@ -15,7 +15,7 @@ Supports table structure modifications (`ALTER TABLE ... ADD|DROP COLUMN`). If `
|
||||
|
||||
``` sql
|
||||
CREATE DATABASE test_database
|
||||
ENGINE = PostgreSQL('host:port', 'database', 'user', 'password'[, `use_table_cache`]);
|
||||
ENGINE = PostgreSQL('host:port', 'database', 'user', 'password'[, `schema`, `use_table_cache`]);
|
||||
```
|
||||
|
||||
**Engine Parameters**
|
||||
@ -24,6 +24,7 @@ ENGINE = PostgreSQL('host:port', 'database', 'user', 'password'[, `use_table_cac
|
||||
- `database` — Remote database name.
|
||||
- `user` — PostgreSQL user.
|
||||
- `password` — User password.
|
||||
- `schema` — PostgreSQL schema.
|
||||
- `use_table_cache` — Defines if the database table structure is cached or not. Optional. Default value: `0`.
|
||||
|
||||
## Data Types Support {#data_types-support}
|
||||
|
@ -103,9 +103,11 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
const String & engine_name = engine_define->engine->name;
|
||||
const UUID & uuid = create.uuid;
|
||||
|
||||
bool engine_may_have_arguments = engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "MaterializedMySQL" ||
|
||||
engine_name == "Lazy" || engine_name == "Replicated" || engine_name == "PostgreSQL" ||
|
||||
engine_name == "MaterializedPostgreSQL" || engine_name == "SQLite";
|
||||
static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL",
|
||||
"Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite"};
|
||||
|
||||
bool engine_may_have_arguments = engines_with_arguments.contains(engine_name);
|
||||
|
||||
if (engine_define->engine->arguments && !engine_may_have_arguments)
|
||||
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
@ -113,6 +115,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
engine_define->primary_key || engine_define->order_by ||
|
||||
engine_define->sample_by;
|
||||
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL";
|
||||
|
||||
if (has_unexpected_element || (!may_have_settings && engine_define->settings))
|
||||
throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings",
|
||||
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
|
||||
@ -233,11 +236,10 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
{
|
||||
const ASTFunction * engine = engine_define->engine;
|
||||
|
||||
if (!engine->arguments || engine->arguments->children.size() < 4 || engine->arguments->children.size() > 5)
|
||||
throw Exception(fmt::format(
|
||||
"{} Database require host:port, database_name, username, password arguments "
|
||||
"[, use_table_cache = 0].", engine_name),
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
if (!engine->arguments || engine->arguments->children.size() < 4 || engine->arguments->children.size() > 6)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"{} Database require `host:port`, `database_name`, `username`, `password` [, `schema` = "", `use_table_cache` = 0].",
|
||||
engine_name);
|
||||
|
||||
ASTs & engine_args = engine->arguments->children;
|
||||
|
||||
@ -249,9 +251,13 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
const auto & username = safeGetLiteralValue<String>(engine_args[2], engine_name);
|
||||
const auto & password = safeGetLiteralValue<String>(engine_args[3], engine_name);
|
||||
|
||||
String schema;
|
||||
if (engine->arguments->children.size() >= 5)
|
||||
schema = safeGetLiteralValue<String>(engine_args[4], engine_name);
|
||||
|
||||
auto use_table_cache = 0;
|
||||
if (engine->arguments->children.size() == 5)
|
||||
use_table_cache = safeGetLiteralValue<UInt64>(engine_args[4], engine_name);
|
||||
if (engine->arguments->children.size() >= 6)
|
||||
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name);
|
||||
|
||||
/// Split into replicas if needed.
|
||||
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
|
||||
@ -266,7 +272,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
|
||||
|
||||
return std::make_shared<DatabasePostgreSQL>(
|
||||
context, metadata_path, engine_define, database_name, postgres_database_name, connection_pool, use_table_cache);
|
||||
context, metadata_path, engine_define, database_name, postgres_database_name, schema, connection_pool, use_table_cache);
|
||||
}
|
||||
else if (engine_name == "MaterializedPostgreSQL")
|
||||
{
|
||||
@ -274,9 +280,9 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
|
||||
if (!engine->arguments || engine->arguments->children.size() != 4)
|
||||
{
|
||||
throw Exception(
|
||||
fmt::format("{} Database require host:port, database_name, username, password arguments ", engine_name),
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"{} Database require `host:port`, `database_name`, `username`, `password`.",
|
||||
engine_name);
|
||||
}
|
||||
|
||||
ASTs & engine_args = engine->arguments->children;
|
||||
|
@ -39,14 +39,16 @@ DatabasePostgreSQL::DatabasePostgreSQL(
|
||||
const String & metadata_path_,
|
||||
const ASTStorage * database_engine_define_,
|
||||
const String & dbname_,
|
||||
const String & postgres_dbname,
|
||||
const String & postgres_dbname_,
|
||||
const String & postgres_schema_,
|
||||
postgres::PoolWithFailoverPtr pool_,
|
||||
bool cache_tables_)
|
||||
: IDatabase(dbname_)
|
||||
, WithContext(context_->getGlobalContext())
|
||||
, metadata_path(metadata_path_)
|
||||
, database_engine_define(database_engine_define_->clone())
|
||||
, dbname(postgres_dbname)
|
||||
, postgres_dbname(postgres_dbname_)
|
||||
, postgres_schema(postgres_schema_)
|
||||
, pool(std::move(pool_))
|
||||
, cache_tables(cache_tables_)
|
||||
{
|
||||
@ -55,12 +57,28 @@ DatabasePostgreSQL::DatabasePostgreSQL(
|
||||
}
|
||||
|
||||
|
||||
String DatabasePostgreSQL::getTableNameForLogs(const String & table_name) const
|
||||
{
|
||||
if (postgres_schema.empty())
|
||||
return fmt::format("{}.{}", postgres_dbname, table_name);
|
||||
return fmt::format("{}.{}.{}", postgres_dbname, postgres_schema, table_name);
|
||||
}
|
||||
|
||||
|
||||
String DatabasePostgreSQL::formatTableName(const String & table_name) const
|
||||
{
|
||||
if (postgres_schema.empty())
|
||||
return doubleQuoteString(table_name);
|
||||
return fmt::format("{}.{}", doubleQuoteString(postgres_schema), doubleQuoteString(table_name));
|
||||
}
|
||||
|
||||
|
||||
bool DatabasePostgreSQL::empty() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
auto connection_holder = pool->get();
|
||||
auto tables_list = fetchPostgreSQLTablesList(connection_holder->get());
|
||||
auto tables_list = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema);
|
||||
|
||||
for (const auto & table_name : tables_list)
|
||||
if (!detached_or_dropped.count(table_name))
|
||||
@ -76,7 +94,7 @@ DatabaseTablesIteratorPtr DatabasePostgreSQL::getTablesIterator(ContextPtr local
|
||||
|
||||
Tables tables;
|
||||
auto connection_holder = pool->get();
|
||||
auto table_names = fetchPostgreSQLTablesList(connection_holder->get());
|
||||
auto table_names = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema);
|
||||
|
||||
for (const auto & table_name : table_names)
|
||||
if (!detached_or_dropped.count(table_name))
|
||||
@ -104,8 +122,11 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
|
||||
pqxx::result result = tx.exec(fmt::format(
|
||||
"SELECT '{}'::regclass, tablename "
|
||||
"FROM pg_catalog.pg_tables "
|
||||
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema' "
|
||||
"AND tablename = '{}'", table_name, table_name));
|
||||
"WHERE schemaname != 'pg_catalog' AND {} "
|
||||
"AND tablename = '{}'",
|
||||
formatTableName(table_name),
|
||||
(postgres_schema.empty() ? "schemaname != 'information_schema'" : "schemaname = " + quoteString(postgres_schema)),
|
||||
formatTableName(table_name)));
|
||||
}
|
||||
catch (pqxx::undefined_table const &)
|
||||
{
|
||||
@ -151,14 +172,14 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr
|
||||
return StoragePtr{};
|
||||
|
||||
auto connection_holder = pool->get();
|
||||
auto columns = fetchPostgreSQLTableStructure(connection_holder->get(), doubleQuoteString(table_name)).columns;
|
||||
auto columns = fetchPostgreSQLTableStructure(connection_holder->get(), formatTableName(table_name)).columns;
|
||||
|
||||
if (!columns)
|
||||
return StoragePtr{};
|
||||
|
||||
auto storage = StoragePostgreSQL::create(
|
||||
StorageID(database_name, table_name), pool, table_name,
|
||||
ColumnsDescription{*columns}, ConstraintsDescription{}, String{}, local_context);
|
||||
ColumnsDescription{*columns}, ConstraintsDescription{}, String{}, local_context, postgres_schema);
|
||||
|
||||
if (cache_tables)
|
||||
cached_tables[table_name] = storage;
|
||||
@ -182,10 +203,14 @@ void DatabasePostgreSQL::attachTable(const String & table_name, const StoragePtr
|
||||
std::lock_guard<std::mutex> lock{mutex};
|
||||
|
||||
if (!checkPostgresTable(table_name))
|
||||
throw Exception(fmt::format("Cannot attach table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
|
||||
throw Exception(ErrorCodes::UNKNOWN_TABLE,
|
||||
"Cannot attach PostgreSQL table {} because it does not exist in PostgreSQL",
|
||||
getTableNameForLogs(table_name), database_name);
|
||||
|
||||
if (!detached_or_dropped.count(table_name))
|
||||
throw Exception(fmt::format("Cannot attach table {}.{}. It already exists", database_name, table_name), ErrorCodes::TABLE_ALREADY_EXISTS);
|
||||
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS,
|
||||
"Cannot attach PostgreSQL table {} because it already exists",
|
||||
getTableNameForLogs(table_name), database_name);
|
||||
|
||||
if (cache_tables)
|
||||
cached_tables[table_name] = storage;
|
||||
@ -203,10 +228,10 @@ StoragePtr DatabasePostgreSQL::detachTable(const String & table_name)
|
||||
std::lock_guard<std::mutex> lock{mutex};
|
||||
|
||||
if (detached_or_dropped.count(table_name))
|
||||
throw Exception(fmt::format("Cannot detach table {}.{}. It is already dropped/detached", database_name, table_name), ErrorCodes::TABLE_IS_DROPPED);
|
||||
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Cannot detach table {}. It is already dropped/detached", getTableNameForLogs(table_name));
|
||||
|
||||
if (!checkPostgresTable(table_name))
|
||||
throw Exception(fmt::format("Cannot detach table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
|
||||
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Cannot detach table {}, because it does not exist", getTableNameForLogs(table_name));
|
||||
|
||||
if (cache_tables)
|
||||
cached_tables.erase(table_name);
|
||||
@ -234,10 +259,10 @@ void DatabasePostgreSQL::dropTable(ContextPtr, const String & table_name, bool /
|
||||
std::lock_guard<std::mutex> lock{mutex};
|
||||
|
||||
if (!checkPostgresTable(table_name))
|
||||
throw Exception(fmt::format("Cannot drop table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
|
||||
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Cannot drop table {} because it does not exist", getTableNameForLogs(table_name));
|
||||
|
||||
if (detached_or_dropped.count(table_name))
|
||||
throw Exception(fmt::format("Table {}.{} is already dropped/detached", database_name, table_name), ErrorCodes::TABLE_IS_DROPPED);
|
||||
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {} is already dropped/detached", getTableNameForLogs(table_name));
|
||||
|
||||
fs::path mark_table_removed = fs::path(getMetadataPath()) / (escapeForFileName(table_name) + suffix);
|
||||
FS::createFile(mark_table_removed);
|
||||
@ -281,7 +306,7 @@ void DatabasePostgreSQL::removeOutdatedTables()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{mutex};
|
||||
auto connection_holder = pool->get();
|
||||
auto actual_tables = fetchPostgreSQLTablesList(connection_holder->get());
|
||||
auto actual_tables = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema);
|
||||
|
||||
if (cache_tables)
|
||||
{
|
||||
@ -334,7 +359,7 @@ ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, Co
|
||||
if (!storage)
|
||||
{
|
||||
if (throw_on_error)
|
||||
throw Exception(fmt::format("PostgreSQL table {}.{} does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
|
||||
throw Exception(ErrorCodes::UNKNOWN_TABLE, "PostgreSQL table {} does not exist", getTableNameForLogs(table_name));
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
@ -367,9 +392,9 @@ ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, Co
|
||||
ASTs storage_children = ast_storage->children;
|
||||
auto storage_engine_arguments = ast_storage->engine->arguments;
|
||||
|
||||
/// Remove extra engine argument (`use_table_cache`)
|
||||
if (storage_engine_arguments->children.size() > 4)
|
||||
storage_engine_arguments->children.resize(storage_engine_arguments->children.size() - 1);
|
||||
/// Remove extra engine argument (`schema` and `use_table_cache`)
|
||||
if (storage_engine_arguments->children.size() >= 5)
|
||||
storage_engine_arguments->children.resize(4);
|
||||
|
||||
/// Add table_name to engine arguments
|
||||
assert(storage_engine_arguments->children.size() >= 2);
|
||||
|
@ -32,7 +32,8 @@ public:
|
||||
const String & metadata_path_,
|
||||
const ASTStorage * database_engine_define,
|
||||
const String & dbname_,
|
||||
const String & postgres_dbname,
|
||||
const String & postgres_dbname_,
|
||||
const String & postgres_schema_,
|
||||
postgres::PoolWithFailoverPtr pool_,
|
||||
bool cache_tables_);
|
||||
|
||||
@ -69,7 +70,8 @@ protected:
|
||||
private:
|
||||
String metadata_path;
|
||||
ASTPtr database_engine_define;
|
||||
String dbname;
|
||||
String postgres_dbname;
|
||||
String postgres_schema;
|
||||
postgres::PoolWithFailoverPtr pool;
|
||||
const bool cache_tables;
|
||||
|
||||
@ -77,6 +79,10 @@ private:
|
||||
std::unordered_set<std::string> detached_or_dropped;
|
||||
BackgroundSchedulePool::TaskHolder cleaner_task;
|
||||
|
||||
String getTableNameForLogs(const String & table_name) const;
|
||||
|
||||
String formatTableName(const String & table_name) const;
|
||||
|
||||
bool checkPostgresTable(const String & table_name) const;
|
||||
|
||||
StoragePtr fetchTable(const String & table_name, ContextPtr context, const bool table_checked) const;
|
||||
|
@ -27,11 +27,12 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
template<typename T>
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(T & tx)
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(T & tx, const String & postgres_schema)
|
||||
{
|
||||
std::unordered_set<std::string> tables;
|
||||
std::string query = "SELECT tablename FROM pg_catalog.pg_tables "
|
||||
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'";
|
||||
std::string query = fmt::format("SELECT tablename FROM pg_catalog.pg_tables "
|
||||
"WHERE schemaname != 'pg_catalog' AND {}",
|
||||
postgres_schema.empty() ? "schemaname != 'information_schema'" : "schemaname = " + quoteString(postgres_schema));
|
||||
|
||||
for (auto table_name : tx.template stream<std::string>(query))
|
||||
tables.insert(std::get<0>(table_name));
|
||||
@ -270,10 +271,10 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(pqxx::connection & connec
|
||||
}
|
||||
|
||||
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection)
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection, const String & postgres_schema)
|
||||
{
|
||||
pqxx::ReadTransaction tx(connection);
|
||||
auto result = fetchPostgreSQLTablesList(tx);
|
||||
auto result = fetchPostgreSQLTablesList(tx, postgres_schema);
|
||||
tx.commit();
|
||||
return result;
|
||||
}
|
||||
@ -290,10 +291,10 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||
bool with_primary_key, bool with_replica_identity_index);
|
||||
|
||||
template
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::work & tx);
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema);
|
||||
|
||||
template
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::ReadTransaction & tx);
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::ReadTransaction & tx, const String & postgres_schema);
|
||||
|
||||
}
|
||||
|
||||
|
@ -21,7 +21,7 @@ struct PostgreSQLTableStructure
|
||||
|
||||
using PostgreSQLTableStructurePtr = std::unique_ptr<PostgreSQLTableStructure>;
|
||||
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection);
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection, const String & postgres_schema);
|
||||
|
||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||
pqxx::connection & connection, const String & postgres_table_name, bool use_nulls = true);
|
||||
@ -32,7 +32,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||
bool with_primary_key = false, bool with_replica_identity_index = false);
|
||||
|
||||
template<typename T>
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(T & tx);
|
||||
std::unordered_set<std::string> fetchPostgreSQLTablesList(T & tx, const String & postgres_schema);
|
||||
|
||||
}
|
||||
|
||||
|
@ -479,7 +479,7 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
|
||||
"Publication {} already exists and tables list is empty. Assuming publication is correct.",
|
||||
publication_name);
|
||||
|
||||
result_tables = fetchPostgreSQLTablesList(tx);
|
||||
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
|
||||
}
|
||||
/// Check tables list from publication is the same as expected tables list.
|
||||
/// If not - drop publication and return expected tables list.
|
||||
@ -521,7 +521,7 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
|
||||
/// Fetch all tables list from database. Publication does not exist yet, which means
|
||||
/// that no replication took place. Publication will be created in
|
||||
/// startSynchronization method.
|
||||
result_tables = fetchPostgreSQLTablesList(tx);
|
||||
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -124,6 +124,8 @@ private:
|
||||
MaterializedStorages materialized_storages;
|
||||
|
||||
UInt64 milliseconds_to_wait;
|
||||
|
||||
String postgres_schema;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -151,7 +151,7 @@ def test_postgresql_database_engine_table_cache(started_cluster):
|
||||
cursor = conn.cursor()
|
||||
|
||||
node1.query(
|
||||
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', 1)")
|
||||
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', '', 1)")
|
||||
|
||||
create_postgres_table(cursor, 'test_table')
|
||||
assert node1.query('DESCRIBE TABLE test_database.test_table').rstrip() == 'id\tInt32\t\t\t\t\t\nvalue\tNullable(Int32)'
|
||||
@ -183,6 +183,31 @@ def test_postgresql_database_engine_table_cache(started_cluster):
|
||||
assert 'test_database' not in node1.query('SHOW DATABASES')
|
||||
|
||||
|
||||
def test_postgresql_database_with_schema(started_cluster):
|
||||
conn = get_postgres_conn(started_cluster, True)
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('DROP SCHEMA IF EXISTS test_schema CASCADE')
|
||||
cursor.execute('DROP SCHEMA IF EXISTS "test.nice.schema" CASCADE')
|
||||
|
||||
cursor.execute('CREATE SCHEMA test_schema')
|
||||
cursor.execute('CREATE TABLE test_schema.table1 (a integer)')
|
||||
cursor.execute('CREATE TABLE test_schema.table2 (a integer)')
|
||||
cursor.execute('CREATE TABLE table3 (a integer)')
|
||||
|
||||
node1.query(
|
||||
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', 'test_schema')")
|
||||
|
||||
assert(node1.query('SHOW TABLES FROM test_database') == 'table1\ntable2\n')
|
||||
|
||||
node1.query("INSERT INTO test_database.table1 SELECT number from numbers(10000)")
|
||||
assert node1.query("SELECT count() FROM test_database.table1").rstrip() == '10000'
|
||||
node1.query("DETACH TABLE test_database.table1")
|
||||
node1.query("ATTACH TABLE test_database.table1")
|
||||
assert node1.query("SELECT count() FROM test_database.table1").rstrip() == '10000'
|
||||
node1.query("DROP DATABASE test_database")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
cluster.start()
|
||||
input("Cluster created, press any key to destroy...")
|
||||
|
Loading…
Reference in New Issue
Block a user