From 1dce048b58cc35f8d31b7810d10b3f3c7a1927f8 Mon Sep 17 00:00:00 2001 From: Bharat Nallan Chakravarthy Date: Thu, 28 Dec 2023 18:59:45 -0800 Subject: [PATCH 1/6] initial refactor of db registration --- src/Databases/DatabaseAtomic.cpp | 13 ++++++++ src/Databases/DatabaseDictionary.cpp | 11 +++++++ src/Databases/DatabaseFactory.cpp | 12 ++++++++ src/Databases/DatabaseFactory.h | 33 +++++++++++++++++++-- src/Databases/DatabaseFilesystem.cpp | 12 ++++++++ src/Databases/DatabaseLazy.cpp | 13 ++++++++ src/Databases/DatabaseMemory.cpp | 12 ++++++++ src/Databases/DatabaseOrdinary.cpp | 12 ++++++++ src/Databases/registerDatabases.cpp | 26 ++++++++++++++++ src/Databases/registerDatabases.h | 4 +++ src/Interpreters/InterpreterCreateQuery.cpp | 2 +- 11 files changed, 146 insertions(+), 4 deletions(-) create mode 100644 src/Databases/registerDatabases.cpp create mode 100644 src/Databases/registerDatabases.h diff --git a/src/Databases/DatabaseAtomic.cpp b/src/Databases/DatabaseAtomic.cpp index 1daa6351c23..8a5ba5f033f 100644 --- a/src/Databases/DatabaseAtomic.cpp +++ b/src/Databases/DatabaseAtomic.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -622,4 +623,16 @@ void DatabaseAtomic::checkDetachedTableNotInUse(const UUID & uuid) assertDetachedTableNotInUse(uuid); } +void registerDatabaseAtomic(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + return make_shared( + args.database_name, + args.metadata_path, + args.uuid, + args.context); + }; + factory.registerDatabase("Atomic", create_fn); +} } diff --git a/src/Databases/DatabaseDictionary.cpp b/src/Databases/DatabaseDictionary.cpp index 3a3dea1d38e..e2e0d52cd88 100644 --- a/src/Databases/DatabaseDictionary.cpp +++ b/src/Databases/DatabaseDictionary.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -140,4 +141,14 @@ void DatabaseDictionary::shutdown() { } +void registerDatabaseDictionary(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + return make_shared( + args.database_name, + args.context); + }; + factory.registerDatabase("Dictionary", create_fn); +} } diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index a967ecf67c6..5f7e5c673f2 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -119,6 +119,18 @@ DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & m return impl; } +void DatabaseFactory::registerDatabase(const std::string & name, CreatorFn creator_fn) +{ + if (!databases.emplace(name, Creator{std::move(creator_fn)}).second) + throw Exception(ErrorCodes::LOGICAL_ERROR, "DatabaseFactory: the database name '{}' is not unique", name); +} + +DatabaseFactory & DatabaseFactory::instance() +{ + static DatabaseFactory db_fact; + return db_fact; +} + template static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) { diff --git a/src/Databases/DatabaseFactory.h b/src/Databases/DatabaseFactory.h index cb631cd76d0..51fc150b086 100644 --- a/src/Databases/DatabaseFactory.h +++ b/src/Databases/DatabaseFactory.h @@ -8,12 +8,39 @@ namespace DB class ASTCreateQuery; -class DatabaseFactory +class DatabaseFactory : private boost::noncopyable { public: - static DatabasePtr get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context); - static DatabasePtr getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context); + static DatabaseFactory & instance(); + + struct Arguments + { + const String & database_name; + const String & metadata_path; + const UUID & uuid; + const ContextPtr & context; + const UInt64 & cache_expiration_time_seconds; + }; + + DatabasePtr get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context); + + DatabasePtr getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context); + + using CreatorFn = std::function; + + struct Creator + { + CreatorFn creator_fn; + }; + using Databases = std::unordered_map; + + void registerDatabase(const std::string & name, CreatorFn creator_fn); + + const Databases & getAllDatabases() const { return databases; } + +private: + Databases databases; }; } diff --git a/src/Databases/DatabaseFilesystem.cpp b/src/Databases/DatabaseFilesystem.cpp index ca1b5b27a59..800f5145e8b 100644 --- a/src/Databases/DatabaseFilesystem.cpp +++ b/src/Databases/DatabaseFilesystem.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -237,4 +238,15 @@ DatabaseTablesIteratorPtr DatabaseFilesystem::getTablesIterator(ContextPtr, cons return std::make_unique(Tables{}, getDatabaseName()); } +void registerDatabaseFilesystem(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + return make_shared( + args.database_name, + args.metadata_path, + args.context); + }; + factory.registerDatabase("FileSystem", create_fn); +} } diff --git a/src/Databases/DatabaseLazy.cpp b/src/Databases/DatabaseLazy.cpp index c6249c68933..fc500884266 100644 --- a/src/Databases/DatabaseLazy.cpp +++ b/src/Databases/DatabaseLazy.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -354,4 +355,16 @@ const StoragePtr & DatabaseLazyIterator::table() const return current_storage; } +void registerDatabaseLazy(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + return make_shared( + args.database_name, + args.metadata_path, + args.cache_expiration_time_seconds, + args.context); + }; + factory.registerDatabase("Lazy", create_fn); +} } diff --git a/src/Databases/DatabaseMemory.cpp b/src/Databases/DatabaseMemory.cpp index 2a7a2ad8ccc..794eebbc399 100644 --- a/src/Databases/DatabaseMemory.cpp +++ b/src/Databases/DatabaseMemory.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -209,4 +210,15 @@ std::vector> DatabaseMemory::getTablesForBackup(co return res; } +void registerDatabaseMemory(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + return make_shared( + args.database_name, + args.context); + }; + factory.registerDatabase("Memory", create_fn); +} + } diff --git a/src/Databases/DatabaseOrdinary.cpp b/src/Databases/DatabaseOrdinary.cpp index 9a9dcf22c88..390ff7fd146 100644 --- a/src/Databases/DatabaseOrdinary.cpp +++ b/src/Databases/DatabaseOrdinary.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include #include @@ -321,4 +322,15 @@ void DatabaseOrdinary::commitAlterTable(const StorageID &, const String & table_ } } +void registerDatabaseOrdinary(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + return make_shared( + args.database_name, + args.metadata_path, + args.context); + }; + factory.registerDatabase("Ordinary", create_fn); +} } diff --git a/src/Databases/registerDatabases.cpp b/src/Databases/registerDatabases.cpp new file mode 100644 index 00000000000..78f34aadc37 --- /dev/null +++ b/src/Databases/registerDatabases.cpp @@ -0,0 +1,26 @@ +#include +#include + + +namespace DB +{ + +void registerDatabaseAtomic(DatabaseFactory & factory); +void registerDatabaseOrdinary(DatabaseFactory & factory); +void registerDatabaseDictionary(DatabaseFactory & factory); +void registerDatabaseMemory(DatabaseFactory & factory); +void registerDatabaseLazy(DatabaseFactory & factory); +void registerDatabaseFilesystem(DatabaseFactory & factory); + + +void registerDatabases() +{ + auto & factory = DatabaseFactory::instance(); + registerDatabaseAtomic(factory); + registerDatabaseOrdinary(factory); + registerDatabaseDictionary(factory); + registerDatabaseMemory(factory); + registerDatabaseLazy(factory); + registerDatabaseFilesystem(factory); +} +} diff --git a/src/Databases/registerDatabases.h b/src/Databases/registerDatabases.h new file mode 100644 index 00000000000..4af1447e04f --- /dev/null +++ b/src/Databases/registerDatabases.h @@ -0,0 +1,4 @@ +namespace DB +{ +void registerDatabases(); +} diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 1eadb325e95..7a2bd9f6167 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -282,7 +282,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) else if (create.uuid != UUIDHelpers::Nil && !DatabaseCatalog::instance().hasUUIDMapping(create.uuid)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find UUID mapping for {}, it's a bug", create.uuid); - DatabasePtr database = DatabaseFactory::get(create, metadata_path / "", getContext()); + DatabasePtr database = DatabaseFactory::instance().get(create, metadata_path / "", getContext()); if (create.uuid != UUIDHelpers::Nil) create.setDatabase(TABLE_WITH_UUID_NAME_PLACEHOLDER); From 8e83d3604d215b4700fc915b75774ec0c1223c18 Mon Sep 17 00:00:00 2001 From: Bharat Nallan Chakravarthy Date: Fri, 29 Dec 2023 11:09:25 -0800 Subject: [PATCH 2/6] register all remaining databases --- src/Databases/DatabaseFactory.h | 16 ++- src/Databases/DatabaseHDFS.cpp | 34 +++++++ src/Databases/DatabaseLazy.cpp | 22 ++++- src/Databases/DatabaseReplicated.cpp | 47 +++++++++ src/Databases/DatabaseS3.cpp | 21 +++- .../MySQL/DatabaseMaterializedMySQL.cpp | 98 +++++++++++++++++++ src/Databases/MySQL/DatabaseMySQL.cpp | 86 ++++++++++++++++ .../DatabaseMaterializedPostgreSQL.cpp | 68 +++++++++++++ .../PostgreSQL/DatabasePostgreSQL.cpp | 90 +++++++++++++++++ src/Databases/SQLite/DatabaseSQLite.cpp | 28 ++++++ src/Databases/registerDatabases.cpp | 46 +++++++++ 11 files changed, 552 insertions(+), 4 deletions(-) diff --git a/src/Databases/DatabaseFactory.h b/src/Databases/DatabaseFactory.h index 51fc150b086..cbc506d691d 100644 --- a/src/Databases/DatabaseFactory.h +++ b/src/Databases/DatabaseFactory.h @@ -3,6 +3,15 @@ #include #include +#if USE_MYSQL +# include +# include +# include +# include +# include +# include +#endif + namespace DB { @@ -16,11 +25,14 @@ public: struct Arguments { + const String & engine_name; + ASTs & engine_args; + ASTStorage * storage_def; + const ASTCreateQuery & create_query; const String & database_name; const String & metadata_path; const UUID & uuid; - const ContextPtr & context; - const UInt64 & cache_expiration_time_seconds; + ContextPtr & context; }; DatabasePtr get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context); diff --git a/src/Databases/DatabaseHDFS.cpp b/src/Databases/DatabaseHDFS.cpp index 750d79c8493..46c642822f6 100644 --- a/src/Databases/DatabaseHDFS.cpp +++ b/src/Databases/DatabaseHDFS.cpp @@ -2,6 +2,7 @@ #if USE_HDFS +#include #include #include @@ -237,6 +238,39 @@ DatabaseTablesIteratorPtr DatabaseHDFS::getTablesIterator(ContextPtr, const Filt return std::make_unique(Tables{}, getDatabaseName()); } +template +static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) +{ + if (!ast || !ast->as()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); + + return ast->as()->value.safeGet(); +} + +void registerDatabaseHDFS(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + auto * engine_define = args.create_query.storage; + const ASTFunction * engine = engine_define->engine; + const String & engine_name = engine_define->engine->name; + + /// If source_url is empty, then table name must contain full url + std::string source_url; + + if (engine->arguments && !engine->arguments->children.empty()) + { + if (engine->arguments->children.size() != 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS database requires at most 1 argument: source_url"); + + const auto & arguments = engine->arguments->children; + source_url = safeGetLiteralValue(arguments[0], engine_name); + } + + return std::make_shared(args.database_name, source_url, args.context); + }; + factory.registerDatabase("HDFS", create_fn); +} } // DB #endif diff --git a/src/Databases/DatabaseLazy.cpp b/src/Databases/DatabaseLazy.cpp index fc500884266..d6d97f916e0 100644 --- a/src/Databases/DatabaseLazy.cpp +++ b/src/Databases/DatabaseLazy.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -355,14 +356,33 @@ const StoragePtr & DatabaseLazyIterator::table() const return current_storage; } +template +static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) +{ + if (!ast || !ast->as()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); + + return ast->as()->value.safeGet(); +} + void registerDatabaseLazy(DatabaseFactory & factory) { auto create_fn = [](const DatabaseFactory::Arguments & args) { + auto * engine_define = args.create_query.storage; + const ASTFunction * engine = engine_define->engine; + + if (!engine->arguments || engine->arguments->children.size() != 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Lazy database require cache_expiration_time_seconds argument"); + + const auto & arguments = engine->arguments->children; + + const auto cache_expiration_time_seconds = safeGetLiteralValue(arguments[0], "Lazy"); + return make_shared( args.database_name, args.metadata_path, - args.cache_expiration_time_seconds, + cache_expiration_time_seconds, args.context); }; factory.registerDatabase("Lazy", create_fn); diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 952c0689a0d..03b72b18bc8 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -1652,4 +1653,50 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context, return true; } +template +static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) +{ + if (!ast || !ast->as()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); + + return ast->as()->value.safeGet(); +} + +void registerDatabaseReplicated(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + auto * engine_define = args.create_query.storage; + const ASTFunction * engine = engine_define->engine; + + if (!engine->arguments || engine->arguments->children.size() != 3) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replicated database requires 3 arguments: zookeeper path, shard name and replica name"); + + auto & arguments = engine->arguments->children; + for (auto & engine_arg : arguments) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context); + + String zookeeper_path = safeGetLiteralValue(arguments[0], "Replicated"); + String shard_name = safeGetLiteralValue(arguments[1], "Replicated"); + String replica_name = safeGetLiteralValue(arguments[2], "Replicated"); + + zookeeper_path = args.context->getMacros()->expand(zookeeper_path); + shard_name = args.context->getMacros()->expand(shard_name); + replica_name = args.context->getMacros()->expand(replica_name); + + DatabaseReplicatedSettings database_replicated_settings{}; + if (engine_define->settings) + database_replicated_settings.loadFromQuery(*engine_define); + + return std::make_shared( + args.database_name, + args.metadata_path, + args.uuid, + zookeeper_path, + shard_name, + replica_name, + std::move(database_replicated_settings), args.context); + }; + factory.registerDatabase("Replicated", create_fn); +} } diff --git a/src/Databases/DatabaseS3.cpp b/src/Databases/DatabaseS3.cpp index b92b4a971c1..1721b0e9e97 100644 --- a/src/Databases/DatabaseS3.cpp +++ b/src/Databases/DatabaseS3.cpp @@ -2,6 +2,7 @@ #if USE_AWS_S3 +#include #include #include @@ -307,6 +308,24 @@ DatabaseTablesIteratorPtr DatabaseS3::getTablesIterator(ContextPtr, const Filter return std::make_unique(Tables{}, getDatabaseName()); } -} +void registerDatabaseS3(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + auto * engine_define = args.create_query.storage; + const ASTFunction * engine = engine_define->engine; + DatabaseS3::Configuration config; + + if (engine->arguments && !engine->arguments->children.empty()) + { + ASTs & engine_args = engine->arguments->children; + config = DatabaseS3::parseArguments(engine_args, args.context); + } + + return std::make_shared(args.database_name, config, args.context); + }; + factory.registerDatabase("S3", create_fn); +} +} #endif diff --git a/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp index a31e74cc7ae..46d8cd4da40 100644 --- a/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp @@ -2,13 +2,22 @@ #if USE_MYSQL +# include +# include # include # include +# include +# include # include # include # include +# include +# include +# include # include +# include +# include # include # include # include @@ -179,6 +188,95 @@ void DatabaseMaterializedMySQL::stopReplication() started_up = false; } + +template +static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) +{ + if (!ast || !ast->as()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); + + return ast->as()->value.safeGet(); +} + +void registerDatabaseMaterializedMySQL(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + auto * engine_define = args.create_query.storage; + const ASTFunction * engine = engine_define->engine; + const String & engine_name = engine_define->engine->name; + + if (!engine->arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); + StorageMySQL::Configuration configuration; + ASTs & arguments = engine->arguments->children; + auto mysql_settings = std::make_unique(); + + if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, args.context)) + { + configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, args.context, false); + } + else + { + if (arguments.size() != 4) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments."); + + + arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], args.context); + const auto & host_port = safeGetLiteralValue(arguments[0], engine_name); + + if (engine_name == "MySQL") + { + size_t max_addresses = args.context->getSettingsRef().glob_expansion_max_elements; + configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306); + } + else + { + const auto & [remote_host, remote_port] = parseAddress(host_port, 3306); + configuration.host = remote_host; + configuration.port = remote_port; + } + + configuration.database = safeGetLiteralValue(arguments[1], engine_name); + configuration.username = safeGetLiteralValue(arguments[2], engine_name); + configuration.password = safeGetLiteralValue(arguments[3], engine_name); + } + MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password); + auto mysql_pool + = mysqlxx::Pool(configuration.database, configuration.host, configuration.username, configuration.password, configuration.port); + + auto materialize_mode_settings = std::make_unique(); + + if (engine_define->settings) + materialize_mode_settings->loadFromQuery(*engine_define); + + if (args.uuid == UUIDHelpers::Nil) + { + auto print_create_ast = args.create_query.clone(); + print_create_ast->as()->attach = false; + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + "The MaterializedMySQL database engine no longer supports Ordinary databases. To re-create the database, delete " + "the old one by executing \"rm -rf {}{{,.sql}}\", then re-create the database with the following query: {}", + args.metadata_path, + queryToString(print_create_ast)); + } + + return make_shared( + args.context, + args.database_name, + args.metadata_path, + args.uuid, + configuration.database, + std::move(mysql_pool), + std::move(client), + std::move(materialize_mode_settings)); + }; + factory.registerDatabase("MySQL", create_fn); +} + } #endif diff --git a/src/Databases/MySQL/DatabaseMySQL.cpp b/src/Databases/MySQL/DatabaseMySQL.cpp index 7d2ed7a9662..2c2cd3370a9 100644 --- a/src/Databases/MySQL/DatabaseMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMySQL.cpp @@ -2,6 +2,7 @@ #if USE_MYSQL # include +# include # include # include # include @@ -14,6 +15,7 @@ # include # include # include +# include # include # include # include @@ -21,8 +23,11 @@ # include # include # include +# include +# include # include # include +# include # include # include # include @@ -41,6 +46,7 @@ namespace ErrorCodes extern const int TABLE_IS_DROPPED; extern const int TABLE_ALREADY_EXISTS; extern const int UNEXPECTED_AST_STRUCTURE; + extern const int CANNOT_CREATE_DATABASE; } constexpr static const auto suffix = ".remove_flag"; @@ -504,6 +510,86 @@ void DatabaseMySQL::createTable(ContextPtr local_context, const String & table_n attachTable(local_context, table_name, storage, {}); } +template +static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) +{ + if (!ast || !ast->as()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); + + return ast->as()->value.safeGet(); +} + +void registerDatabaseMySQL(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + auto * engine_define = args.create_query.storage; + const ASTFunction * engine = engine_define->engine; + const String & engine_name = engine_define->engine->name; + if (!engine->arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); + + StorageMySQL::Configuration configuration; + ASTs & arguments = engine->arguments->children; + auto mysql_settings = std::make_unique(); + + if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, args.context)) + { + configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, args.context, false); + } + else + { + if (arguments.size() != 4) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments."); + + + arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], args.context); + const auto & host_port = safeGetLiteralValue(arguments[0], engine_name); + + if (engine_name == "MySQL") + { + size_t max_addresses = args.context->getSettingsRef().glob_expansion_max_elements; + configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306); + } + else + { + const auto & [remote_host, remote_port] = parseAddress(host_port, 3306); + configuration.host = remote_host; + configuration.port = remote_port; + } + + configuration.database = safeGetLiteralValue(arguments[1], engine_name); + configuration.username = safeGetLiteralValue(arguments[2], engine_name); + configuration.password = safeGetLiteralValue(arguments[3], engine_name); + } + mysql_settings->loadFromQueryContext(args.context, *engine_define); + if (engine_define->settings) + mysql_settings->loadFromQuery(*engine_define); + + auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings); + + try + { + return make_shared( + args.context, + args.database_name, + args.metadata_path, + engine_define, + configuration.database, + std::move(mysql_settings), + std::move(mysql_pool), + args.create_query.attach); + } + catch (...) + { + const auto & exception_message = getCurrentExceptionMessage(true); + throw Exception(ErrorCodes::CANNOT_CREATE_DATABASE, "Cannot create MySQL database, because {}", exception_message); + } + }; + factory.registerDatabase("MySQL", create_fn); +} } #endif diff --git a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp index 78be0611631..0de896a8c44 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp @@ -8,11 +8,15 @@ #include #include #include +#include +#include #include #include #include #include #include +#include +#include #include #include #include @@ -21,6 +25,8 @@ #include #include #include +#include +#include #include #include #include @@ -471,6 +477,68 @@ DatabaseTablesIteratorPtr DatabaseMaterializedPostgreSQL::getTablesIterator( return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name); } +template +static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) +{ + if (!ast || !ast->as()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); + + return ast->as()->value.safeGet(); +} + +void registerDatabaseMaterializedPostgreSQL(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + auto * engine_define = args.create_query.storage; + const ASTFunction * engine = engine_define->engine; + ASTs & engine_args = engine->arguments->children; + const String & engine_name = engine_define->engine->name; + + if (!engine->arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); + + StoragePostgreSQL::Configuration configuration; + + if (!engine->arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); + + if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, args.context)) + { + configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, args.context, false); + } + else + { + if (engine_args.size() != 4) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "MaterializedPostgreSQL Database require `host:port`, `database_name`, `username`, `password`."); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context); + + auto parsed_host_port = parseAddress(safeGetLiteralValue(engine_args[0], engine_name), 5432); + + configuration.host = parsed_host_port.first; + configuration.port = parsed_host_port.second; + configuration.database = safeGetLiteralValue(engine_args[1], engine_name); + configuration.username = safeGetLiteralValue(engine_args[2], engine_name); + configuration.password = safeGetLiteralValue(engine_args[3], engine_name); + } + + auto connection_info = postgres::formatConnectionString( + configuration.database, configuration.host, configuration.port, configuration.username, configuration.password); + + auto postgresql_replica_settings = std::make_unique(); + if (engine_define->settings) + postgresql_replica_settings->loadFromQuery(*engine_define); + + return std::make_shared( + args.context, args.metadata_path, args.uuid, args.create_query.attach, + args.database_name, configuration.database, connection_info, + std::move(postgresql_replica_settings)); + }; + factory.registerDatabase("MaterializedPostgreSQL", create_fn); +} } #endif diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp b/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp index 24f04c16029..1c573f0f3fb 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp @@ -6,14 +6,18 @@ #include #include +#include #include #include +#include #include #include #include #include #include #include +#include +#include #include #include #include @@ -478,6 +482,92 @@ ASTPtr DatabasePostgreSQL::getColumnDeclaration(const DataTypePtr & data_type) c return std::make_shared(data_type->getName()); } +template +static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) +{ + if (!ast || !ast->as()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); + + return ast->as()->value.safeGet(); +} + +void registerDatabasePostgreSQL(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + auto * engine_define = args.create_query.storage; + const ASTFunction * engine = engine_define->engine; + ASTs & engine_args = engine->arguments->children; + const String & engine_name = engine_define->engine->name; + + if (!engine->arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); + + auto use_table_cache = false; + StoragePostgreSQL::Configuration configuration; + + if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, args.context)) + { + configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, args.context, false); + use_table_cache = named_collection->getOrDefault("use_table_cache", 0); + } + else + { + if (engine_args.size() < 4 || engine_args.size() > 6) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "PostgreSQL Database require `host:port`, `database_name`, `username`, `password`" + "[, `schema` = "", `use_table_cache` = 0"); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context); + + const auto & host_port = safeGetLiteralValue(engine_args[0], engine_name); + size_t max_addresses = args.context->getSettingsRef().glob_expansion_max_elements; + + configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432); + configuration.database = safeGetLiteralValue(engine_args[1], engine_name); + configuration.username = safeGetLiteralValue(engine_args[2], engine_name); + configuration.password = safeGetLiteralValue(engine_args[3], engine_name); + + bool is_deprecated_syntax = false; + if (engine_args.size() >= 5) + { + auto arg_value = engine_args[4]->as()->value; + if (arg_value.getType() == Field::Types::Which::String) + { + configuration.schema = safeGetLiteralValue(engine_args[4], engine_name); + } + else + { + use_table_cache = safeGetLiteralValue(engine_args[4], engine_name); + LOG_WARNING(&Poco::Logger::get("DatabaseFactory"), "A deprecated syntax of PostgreSQL database engine is used"); + is_deprecated_syntax = true; + } + } + + if (!is_deprecated_syntax && engine_args.size() >= 6) + use_table_cache = safeGetLiteralValue(engine_args[5], engine_name); + } + + const auto & settings = args.context->getSettingsRef(); + auto pool = std::make_shared( + configuration, + settings.postgresql_connection_pool_size, + settings.postgresql_connection_pool_wait_timeout, + POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, + settings.postgresql_connection_pool_auto_close_connection); + + return std::make_shared( + args.context, + args.metadata_path, + engine_define, + args.database_name, + configuration, + pool, + use_table_cache); + }; + factory.registerDatabase("PostgreSQL", create_fn); +} } #endif diff --git a/src/Databases/SQLite/DatabaseSQLite.cpp b/src/Databases/SQLite/DatabaseSQLite.cpp index d031fd8e420..1d644a88105 100644 --- a/src/Databases/SQLite/DatabaseSQLite.cpp +++ b/src/Databases/SQLite/DatabaseSQLite.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -201,6 +202,33 @@ ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, Contex return create_table_query; } +template +static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) +{ + if (!ast || !ast->as()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); + + return ast->as()->value.safeGet(); +} + +void registerDatabaseSQLite(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + auto * engine_define = args.create_query.storage; + const ASTFunction * engine = engine_define->engine; + + if (!engine->arguments || engine->arguments->children.size() != 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "SQLite database requires 1 argument: database path"); + + const auto & arguments = engine->arguments->children; + + String database_path = safeGetLiteralValue(arguments[0], "SQLite"); + + return std::make_shared(args.context, engine_define, args.create_query.attach, database_path); + }; + factory.registerDatabase("SQLite", create_fn); +} } #endif diff --git a/src/Databases/registerDatabases.cpp b/src/Databases/registerDatabases.cpp index 78f34aadc37..4f7c229bdf4 100644 --- a/src/Databases/registerDatabases.cpp +++ b/src/Databases/registerDatabases.cpp @@ -11,7 +11,30 @@ void registerDatabaseDictionary(DatabaseFactory & factory); void registerDatabaseMemory(DatabaseFactory & factory); void registerDatabaseLazy(DatabaseFactory & factory); void registerDatabaseFilesystem(DatabaseFactory & factory); +void registerDatabaseReplicated(DatabaseFactory & factory); +#if USE_MYSQL +void registerDatabaseMySQL(DatabaseFactory & factory); +void registerDatabaseMaterializedMySQL(DatabaseFactory & factory); +#endif + +#if USE_LIBPQXX +void registerDatabasePostgreSQL(DatabaseFactory & factory); + +void registerDatabaseMaterializedPostgreSQL(DatabaseFactory & factory); +#endif + +#if USE_SQLITE +void registerDatabaseSQLite(DatabaseFactory & factory); +#endif + +#if USE_AWS_S3 +void registerDatabaseS3(DatabaseFactory & factory); +#endif + +#if USE_HDFS +void registerDatabaseHDFS(DatabaseFactory & factory); +#endif void registerDatabases() { @@ -22,5 +45,28 @@ void registerDatabases() registerDatabaseMemory(factory); registerDatabaseLazy(factory); registerDatabaseFilesystem(factory); + registerDatabaseReplicated(factory); + +#if USE_MYSQL + registerDatabaseMySQL(factory); + registerDatabaseMaterializedMySQL(factory); +#endif + +#if USE_LIBPQXX + registerDatabasePostgreSQL(factory); + registerDatabaseMaterializedPostgreSQL(factory); +#endif + +#if USE_SQLITE + registerDatabaseSQLite(factory); +#endif + +#if USE_AWS_S3 + registerDatabaseS3(factory); +#endif + +#if USE_HDFS + registerDatabaseHDFS(factory); +#endif } } From ab28da85d3ce4d1eea0e9f4a853cc4475150b3f8 Mon Sep 17 00:00:00 2001 From: Bharat Nallan Chakravarthy Date: Fri, 29 Dec 2023 13:52:20 -0800 Subject: [PATCH 3/6] try wire everything up --- programs/server/Server.cpp | 2 + src/Databases/DatabaseFactory.cpp | 420 ++++-------------------------- src/Databases/DatabaseFactory.h | 6 +- 3 files changed, 59 insertions(+), 369 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 481510d681f..926e57070f3 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -72,6 +72,7 @@ #include #include #include +#include #include #include #include @@ -648,6 +649,7 @@ try registerFunctions(); registerAggregateFunctions(); registerTableFunctions(); + registerDatabases(); registerStorages(); registerDictionaries(); registerDisks(/* global_skip_access_check= */ false); diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index 5f7e5c673f2..706c6840344 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -121,8 +121,8 @@ DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & m void DatabaseFactory::registerDatabase(const std::string & name, CreatorFn creator_fn) { - if (!databases.emplace(name, Creator{std::move(creator_fn)}).second) - throw Exception(ErrorCodes::LOGICAL_ERROR, "DatabaseFactory: the database name '{}' is not unique", name); + if (!database_engines.emplace(name, Creator{std::move(creator_fn)}).second) + throw Exception(ErrorCodes::LOGICAL_ERROR, "DatabaseFactory: the database engine name '{}' is not unique", name); } DatabaseFactory & DatabaseFactory::instance() @@ -142,372 +142,60 @@ static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &eng DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context) { - auto * engine_define = create.storage; + auto * storage = create.storage; const String & database_name = create.getDatabase(); - const String & engine_name = engine_define->engine->name; + const String & engine_name = storage->engine->name; const UUID & uuid = create.uuid; - - static const std::unordered_set database_engines{"Ordinary", "Atomic", "Memory", - "Dictionary", "Lazy", "Replicated", "MySQL", "MaterializeMySQL", "MaterializedMySQL", - "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; - - if (!database_engines.contains(engine_name)) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine name `{}` does not exist", engine_name); - - static const std::unordered_set engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL", - "Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; - - static const std::unordered_set engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"}; - bool engine_may_have_arguments = engines_with_arguments.contains(engine_name); - - if (engine_define->engine->arguments && !engine_may_have_arguments) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name); - - bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by || - engine_define->primary_key || engine_define->order_by || - engine_define->sample_by; - bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL"; - - if (has_unexpected_element || (!may_have_settings && engine_define->settings)) - throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST, - "Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name); - - if (create.table_overrides && !engines_with_table_overrides.contains(engine_name)) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name); - - if (engine_name == "Ordinary") - { - if (!create.attach && !context->getSettingsRef().allow_deprecated_database_ordinary) - throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, - "Ordinary database engine is deprecated (see also allow_deprecated_database_ordinary setting)"); - - return std::make_shared(database_name, metadata_path, context); - } - - if (engine_name == "Atomic") - return std::make_shared(database_name, metadata_path, uuid, context); - else if (engine_name == "Memory") - return std::make_shared(database_name, context); - else if (engine_name == "Dictionary") - return std::make_shared(database_name, context); - -#if USE_MYSQL - - else if (engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "MaterializedMySQL") - { - const ASTFunction * engine = engine_define->engine; - if (!engine->arguments) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); - - StorageMySQL::Configuration configuration; - ASTs & arguments = engine->arguments->children; - auto mysql_settings = std::make_unique(); - - if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, context)) - { - configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, context, false); - } - else - { - if (arguments.size() != 4) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments."); - - - arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], context); - const auto & host_port = safeGetLiteralValue(arguments[0], engine_name); - - if (engine_name == "MySQL") - { - size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements; - configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306); - } - else - { - const auto & [remote_host, remote_port] = parseAddress(host_port, 3306); - configuration.host = remote_host; - configuration.port = remote_port; - } - - configuration.database = safeGetLiteralValue(arguments[1], engine_name); - configuration.username = safeGetLiteralValue(arguments[2], engine_name); - configuration.password = safeGetLiteralValue(arguments[3], engine_name); - } - - try - { - if (engine_name == "MySQL") - { - mysql_settings->loadFromQueryContext(context, *engine_define); - if (engine_define->settings) - mysql_settings->loadFromQuery(*engine_define); - - auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings); - - return std::make_shared( - context, database_name, metadata_path, engine_define, configuration.database, - std::move(mysql_settings), std::move(mysql_pool), create.attach); - } - - MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password); - auto mysql_pool = mysqlxx::Pool(configuration.database, configuration.host, configuration.username, configuration.password, configuration.port); - - auto materialize_mode_settings = std::make_unique(); - - if (engine_define->settings) - materialize_mode_settings->loadFromQuery(*engine_define); - - if (uuid == UUIDHelpers::Nil) - { - auto print_create_ast = create.clone(); - print_create_ast->as()->attach = false; - throw Exception(ErrorCodes::NOT_IMPLEMENTED, - "The MaterializedMySQL database engine no longer supports Ordinary databases. To re-create the database, delete " - "the old one by executing \"rm -rf {}{{,.sql}}\", then re-create the database with the following query: {}", - metadata_path, - queryToString(print_create_ast)); - } - - return std::make_shared( - context, database_name, metadata_path, uuid, configuration.database, std::move(mysql_pool), - std::move(client), std::move(materialize_mode_settings)); - } - catch (...) - { - const auto & exception_message = getCurrentExceptionMessage(true); - throw Exception(ErrorCodes::CANNOT_CREATE_DATABASE, "Cannot create MySQL database, because {}", exception_message); - } - } -#endif - - else if (engine_name == "Lazy") - { - const ASTFunction * engine = engine_define->engine; - - if (!engine->arguments || engine->arguments->children.size() != 1) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Lazy database require cache_expiration_time_seconds argument"); - - const auto & arguments = engine->arguments->children; - - const auto cache_expiration_time_seconds = safeGetLiteralValue(arguments[0], "Lazy"); - return std::make_shared(database_name, metadata_path, cache_expiration_time_seconds, context); - } - - else if (engine_name == "Replicated") - { - const ASTFunction * engine = engine_define->engine; - - if (!engine->arguments || engine->arguments->children.size() != 3) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replicated database requires 3 arguments: zookeeper path, shard name and replica name"); - - auto & arguments = engine->arguments->children; - for (auto & engine_arg : arguments) - engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); - - String zookeeper_path = safeGetLiteralValue(arguments[0], "Replicated"); - String shard_name = safeGetLiteralValue(arguments[1], "Replicated"); - String replica_name = safeGetLiteralValue(arguments[2], "Replicated"); - - zookeeper_path = context->getMacros()->expand(zookeeper_path); - shard_name = context->getMacros()->expand(shard_name); - replica_name = context->getMacros()->expand(replica_name); - - DatabaseReplicatedSettings database_replicated_settings{}; - if (engine_define->settings) - database_replicated_settings.loadFromQuery(*engine_define); - - return std::make_shared(database_name, metadata_path, uuid, - zookeeper_path, shard_name, replica_name, - std::move(database_replicated_settings), context); - } - -#if USE_LIBPQXX - - else if (engine_name == "PostgreSQL") - { - const ASTFunction * engine = engine_define->engine; - if (!engine->arguments) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); - - ASTs & engine_args = engine->arguments->children; - auto use_table_cache = false; - StoragePostgreSQL::Configuration configuration; - - if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context)) - { - configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, context, false); - use_table_cache = named_collection->getOrDefault("use_table_cache", 0); - } - else - { - if (engine_args.size() < 4 || engine_args.size() > 6) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "PostgreSQL Database require `host:port`, `database_name`, `username`, `password`" - "[, `schema` = "", `use_table_cache` = 0"); - - for (auto & engine_arg : engine_args) - engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); - - const auto & host_port = safeGetLiteralValue(engine_args[0], engine_name); - size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements; - - configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432); - configuration.database = safeGetLiteralValue(engine_args[1], engine_name); - configuration.username = safeGetLiteralValue(engine_args[2], engine_name); - configuration.password = safeGetLiteralValue(engine_args[3], engine_name); - - bool is_deprecated_syntax = false; - if (engine_args.size() >= 5) - { - auto arg_value = engine_args[4]->as()->value; - if (arg_value.getType() == Field::Types::Which::String) - { - configuration.schema = safeGetLiteralValue(engine_args[4], engine_name); - } - else - { - use_table_cache = safeGetLiteralValue(engine_args[4], engine_name); - LOG_WARNING(&Poco::Logger::get("DatabaseFactory"), "A deprecated syntax of PostgreSQL database engine is used"); - is_deprecated_syntax = true; - } - } - - if (!is_deprecated_syntax && engine_args.size() >= 6) - use_table_cache = safeGetLiteralValue(engine_args[5], engine_name); - } - - const auto & settings = context->getSettingsRef(); - auto pool = std::make_shared( - configuration, - settings.postgresql_connection_pool_size, - settings.postgresql_connection_pool_wait_timeout, - POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, - settings.postgresql_connection_pool_auto_close_connection); - - return std::make_shared( - context, metadata_path, engine_define, database_name, configuration, pool, use_table_cache); - } - else if (engine_name == "MaterializedPostgreSQL") - { - const ASTFunction * engine = engine_define->engine; - if (!engine->arguments) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); - - ASTs & engine_args = engine->arguments->children; - StoragePostgreSQL::Configuration configuration; - - if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context)) - { - configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, context, false); - } - else - { - if (engine_args.size() != 4) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "MaterializedPostgreSQL Database require `host:port`, `database_name`, `username`, `password`."); - - for (auto & engine_arg : engine_args) - engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); - - auto parsed_host_port = parseAddress(safeGetLiteralValue(engine_args[0], engine_name), 5432); - - configuration.host = parsed_host_port.first; - configuration.port = parsed_host_port.second; - configuration.database = safeGetLiteralValue(engine_args[1], engine_name); - configuration.username = safeGetLiteralValue(engine_args[2], engine_name); - configuration.password = safeGetLiteralValue(engine_args[3], engine_name); - } - - auto connection_info = postgres::formatConnectionString( - configuration.database, configuration.host, configuration.port, configuration.username, configuration.password); - - auto postgresql_replica_settings = std::make_unique(); - if (engine_define->settings) - postgresql_replica_settings->loadFromQuery(*engine_define); - - return std::make_shared( - context, metadata_path, uuid, create.attach, - database_name, configuration.database, connection_info, - std::move(postgresql_replica_settings)); - } - - -#endif - -#if USE_SQLITE - else if (engine_name == "SQLite") - { - const ASTFunction * engine = engine_define->engine; - - if (!engine->arguments || engine->arguments->children.size() != 1) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "SQLite database requires 1 argument: database path"); - - const auto & arguments = engine->arguments->children; - - String database_path = safeGetLiteralValue(arguments[0], "SQLite"); - - return std::make_shared(context, engine_define, create.attach, database_path); - } -#endif - - else if (engine_name == "Filesystem") - { - const ASTFunction * engine = engine_define->engine; - - /// If init_path is empty, then the current path will be used - std::string init_path; - - if (engine->arguments && !engine->arguments->children.empty()) - { - if (engine->arguments->children.size() != 1) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filesystem database requires at most 1 argument: filesystem_path"); - - const auto & arguments = engine->arguments->children; - init_path = safeGetLiteralValue(arguments[0], engine_name); - } - - return std::make_shared(database_name, init_path, context); - } - -#if USE_AWS_S3 - else if (engine_name == "S3") - { - const ASTFunction * engine = engine_define->engine; - - DatabaseS3::Configuration config; - - if (engine->arguments && !engine->arguments->children.empty()) - { - ASTs & engine_args = engine->arguments->children; - config = DatabaseS3::parseArguments(engine_args, context); - } - - return std::make_shared(database_name, config, context); - } -#endif - -#if USE_HDFS - else if (engine_name == "HDFS") - { - const ASTFunction * engine = engine_define->engine; - - /// If source_url is empty, then table name must contain full url - std::string source_url; - - if (engine->arguments && !engine->arguments->children.empty()) - { - if (engine->arguments->children.size() != 1) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS database requires at most 1 argument: source_url"); - - const auto & arguments = engine->arguments->children; - source_url = safeGetLiteralValue(arguments[0], engine_name); - } - - return std::make_shared(database_name, source_url, context); - } -#endif - - throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Unknown database engine: {}", engine_name); + const ASTFunction & engine_def = *storage->engine; + + bool has_engine_args = false; + if (engine_def.arguments) + has_engine_args = true; + + if(!database_engines.contains(engine_name)) + throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Unknown database engine: {}", engine_name); + + ASTs empty_engine_args; + Arguments arguments{ + .engine_name = engine_name, + .engine_args = has_engine_args ? storage->engine->arguments->children : empty_engine_args, + .create_query = create, + .database_name = database_name, + .metadata_path = metadata_path, + .uuid = uuid, + .context = context}; + + assert(arguments.getContext() == arguments.getContext()->getGlobalContext()); + + auto res = database_engines.at(engine_name).creator_fn(arguments); + + return res; + + /// TODO: move validation to respective engines +// static const std::unordered_set database_engines{"Ordinary", "Atomic", "Memory", +// "Dictionary", "Lazy", "Replicated", "MySQL", "MaterializeMySQL", "MaterializedMySQL", +// "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; + +// static const std::unordered_set engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL", +// "Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; +// +// static const std::unordered_set engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"}; +// bool engine_may_have_arguments = engines_with_arguments.contains(engine_name); +// +// if (engine_define->engine->arguments && !engine_may_have_arguments) +// throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name); +// +// bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by || +// engine_define->primary_key || engine_define->order_by || +// engine_define->sample_by; +// bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL"; +// +// if (has_unexpected_element || (!may_have_settings && engine_define->settings)) +// throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST, +// "Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name); +// +// if (create.table_overrides && !engines_with_table_overrides.contains(engine_name)) +// throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name); } } diff --git a/src/Databases/DatabaseFactory.h b/src/Databases/DatabaseFactory.h index cbc506d691d..2bdfee8fc0f 100644 --- a/src/Databases/DatabaseFactory.h +++ b/src/Databases/DatabaseFactory.h @@ -45,14 +45,14 @@ public: { CreatorFn creator_fn; }; - using Databases = std::unordered_map; + using DatabaseEngines = std::unordered_map; void registerDatabase(const std::string & name, CreatorFn creator_fn); - const Databases & getAllDatabases() const { return databases; } + const DatabaseEngines & getAllDatabases() const { return database_engines; } private: - Databases databases; + DatabaseEngines database_engines; }; } From af534db4d92adc54cb8f46d8b7dce506b25fa6b9 Mon Sep 17 00:00:00 2001 From: Bharat Nallan Chakravarthy Date: Fri, 29 Dec 2023 14:46:13 -0800 Subject: [PATCH 4/6] style fixes and cleanup --- src/Databases/DatabaseFactory.cpp | 19 ++------------ src/Databases/DatabaseFactory.h | 25 ++++++++++++------- src/Databases/DatabaseHDFS.cpp | 9 ------- src/Databases/DatabaseLazy.cpp | 10 +------- src/Databases/DatabaseReplicated.cpp | 9 ------- .../MySQL/DatabaseMaterializedMySQL.cpp | 11 +------- src/Databases/MySQL/DatabaseMySQL.cpp | 10 +------- .../DatabaseMaterializedPostgreSQL.cpp | 10 -------- .../PostgreSQL/DatabasePostgreSQL.cpp | 9 ------- src/Databases/SQLite/DatabaseSQLite.cpp | 10 +------- src/Databases/registerDatabases.h | 2 ++ 11 files changed, 24 insertions(+), 100 deletions(-) diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index 706c6840344..1963aee2eed 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -1,12 +1,6 @@ #include #include -#include -#include -#include -#include -#include -#include #include #include #include @@ -67,7 +61,7 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int UNKNOWN_DATABASE_ENGINE; extern const int CANNOT_CREATE_DATABASE; - extern const int NOT_IMPLEMENTED; + extern const int LOGICAL_ERROR; } void cckMetadataPathForOrdinary(const ASTCreateQuery & create, const String & metadata_path) @@ -131,15 +125,6 @@ DatabaseFactory & DatabaseFactory::instance() return db_fact; } -template -static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) -{ - if (!ast || !ast->as()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); - - return ast->as()->value.safeGet(); -} - DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context) { auto * storage = create.storage; @@ -152,7 +137,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String if (engine_def.arguments) has_engine_args = true; - if(!database_engines.contains(engine_name)) + if (!database_engines.contains(engine_name)) throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Unknown database engine: {}", engine_name); ASTs empty_engine_args; diff --git a/src/Databases/DatabaseFactory.h b/src/Databases/DatabaseFactory.h index 2bdfee8fc0f..9f13e854b3f 100644 --- a/src/Databases/DatabaseFactory.h +++ b/src/Databases/DatabaseFactory.h @@ -2,21 +2,28 @@ #include #include - -#if USE_MYSQL -# include -# include -# include -# include -# include -# include -#endif +#include +#include namespace DB { +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + class ASTCreateQuery; +template +static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) +{ + if (!ast || !ast->as()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); + + return ast->as()->value.safeGet(); +} + class DatabaseFactory : private boost::noncopyable { public: diff --git a/src/Databases/DatabaseHDFS.cpp b/src/Databases/DatabaseHDFS.cpp index 46c642822f6..6810f655116 100644 --- a/src/Databases/DatabaseHDFS.cpp +++ b/src/Databases/DatabaseHDFS.cpp @@ -238,15 +238,6 @@ DatabaseTablesIteratorPtr DatabaseHDFS::getTablesIterator(ContextPtr, const Filt return std::make_unique(Tables{}, getDatabaseName()); } -template -static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) -{ - if (!ast || !ast->as()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); - - return ast->as()->value.safeGet(); -} - void registerDatabaseHDFS(DatabaseFactory & factory) { auto create_fn = [](const DatabaseFactory::Arguments & args) diff --git a/src/Databases/DatabaseLazy.cpp b/src/Databases/DatabaseLazy.cpp index d6d97f916e0..fcd832e7cc2 100644 --- a/src/Databases/DatabaseLazy.cpp +++ b/src/Databases/DatabaseLazy.cpp @@ -36,6 +36,7 @@ namespace ErrorCodes extern const int UNKNOWN_TABLE; extern const int UNSUPPORTED_METHOD; extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; } @@ -356,15 +357,6 @@ const StoragePtr & DatabaseLazyIterator::table() const return current_storage; } -template -static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) -{ - if (!ast || !ast->as()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); - - return ast->as()->value.safeGet(); -} - void registerDatabaseLazy(DatabaseFactory & factory) { auto create_fn = [](const DatabaseFactory::Arguments & args) diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 03b72b18bc8..582b7dbcd19 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -1653,15 +1653,6 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context, return true; } -template -static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) -{ - if (!ast || !ast->as()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); - - return ast->as()->value.safeGet(); -} - void registerDatabaseReplicated(DatabaseFactory & factory) { auto create_fn = [](const DatabaseFactory::Arguments & args) diff --git a/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp index 46d8cd4da40..8b5d1242977 100644 --- a/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp @@ -30,6 +30,7 @@ namespace DB namespace ErrorCodes { extern const int NOT_IMPLEMENTED; + extern const int BAD_ARGUMENTS; } DatabaseMaterializedMySQL::DatabaseMaterializedMySQL( @@ -188,16 +189,6 @@ void DatabaseMaterializedMySQL::stopReplication() started_up = false; } - -template -static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) -{ - if (!ast || !ast->as()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); - - return ast->as()->value.safeGet(); -} - void registerDatabaseMaterializedMySQL(DatabaseFactory & factory) { auto create_fn = [](const DatabaseFactory::Arguments & args) diff --git a/src/Databases/MySQL/DatabaseMySQL.cpp b/src/Databases/MySQL/DatabaseMySQL.cpp index 2c2cd3370a9..96a5c3a18ce 100644 --- a/src/Databases/MySQL/DatabaseMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMySQL.cpp @@ -47,6 +47,7 @@ namespace ErrorCodes extern const int TABLE_ALREADY_EXISTS; extern const int UNEXPECTED_AST_STRUCTURE; extern const int CANNOT_CREATE_DATABASE; + extern const int BAD_ARGUMENTS; } constexpr static const auto suffix = ".remove_flag"; @@ -510,15 +511,6 @@ void DatabaseMySQL::createTable(ContextPtr local_context, const String & table_n attachTable(local_context, table_name, storage, {}); } -template -static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) -{ - if (!ast || !ast->as()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); - - return ast->as()->value.safeGet(); -} - void registerDatabaseMySQL(DatabaseFactory & factory) { auto create_fn = [](const DatabaseFactory::Arguments & args) diff --git a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp index 0de896a8c44..be92310df5c 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include #include #include @@ -477,15 +476,6 @@ DatabaseTablesIteratorPtr DatabaseMaterializedPostgreSQL::getTablesIterator( return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name); } -template -static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) -{ - if (!ast || !ast->as()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); - - return ast->as()->value.safeGet(); -} - void registerDatabaseMaterializedPostgreSQL(DatabaseFactory & factory) { auto create_fn = [](const DatabaseFactory::Arguments & args) diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp b/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp index 1c573f0f3fb..1fe5c078581 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp @@ -482,15 +482,6 @@ ASTPtr DatabasePostgreSQL::getColumnDeclaration(const DataTypePtr & data_type) c return std::make_shared(data_type->getName()); } -template -static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) -{ - if (!ast || !ast->as()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); - - return ast->as()->value.safeGet(); -} - void registerDatabasePostgreSQL(DatabaseFactory & factory) { auto create_fn = [](const DatabaseFactory::Arguments & args) diff --git a/src/Databases/SQLite/DatabaseSQLite.cpp b/src/Databases/SQLite/DatabaseSQLite.cpp index 1d644a88105..7a320bad9cf 100644 --- a/src/Databases/SQLite/DatabaseSQLite.cpp +++ b/src/Databases/SQLite/DatabaseSQLite.cpp @@ -22,6 +22,7 @@ namespace ErrorCodes { extern const int SQLITE_ENGINE_ERROR; extern const int UNKNOWN_TABLE; + extern const int BAD_ARGUMENTS; } DatabaseSQLite::DatabaseSQLite( @@ -202,15 +203,6 @@ ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, Contex return create_table_query; } -template -static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) -{ - if (!ast || !ast->as()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); - - return ast->as()->value.safeGet(); -} - void registerDatabaseSQLite(DatabaseFactory & factory) { auto create_fn = [](const DatabaseFactory::Arguments & args) diff --git a/src/Databases/registerDatabases.h b/src/Databases/registerDatabases.h index 4af1447e04f..dbf1bbb6e64 100644 --- a/src/Databases/registerDatabases.h +++ b/src/Databases/registerDatabases.h @@ -1,3 +1,5 @@ +#pragma once + namespace DB { void registerDatabases(); From 906745feaa656010b6ec7f13145393e78736fb7b Mon Sep 17 00:00:00 2001 From: Bharat Nallan Chakravarthy Date: Fri, 29 Dec 2023 19:35:00 -0800 Subject: [PATCH 5/6] validation and left over fixes --- programs/copier/ClusterCopierApp.cpp | 2 + programs/format/Format.cpp | 2 + programs/local/LocalServer.cpp | 6 +- src/Databases/DatabaseFactory.cpp | 73 ++++++++++--------- src/Databases/DatabaseFactory.h | 10 +-- .../MySQL/DatabaseMaterializedMySQL.cpp | 5 +- .../DatabaseMaterializedPostgreSQL.cpp | 3 - src/Databases/SQLite/DatabaseSQLite.cpp | 1 - .../fuzzers/execute_query_fuzzer.cpp | 2 + 9 files changed, 52 insertions(+), 52 deletions(-) diff --git a/programs/copier/ClusterCopierApp.cpp b/programs/copier/ClusterCopierApp.cpp index 8f24d13d379..e3371185aad 100644 --- a/programs/copier/ClusterCopierApp.cpp +++ b/programs/copier/ClusterCopierApp.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -159,6 +160,7 @@ void ClusterCopierApp::mainImpl() registerFunctions(); registerAggregateFunctions(); registerTableFunctions(); + registerDatabases(); registerStorages(); registerDictionaries(); registerDisks(/* global_skip_access_check= */ true); diff --git a/programs/format/Format.cpp b/programs/format/Format.cpp index 8e7f38b6a1e..05ba86069d7 100644 --- a/programs/format/Format.cpp +++ b/programs/format/Format.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -130,6 +131,7 @@ int mainEntryClickHouseFormat(int argc, char ** argv) registerFunctions(); registerAggregateFunctions(); registerTableFunctions(); + registerDatabases(); registerStorages(); registerFormats(); diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index b425258b2d3..318d70d9139 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -10,18 +10,17 @@ #include #include #include +#include #include #include #include #include #include #include -#include #include #include #include #include -#include #include #include #include @@ -32,11 +31,9 @@ #include #include #include -#include #include #include #include -#include #include #include #include @@ -489,6 +486,7 @@ try registerFunctions(); registerAggregateFunctions(); registerTableFunctions(); + registerDatabases(); registerStorages(); registerDictionaries(); registerDisks(/* global_skip_access_check= */ true); diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index 1963aee2eed..efdff92d175 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -97,9 +97,42 @@ void cckMetadataPathForOrdinary(const ASTCreateQuery & create, const String & me } +/// validate validates the database engine that's specified in the create query for +/// engine arguments, settings and table overrides. +void validate(const ASTCreateQuery & create_query) + +{ + auto * storage = create_query.storage; + + /// Check engine may have arguments + static const std::unordered_set engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL", + "Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; + + const String & engine_name = storage->engine->name; + bool engine_may_have_arguments = engines_with_arguments.contains(engine_name); + + if (storage->engine->arguments && !engine_may_have_arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name); + + /// Check engine may have settings + bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL"; + bool has_unexpected_element = storage->engine->parameters || storage->partition_by || + storage->primary_key || storage->order_by || + storage->sample_by; + if (has_unexpected_element || (!may_have_settings && storage->settings)) + throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST, + "Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name); + + /// Check engine with table overrides + static const std::unordered_set engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"}; + if (create_query.table_overrides && !engines_with_table_overrides.contains(engine_name)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name); +} + DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context) { cckMetadataPathForOrdinary(create, metadata_path); + validate(create); DatabasePtr impl = getImpl(create, metadata_path, context); @@ -115,7 +148,7 @@ DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & m void DatabaseFactory::registerDatabase(const std::string & name, CreatorFn creator_fn) { - if (!database_engines.emplace(name, Creator{std::move(creator_fn)}).second) + if (!database_engines.emplace(name, std::move(creator_fn)).second) throw Exception(ErrorCodes::LOGICAL_ERROR, "DatabaseFactory: the database engine name '{}' is not unique", name); } @@ -125,16 +158,15 @@ DatabaseFactory & DatabaseFactory::instance() return db_fact; } + DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context) { auto * storage = create.storage; const String & database_name = create.getDatabase(); const String & engine_name = storage->engine->name; - const UUID & uuid = create.uuid; - const ASTFunction & engine_def = *storage->engine; bool has_engine_args = false; - if (engine_def.arguments) + if (storage->engine->arguments) has_engine_args = true; if (!database_engines.contains(engine_name)) @@ -147,40 +179,15 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String .create_query = create, .database_name = database_name, .metadata_path = metadata_path, - .uuid = uuid, + .uuid = create.uuid, .context = context}; assert(arguments.getContext() == arguments.getContext()->getGlobalContext()); - auto res = database_engines.at(engine_name).creator_fn(arguments); + // creator_fn creates and returns a DatabasePtr with the supplied arguments + auto creator_fn = database_engines.at(engine_name); - return res; - - /// TODO: move validation to respective engines -// static const std::unordered_set database_engines{"Ordinary", "Atomic", "Memory", -// "Dictionary", "Lazy", "Replicated", "MySQL", "MaterializeMySQL", "MaterializedMySQL", -// "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; - -// static const std::unordered_set engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL", -// "Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; -// -// static const std::unordered_set engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"}; -// bool engine_may_have_arguments = engines_with_arguments.contains(engine_name); -// -// if (engine_define->engine->arguments && !engine_may_have_arguments) -// throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name); -// -// bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by || -// engine_define->primary_key || engine_define->order_by || -// engine_define->sample_by; -// bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL"; -// -// if (has_unexpected_element || (!may_have_settings && engine_define->settings)) -// throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST, -// "Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name); -// -// if (create.table_overrides && !engines_with_table_overrides.contains(engine_name)) -// throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name); + return creator_fn(arguments); } } diff --git a/src/Databases/DatabaseFactory.h b/src/Databases/DatabaseFactory.h index 9f13e854b3f..34df5f9373c 100644 --- a/src/Databases/DatabaseFactory.h +++ b/src/Databases/DatabaseFactory.h @@ -34,7 +34,7 @@ public: { const String & engine_name; ASTs & engine_args; - ASTStorage * storage_def; + ASTStorage * storage; const ASTCreateQuery & create_query; const String & database_name; const String & metadata_path; @@ -48,16 +48,10 @@ public: using CreatorFn = std::function; - struct Creator - { - CreatorFn creator_fn; - }; - using DatabaseEngines = std::unordered_map; + using DatabaseEngines = std::unordered_map; void registerDatabase(const std::string & name, CreatorFn creator_fn); - const DatabaseEngines & getAllDatabases() const { return database_engines; } - private: DatabaseEngines database_engines; }; diff --git a/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp index 8b5d1242977..cbb080a0baa 100644 --- a/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp @@ -6,7 +6,6 @@ # include # include -# include # include # include # include @@ -16,7 +15,6 @@ # include # include # include -# include # include # include # include @@ -265,7 +263,8 @@ void registerDatabaseMaterializedMySQL(DatabaseFactory & factory) std::move(client), std::move(materialize_mode_settings)); }; - factory.registerDatabase("MySQL", create_fn); + factory.registerDatabase("MaterializeMySQL", create_fn); + factory.registerDatabase("MaterializedMySQL", create_fn); } } diff --git a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp index be92310df5c..a659821e179 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp @@ -22,14 +22,11 @@ #include #include #include -#include #include #include #include #include #include -#include -#include namespace DB { diff --git a/src/Databases/SQLite/DatabaseSQLite.cpp b/src/Databases/SQLite/DatabaseSQLite.cpp index 7a320bad9cf..605a354bd7e 100644 --- a/src/Databases/SQLite/DatabaseSQLite.cpp +++ b/src/Databases/SQLite/DatabaseSQLite.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include diff --git a/src/Interpreters/fuzzers/execute_query_fuzzer.cpp b/src/Interpreters/fuzzers/execute_query_fuzzer.cpp index 40e2325e46e..fd023754abf 100644 --- a/src/Interpreters/fuzzers/execute_query_fuzzer.cpp +++ b/src/Interpreters/fuzzers/execute_query_fuzzer.cpp @@ -2,6 +2,7 @@ #include #include "Processors/Executors/PullingPipelineExecutor.h" +#include #include #include #include @@ -31,6 +32,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size) registerFunctions(); registerAggregateFunctions(); registerTableFunctions(); + registerDatabases(); registerStorages(); registerDictionaries(); registerDisks(/* global_skip_access_check= */ true); From 6c4a4991e294318db9b3538315ee5ff868a04cbb Mon Sep 17 00:00:00 2001 From: Bharat Nallan Chakravarthy Date: Fri, 29 Dec 2023 21:31:32 -0800 Subject: [PATCH 6/6] few more fixes --- programs/local/LocalServer.cpp | 4 ++++ src/Databases/DatabaseFactory.cpp | 2 -- src/Databases/DatabaseFilesystem.cpp | 23 ++++++++++++++++++----- src/Databases/DatabaseOrdinary.cpp | 5 +++++ 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 318d70d9139..290998029ed 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -17,10 +17,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -31,9 +33,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index efdff92d175..1eada102c35 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -182,8 +182,6 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String .uuid = create.uuid, .context = context}; - assert(arguments.getContext() == arguments.getContext()->getGlobalContext()); - // creator_fn creates and returns a DatabasePtr with the supplied arguments auto creator_fn = database_engines.at(engine_name); diff --git a/src/Databases/DatabaseFilesystem.cpp b/src/Databases/DatabaseFilesystem.cpp index 800f5145e8b..5564f1d07cf 100644 --- a/src/Databases/DatabaseFilesystem.cpp +++ b/src/Databases/DatabaseFilesystem.cpp @@ -242,11 +242,24 @@ void registerDatabaseFilesystem(DatabaseFactory & factory) { auto create_fn = [](const DatabaseFactory::Arguments & args) { - return make_shared( - args.database_name, - args.metadata_path, - args.context); + auto * engine_define = args.create_query.storage; + const ASTFunction * engine = engine_define->engine; + const String & engine_name = engine_define->engine->name; + + /// If init_path is empty, then the current path will be used + std::string init_path; + + if (engine->arguments && !engine->arguments->children.empty()) + { + if (engine->arguments->children.size() != 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filesystem database requires at most 1 argument: filesystem_path"); + + const auto & arguments = engine->arguments->children; + init_path = safeGetLiteralValue(arguments[0], engine_name); + } + + return std::make_shared(args.database_name, init_path, args.context); }; - factory.registerDatabase("FileSystem", create_fn); + factory.registerDatabase("Filesystem", create_fn); } } diff --git a/src/Databases/DatabaseOrdinary.cpp b/src/Databases/DatabaseOrdinary.cpp index 390ff7fd146..8973b533720 100644 --- a/src/Databases/DatabaseOrdinary.cpp +++ b/src/Databases/DatabaseOrdinary.cpp @@ -38,6 +38,7 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int UNKNOWN_DATABASE_ENGINE; } static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768; @@ -326,6 +327,10 @@ void registerDatabaseOrdinary(DatabaseFactory & factory) { auto create_fn = [](const DatabaseFactory::Arguments & args) { + if (!args.create_query.attach && !args.context->getSettingsRef().allow_deprecated_database_ordinary) + throw Exception( + ErrorCodes::UNKNOWN_DATABASE_ENGINE, + "Ordinary database engine is deprecated (see also allow_deprecated_database_ordinary setting)"); return make_shared( args.database_name, args.metadata_path,