Merge pull request #58365 from bharatnc/ncb/refactor-db-factory

independent registration of database engines
This commit is contained in:
Alexey Milovidov 2023-12-31 00:56:24 +01:00 committed by GitHub
commit b752a1c62e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 709 additions and 385 deletions

View File

@ -2,6 +2,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/StatusFile.h>
#include <Common/TerminalSize.h>
#include <Databases/registerDatabases.h>
#include <IO/ConnectionTimeouts.h>
#include <Formats/registerFormats.h>
#include <Common/scope_guard_safe.h>
@ -159,6 +160,7 @@ void ClusterCopierApp::mainImpl()
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerDatabases();
registerStorages();
registerDictionaries();
registerDisks(/* global_skip_access_check= */ true);

View File

@ -17,6 +17,7 @@
#include <Interpreters/Context.h>
#include <Functions/FunctionFactory.h>
#include <Databases/registerDatabases.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
@ -130,6 +131,7 @@ int mainEntryClickHouseFormat(int argc, char ** argv)
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerDatabases();
registerStorages();
registerFormats();

View File

@ -10,6 +10,7 @@
#include <Poco/Logger.h>
#include <Poco/NullChannel.h>
#include <Poco/SimpleFileChannel.h>
#include <Databases/registerDatabases.h>
#include <Databases/DatabaseFilesystem.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabasesOverlay.h>
@ -489,6 +490,7 @@ try
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerDatabases();
registerStorages();
registerDictionaries();
registerDisks(/* global_skip_access_check= */ true);

View File

@ -72,6 +72,7 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Formats/registerFormats.h>
#include <Storages/registerStorages.h>
#include <Databases/registerDatabases.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <IO/Resource/registerSchedulerNodes.h>
@ -648,6 +649,7 @@ try
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerDatabases();
registerStorages();
registerDictionaries();
registerDisks(/* global_skip_access_check= */ false);

View File

@ -1,6 +1,7 @@
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromFile.h>
@ -622,4 +623,16 @@ void DatabaseAtomic::checkDetachedTableNotInUse(const UUID & uuid)
assertDetachedTableNotInUse(uuid);
}
void registerDatabaseAtomic(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
return make_shared<DatabaseAtomic>(
args.database_name,
args.metadata_path,
args.uuid,
args.context);
};
factory.registerDatabase("Atomic", create_fn);
}
}

View File

@ -1,4 +1,5 @@
#include <Databases/DatabaseDictionary.h>
#include <Databases/DatabaseFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Dictionaries/DictionaryStructure.h>
@ -140,4 +141,14 @@ void DatabaseDictionary::shutdown()
{
}
void registerDatabaseDictionary(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
return make_shared<DatabaseDictionary>(
args.database_name,
args.context);
};
factory.registerDatabase("Dictionary", create_fn);
}
}

View File

@ -1,12 +1,6 @@
#include <Databases/DatabaseFactory.h>
#include <filesystem>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseDictionary.h>
#include <Databases/DatabaseFilesystem.h>
#include <Databases/DatabaseLazy.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
@ -67,7 +61,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_DATABASE_ENGINE;
extern const int CANNOT_CREATE_DATABASE;
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
void cckMetadataPathForOrdinary(const ASTCreateQuery & create, const String & metadata_path)
@ -103,9 +97,42 @@ void cckMetadataPathForOrdinary(const ASTCreateQuery & create, const String & me
}
/// validate validates the database engine that's specified in the create query for
/// engine arguments, settings and table overrides.
void validate(const ASTCreateQuery & create_query)
{
auto * storage = create_query.storage;
/// Check engine may have arguments
static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL",
"Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"};
const String & engine_name = storage->engine->name;
bool engine_may_have_arguments = engines_with_arguments.contains(engine_name);
if (storage->engine->arguments && !engine_may_have_arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name);
/// Check engine may have settings
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL";
bool has_unexpected_element = storage->engine->parameters || storage->partition_by ||
storage->primary_key || storage->order_by ||
storage->sample_by;
if (has_unexpected_element || (!may_have_settings && storage->settings))
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST,
"Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name);
/// Check engine with table overrides
static const std::unordered_set<std::string_view> engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"};
if (create_query.table_overrides && !engines_with_table_overrides.contains(engine_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name);
}
DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context)
{
cckMetadataPathForOrdinary(create, metadata_path);
validate(create);
DatabasePtr impl = getImpl(create, metadata_path, context);
@ -119,383 +146,46 @@ DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & m
return impl;
}
template <typename ValueType>
static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name)
void DatabaseFactory::registerDatabase(const std::string & name, CreatorFn creator_fn)
{
if (!ast || !ast->as<ASTLiteral>())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name);
return ast->as<ASTLiteral>()->value.safeGet<ValueType>();
if (!database_engines.emplace(name, std::move(creator_fn)).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "DatabaseFactory: the database engine name '{}' is not unique", name);
}
DatabaseFactory & DatabaseFactory::instance()
{
static DatabaseFactory db_fact;
return db_fact;
}
DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context)
{
auto * engine_define = create.storage;
auto * storage = create.storage;
const String & database_name = create.getDatabase();
const String & engine_name = engine_define->engine->name;
const UUID & uuid = create.uuid;
const String & engine_name = storage->engine->name;
static const std::unordered_set<std::string_view> database_engines{"Ordinary", "Atomic", "Memory",
"Dictionary", "Lazy", "Replicated", "MySQL", "MaterializeMySQL", "MaterializedMySQL",
"PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"};
bool has_engine_args = false;
if (storage->engine->arguments)
has_engine_args = true;
if (!database_engines.contains(engine_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine name `{}` does not exist", engine_name);
static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL",
"Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"};
static const std::unordered_set<std::string_view> engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"};
bool engine_may_have_arguments = engines_with_arguments.contains(engine_name);
if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name);
bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by ||
engine_define->primary_key || engine_define->order_by ||
engine_define->sample_by;
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL";
if (has_unexpected_element || (!may_have_settings && engine_define->settings))
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST,
"Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name);
if (create.table_overrides && !engines_with_table_overrides.contains(engine_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name);
if (engine_name == "Ordinary")
{
if (!create.attach && !context->getSettingsRef().allow_deprecated_database_ordinary)
throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE,
"Ordinary database engine is deprecated (see also allow_deprecated_database_ordinary setting)");
return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context);
}
if (engine_name == "Atomic")
return std::make_shared<DatabaseAtomic>(database_name, metadata_path, uuid, context);
else if (engine_name == "Memory")
return std::make_shared<DatabaseMemory>(database_name, context);
else if (engine_name == "Dictionary")
return std::make_shared<DatabaseDictionary>(database_name, context);
#if USE_MYSQL
else if (engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "MaterializedMySQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
StorageMySQL::Configuration configuration;
ASTs & arguments = engine->arguments->children;
auto mysql_settings = std::make_unique<MySQLSettings>();
if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, context))
{
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, context, false);
}
else
{
if (arguments.size() != 4)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.");
arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], context);
const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name);
if (engine_name == "MySQL")
{
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
}
else
{
const auto & [remote_host, remote_port] = parseAddress(host_port, 3306);
configuration.host = remote_host;
configuration.port = remote_port;
}
configuration.database = safeGetLiteralValue<String>(arguments[1], engine_name);
configuration.username = safeGetLiteralValue<String>(arguments[2], engine_name);
configuration.password = safeGetLiteralValue<String>(arguments[3], engine_name);
}
try
{
if (engine_name == "MySQL")
{
mysql_settings->loadFromQueryContext(context, *engine_define);
if (engine_define->settings)
mysql_settings->loadFromQuery(*engine_define);
auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings);
return std::make_shared<DatabaseMySQL>(
context, database_name, metadata_path, engine_define, configuration.database,
std::move(mysql_settings), std::move(mysql_pool), create.attach);
}
MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password);
auto mysql_pool = mysqlxx::Pool(configuration.database, configuration.host, configuration.username, configuration.password, configuration.port);
auto materialize_mode_settings = std::make_unique<MaterializedMySQLSettings>();
if (engine_define->settings)
materialize_mode_settings->loadFromQuery(*engine_define);
if (uuid == UUIDHelpers::Nil)
{
auto print_create_ast = create.clone();
print_create_ast->as<ASTCreateQuery>()->attach = false;
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"The MaterializedMySQL database engine no longer supports Ordinary databases. To re-create the database, delete "
"the old one by executing \"rm -rf {}{{,.sql}}\", then re-create the database with the following query: {}",
metadata_path,
queryToString(print_create_ast));
}
return std::make_shared<DatabaseMaterializedMySQL>(
context, database_name, metadata_path, uuid, configuration.database, std::move(mysql_pool),
std::move(client), std::move(materialize_mode_settings));
}
catch (...)
{
const auto & exception_message = getCurrentExceptionMessage(true);
throw Exception(ErrorCodes::CANNOT_CREATE_DATABASE, "Cannot create MySQL database, because {}", exception_message);
}
}
#endif
else if (engine_name == "Lazy")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Lazy database require cache_expiration_time_seconds argument");
const auto & arguments = engine->arguments->children;
const auto cache_expiration_time_seconds = safeGetLiteralValue<UInt64>(arguments[0], "Lazy");
return std::make_shared<DatabaseLazy>(database_name, metadata_path, cache_expiration_time_seconds, context);
}
else if (engine_name == "Replicated")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 3)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replicated database requires 3 arguments: zookeeper path, shard name and replica name");
auto & arguments = engine->arguments->children;
for (auto & engine_arg : arguments)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
String zookeeper_path = safeGetLiteralValue<String>(arguments[0], "Replicated");
String shard_name = safeGetLiteralValue<String>(arguments[1], "Replicated");
String replica_name = safeGetLiteralValue<String>(arguments[2], "Replicated");
zookeeper_path = context->getMacros()->expand(zookeeper_path);
shard_name = context->getMacros()->expand(shard_name);
replica_name = context->getMacros()->expand(replica_name);
DatabaseReplicatedSettings database_replicated_settings{};
if (engine_define->settings)
database_replicated_settings.loadFromQuery(*engine_define);
return std::make_shared<DatabaseReplicated>(database_name, metadata_path, uuid,
zookeeper_path, shard_name, replica_name,
std::move(database_replicated_settings), context);
}
#if USE_LIBPQXX
else if (engine_name == "PostgreSQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
ASTs & engine_args = engine->arguments->children;
auto use_table_cache = false;
StoragePostgreSQL::Configuration configuration;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context))
{
configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, context, false);
use_table_cache = named_collection->getOrDefault<UInt64>("use_table_cache", 0);
}
else
{
if (engine_args.size() < 4 || engine_args.size() > 6)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"PostgreSQL Database require `host:port`, `database_name`, `username`, `password`"
"[, `schema` = "", `use_table_cache` = 0");
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name);
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432);
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
bool is_deprecated_syntax = false;
if (engine_args.size() >= 5)
{
auto arg_value = engine_args[4]->as<ASTLiteral>()->value;
if (arg_value.getType() == Field::Types::Which::String)
{
configuration.schema = safeGetLiteralValue<String>(engine_args[4], engine_name);
}
else
{
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[4], engine_name);
LOG_WARNING(&Poco::Logger::get("DatabaseFactory"), "A deprecated syntax of PostgreSQL database engine is used");
is_deprecated_syntax = true;
}
}
if (!is_deprecated_syntax && engine_args.size() >= 6)
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name);
}
const auto & settings = context->getSettingsRef();
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration,
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, configuration, pool, use_table_cache);
}
else if (engine_name == "MaterializedPostgreSQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
ASTs & engine_args = engine->arguments->children;
StoragePostgreSQL::Configuration configuration;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context))
{
configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, context, false);
}
else
{
if (engine_args.size() != 4)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MaterializedPostgreSQL Database require `host:port`, `database_name`, `username`, `password`.");
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
auto parsed_host_port = parseAddress(safeGetLiteralValue<String>(engine_args[0], engine_name), 5432);
configuration.host = parsed_host_port.first;
configuration.port = parsed_host_port.second;
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
}
auto connection_info = postgres::formatConnectionString(
configuration.database, configuration.host, configuration.port, configuration.username, configuration.password);
auto postgresql_replica_settings = std::make_unique<MaterializedPostgreSQLSettings>();
if (engine_define->settings)
postgresql_replica_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializedPostgreSQL>(
context, metadata_path, uuid, create.attach,
database_name, configuration.database, connection_info,
std::move(postgresql_replica_settings));
}
#endif
#if USE_SQLITE
else if (engine_name == "SQLite")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "SQLite database requires 1 argument: database path");
const auto & arguments = engine->arguments->children;
String database_path = safeGetLiteralValue<String>(arguments[0], "SQLite");
return std::make_shared<DatabaseSQLite>(context, engine_define, create.attach, database_path);
}
#endif
else if (engine_name == "Filesystem")
{
const ASTFunction * engine = engine_define->engine;
/// If init_path is empty, then the current path will be used
std::string init_path;
if (engine->arguments && !engine->arguments->children.empty())
{
if (engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filesystem database requires at most 1 argument: filesystem_path");
const auto & arguments = engine->arguments->children;
init_path = safeGetLiteralValue<String>(arguments[0], engine_name);
}
return std::make_shared<DatabaseFilesystem>(database_name, init_path, context);
}
#if USE_AWS_S3
else if (engine_name == "S3")
{
const ASTFunction * engine = engine_define->engine;
DatabaseS3::Configuration config;
if (engine->arguments && !engine->arguments->children.empty())
{
ASTs & engine_args = engine->arguments->children;
config = DatabaseS3::parseArguments(engine_args, context);
}
return std::make_shared<DatabaseS3>(database_name, config, context);
}
#endif
#if USE_HDFS
else if (engine_name == "HDFS")
{
const ASTFunction * engine = engine_define->engine;
/// If source_url is empty, then table name must contain full url
std::string source_url;
if (engine->arguments && !engine->arguments->children.empty())
{
if (engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS database requires at most 1 argument: source_url");
const auto & arguments = engine->arguments->children;
source_url = safeGetLiteralValue<String>(arguments[0], engine_name);
}
return std::make_shared<DatabaseHDFS>(database_name, source_url, context);
}
#endif
throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Unknown database engine: {}", engine_name);
throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Unknown database engine: {}", engine_name);
ASTs empty_engine_args;
Arguments arguments{
.engine_name = engine_name,
.engine_args = has_engine_args ? storage->engine->arguments->children : empty_engine_args,
.create_query = create,
.database_name = database_name,
.metadata_path = metadata_path,
.uuid = create.uuid,
.context = context};
// creator_fn creates and returns a DatabasePtr with the supplied arguments
auto creator_fn = database_engines.at(engine_name);
return creator_fn(arguments);
}
}

View File

@ -2,18 +2,58 @@
#include <Interpreters/Context_fwd.h>
#include <Databases/IDatabase.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class ASTCreateQuery;
class DatabaseFactory
template <typename ValueType>
static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name)
{
if (!ast || !ast->as<ASTLiteral>())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name);
return ast->as<ASTLiteral>()->value.safeGet<ValueType>();
}
class DatabaseFactory : private boost::noncopyable
{
public:
static DatabasePtr get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context);
static DatabasePtr getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context);
static DatabaseFactory & instance();
struct Arguments
{
const String & engine_name;
ASTs & engine_args;
ASTStorage * storage;
const ASTCreateQuery & create_query;
const String & database_name;
const String & metadata_path;
const UUID & uuid;
ContextPtr & context;
};
DatabasePtr get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context);
DatabasePtr getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context);
using CreatorFn = std::function<DatabasePtr(const Arguments & arguments)>;
using DatabaseEngines = std::unordered_map<std::string, CreatorFn>;
void registerDatabase(const std::string & name, CreatorFn creator_fn);
private:
DatabaseEngines database_engines;
};
}

View File

@ -1,3 +1,4 @@
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseFilesystem.h>
#include <IO/Operators.h>
@ -237,4 +238,28 @@ DatabaseTablesIteratorPtr DatabaseFilesystem::getTablesIterator(ContextPtr, cons
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}
void registerDatabaseFilesystem(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
const String & engine_name = engine_define->engine->name;
/// If init_path is empty, then the current path will be used
std::string init_path;
if (engine->arguments && !engine->arguments->children.empty())
{
if (engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filesystem database requires at most 1 argument: filesystem_path");
const auto & arguments = engine->arguments->children;
init_path = safeGetLiteralValue<String>(arguments[0], engine_name);
}
return std::make_shared<DatabaseFilesystem>(args.database_name, init_path, args.context);
};
factory.registerDatabase("Filesystem", create_fn);
}
}

View File

@ -2,6 +2,7 @@
#if USE_HDFS
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseHDFS.h>
#include <Interpreters/Context.h>
@ -237,6 +238,30 @@ DatabaseTablesIteratorPtr DatabaseHDFS::getTablesIterator(ContextPtr, const Filt
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}
void registerDatabaseHDFS(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
const String & engine_name = engine_define->engine->name;
/// If source_url is empty, then table name must contain full url
std::string source_url;
if (engine->arguments && !engine->arguments->children.empty())
{
if (engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS database requires at most 1 argument: source_url");
const auto & arguments = engine->arguments->children;
source_url = safeGetLiteralValue<String>(arguments[0], engine_name);
}
return std::make_shared<DatabaseHDFS>(args.database_name, source_url, args.context);
};
factory.registerDatabase("HDFS", create_fn);
}
} // DB
#endif

View File

@ -1,4 +1,5 @@
#include <Core/Settings.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseLazy.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabasesCommon.h>
@ -7,6 +8,7 @@
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Storages/IStorage.h>
#include <Common/escapeForFileName.h>
@ -34,6 +36,7 @@ namespace ErrorCodes
extern const int UNKNOWN_TABLE;
extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
@ -354,4 +357,26 @@ const StoragePtr & DatabaseLazyIterator::table() const
return current_storage;
}
void registerDatabaseLazy(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Lazy database require cache_expiration_time_seconds argument");
const auto & arguments = engine->arguments->children;
const auto cache_expiration_time_seconds = safeGetLiteralValue<UInt64>(arguments[0], "Lazy");
return make_shared<DatabaseLazy>(
args.database_name,
args.metadata_path,
cache_expiration_time_seconds,
args.context);
};
factory.registerDatabase("Lazy", create_fn);
}
}

View File

@ -1,5 +1,6 @@
#include <base/scope_guard.h>
#include <Common/logger_useful.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabasesCommon.h>
#include <Databases/DDLDependencyVisitor.h>
@ -209,4 +210,15 @@ std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseMemory::getTablesForBackup(co
return res;
}
void registerDatabaseMemory(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
return make_shared<DatabaseMemory>(
args.database_name,
args.context);
};
factory.registerDatabase("Memory", create_fn);
}
}

View File

@ -1,6 +1,7 @@
#include <filesystem>
#include <Core/Settings.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabasesCommon.h>
@ -37,6 +38,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_DATABASE_ENGINE;
}
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
@ -321,4 +323,19 @@ void DatabaseOrdinary::commitAlterTable(const StorageID &, const String & table_
}
}
void registerDatabaseOrdinary(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
if (!args.create_query.attach && !args.context->getSettingsRef().allow_deprecated_database_ordinary)
throw Exception(
ErrorCodes::UNKNOWN_DATABASE_ENGINE,
"Ordinary database engine is deprecated (see also allow_deprecated_database_ordinary setting)");
return make_shared<DatabaseOrdinary>(
args.database_name,
args.metadata_path,
args.context);
};
factory.registerDatabase("Ordinary", create_fn);
}
}

View File

@ -13,6 +13,7 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/PoolId.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseReplicatedWorker.h>
#include <Databases/DDLDependencyVisitor.h>
@ -1652,4 +1653,41 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context,
return true;
}
void registerDatabaseReplicated(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 3)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replicated database requires 3 arguments: zookeeper path, shard name and replica name");
auto & arguments = engine->arguments->children;
for (auto & engine_arg : arguments)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
String zookeeper_path = safeGetLiteralValue<String>(arguments[0], "Replicated");
String shard_name = safeGetLiteralValue<String>(arguments[1], "Replicated");
String replica_name = safeGetLiteralValue<String>(arguments[2], "Replicated");
zookeeper_path = args.context->getMacros()->expand(zookeeper_path);
shard_name = args.context->getMacros()->expand(shard_name);
replica_name = args.context->getMacros()->expand(replica_name);
DatabaseReplicatedSettings database_replicated_settings{};
if (engine_define->settings)
database_replicated_settings.loadFromQuery(*engine_define);
return std::make_shared<DatabaseReplicated>(
args.database_name,
args.metadata_path,
args.uuid,
zookeeper_path,
shard_name,
replica_name,
std::move(database_replicated_settings), args.context);
};
factory.registerDatabase("Replicated", create_fn);
}
}

View File

@ -2,6 +2,7 @@
#if USE_AWS_S3
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseS3.h>
#include <Interpreters/Context.h>
@ -307,6 +308,24 @@ DatabaseTablesIteratorPtr DatabaseS3::getTablesIterator(ContextPtr, const Filter
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}
}
void registerDatabaseS3(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
DatabaseS3::Configuration config;
if (engine->arguments && !engine->arguments->children.empty())
{
ASTs & engine_args = engine->arguments->children;
config = DatabaseS3::parseArguments(engine_args, args.context);
}
return std::make_shared<DatabaseS3>(args.database_name, config, args.context);
};
factory.registerDatabase("S3", create_fn);
}
}
#endif

View File

@ -2,13 +2,20 @@
#if USE_MYSQL
# include <Common/parseAddress.h>
# include <Common/parseRemoteDescription.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <Interpreters/Context.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Databases/DatabaseFactory.h>
# include <Databases/MySQL/DatabaseMaterializedTablesIterator.h>
# include <Databases/MySQL/MaterializedMySQLSyncThread.h>
# include <Parsers/ASTCreateQuery.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/queryToString.h>
# include <Storages/StorageMySQL.h>
# include <Storages/StorageMaterializedMySQL.h>
# include <Storages/NamedCollectionsHelpers.h>
# include <Common/setThreadName.h>
# include <Common/PoolId.h>
# include <filesystem>
@ -21,6 +28,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
}
DatabaseMaterializedMySQL::DatabaseMaterializedMySQL(
@ -179,6 +187,86 @@ void DatabaseMaterializedMySQL::stopReplication()
started_up = false;
}
void registerDatabaseMaterializedMySQL(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
const String & engine_name = engine_define->engine->name;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
StorageMySQL::Configuration configuration;
ASTs & arguments = engine->arguments->children;
auto mysql_settings = std::make_unique<MySQLSettings>();
if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, args.context))
{
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, args.context, false);
}
else
{
if (arguments.size() != 4)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.");
arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], args.context);
const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name);
if (engine_name == "MySQL")
{
size_t max_addresses = args.context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
}
else
{
const auto & [remote_host, remote_port] = parseAddress(host_port, 3306);
configuration.host = remote_host;
configuration.port = remote_port;
}
configuration.database = safeGetLiteralValue<String>(arguments[1], engine_name);
configuration.username = safeGetLiteralValue<String>(arguments[2], engine_name);
configuration.password = safeGetLiteralValue<String>(arguments[3], engine_name);
}
MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password);
auto mysql_pool
= mysqlxx::Pool(configuration.database, configuration.host, configuration.username, configuration.password, configuration.port);
auto materialize_mode_settings = std::make_unique<MaterializedMySQLSettings>();
if (engine_define->settings)
materialize_mode_settings->loadFromQuery(*engine_define);
if (args.uuid == UUIDHelpers::Nil)
{
auto print_create_ast = args.create_query.clone();
print_create_ast->as<ASTCreateQuery>()->attach = false;
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"The MaterializedMySQL database engine no longer supports Ordinary databases. To re-create the database, delete "
"the old one by executing \"rm -rf {}{{,.sql}}\", then re-create the database with the following query: {}",
args.metadata_path,
queryToString(print_create_ast));
}
return make_shared<DatabaseMaterializedMySQL>(
args.context,
args.database_name,
args.metadata_path,
args.uuid,
configuration.database,
std::move(mysql_pool),
std::move(client),
std::move(materialize_mode_settings));
};
factory.registerDatabase("MaterializeMySQL", create_fn);
factory.registerDatabase("MaterializedMySQL", create_fn);
}
}
#endif

View File

@ -2,6 +2,7 @@
#if USE_MYSQL
# include <string>
# include <Databases/DatabaseFactory.h>
# include <DataTypes/DataTypeDateTime.h>
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeString.h>
@ -14,6 +15,7 @@
# include <QueryPipeline/QueryPipelineBuilder.h>
# include <IO/Operators.h>
# include <Interpreters/Context.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Parsers/ASTCreateQuery.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ParserCreateQuery.h>
@ -21,8 +23,11 @@
# include <Parsers/queryToString.h>
# include <Storages/StorageMySQL.h>
# include <Storages/MySQL/MySQLSettings.h>
# include <Storages/MySQL/MySQLHelpers.h>
# include <Storages/NamedCollectionsHelpers.h>
# include <Common/escapeForFileName.h>
# include <Common/parseAddress.h>
# include <Common/parseRemoteDescription.h>
# include <Common/setThreadName.h>
# include <filesystem>
# include <Common/filesystemHelpers.h>
@ -41,6 +46,8 @@ namespace ErrorCodes
extern const int TABLE_IS_DROPPED;
extern const int TABLE_ALREADY_EXISTS;
extern const int UNEXPECTED_AST_STRUCTURE;
extern const int CANNOT_CREATE_DATABASE;
extern const int BAD_ARGUMENTS;
}
constexpr static const auto suffix = ".remove_flag";
@ -504,6 +511,77 @@ void DatabaseMySQL::createTable(ContextPtr local_context, const String & table_n
attachTable(local_context, table_name, storage, {});
}
void registerDatabaseMySQL(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
const String & engine_name = engine_define->engine->name;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
StorageMySQL::Configuration configuration;
ASTs & arguments = engine->arguments->children;
auto mysql_settings = std::make_unique<MySQLSettings>();
if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, args.context))
{
configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, args.context, false);
}
else
{
if (arguments.size() != 4)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.");
arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], args.context);
const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name);
if (engine_name == "MySQL")
{
size_t max_addresses = args.context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
}
else
{
const auto & [remote_host, remote_port] = parseAddress(host_port, 3306);
configuration.host = remote_host;
configuration.port = remote_port;
}
configuration.database = safeGetLiteralValue<String>(arguments[1], engine_name);
configuration.username = safeGetLiteralValue<String>(arguments[2], engine_name);
configuration.password = safeGetLiteralValue<String>(arguments[3], engine_name);
}
mysql_settings->loadFromQueryContext(args.context, *engine_define);
if (engine_define->settings)
mysql_settings->loadFromQuery(*engine_define);
auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings);
try
{
return make_shared<DatabaseMySQL>(
args.context,
args.database_name,
args.metadata_path,
engine_define,
configuration.database,
std::move(mysql_settings),
std::move(mysql_pool),
args.create_query.attach);
}
catch (...)
{
const auto & exception_message = getCurrentExceptionMessage(true);
throw Exception(ErrorCodes::CANNOT_CREATE_DATABASE, "Cannot create MySQL database, because {}", exception_message);
}
};
factory.registerDatabase("MySQL", create_fn);
}
}
#endif

View File

@ -8,23 +8,25 @@
#include <Common/logger_useful.h>
#include <Common/Macros.h>
#include <Common/PoolId.h>
#include <Common/parseAddress.h>
#include <Common/parseRemoteDescription.h>
#include <Core/UUID.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseFactory.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/StoragePostgreSQL.h>
#include <Storages/AlterCommands.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Common/escapeForFileName.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
namespace DB
{
@ -471,6 +473,59 @@ DatabaseTablesIteratorPtr DatabaseMaterializedPostgreSQL::getTablesIterator(
return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name);
}
void registerDatabaseMaterializedPostgreSQL(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
ASTs & engine_args = engine->arguments->children;
const String & engine_name = engine_define->engine->name;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
StoragePostgreSQL::Configuration configuration;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, args.context))
{
configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, args.context, false);
}
else
{
if (engine_args.size() != 4)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MaterializedPostgreSQL Database require `host:port`, `database_name`, `username`, `password`.");
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
auto parsed_host_port = parseAddress(safeGetLiteralValue<String>(engine_args[0], engine_name), 5432);
configuration.host = parsed_host_port.first;
configuration.port = parsed_host_port.second;
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
}
auto connection_info = postgres::formatConnectionString(
configuration.database, configuration.host, configuration.port, configuration.username, configuration.password);
auto postgresql_replica_settings = std::make_unique<MaterializedPostgreSQLSettings>();
if (engine_define->settings)
postgresql_replica_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializedPostgreSQL>(
args.context, args.metadata_path, args.uuid, args.create_query.attach,
args.database_name, configuration.database, connection_info,
std::move(postgresql_replica_settings));
};
factory.registerDatabase("MaterializedPostgreSQL", create_fn);
}
}
#endif

View File

@ -6,14 +6,18 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/StoragePostgreSQL.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Common/escapeForFileName.h>
#include <Common/parseRemoteDescription.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Common/quoteString.h>
#include <Common/filesystemHelpers.h>
@ -478,6 +482,83 @@ ASTPtr DatabasePostgreSQL::getColumnDeclaration(const DataTypePtr & data_type) c
return std::make_shared<ASTIdentifier>(data_type->getName());
}
void registerDatabasePostgreSQL(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
ASTs & engine_args = engine->arguments->children;
const String & engine_name = engine_define->engine->name;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
auto use_table_cache = false;
StoragePostgreSQL::Configuration configuration;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, args.context))
{
configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, args.context, false);
use_table_cache = named_collection->getOrDefault<UInt64>("use_table_cache", 0);
}
else
{
if (engine_args.size() < 4 || engine_args.size() > 6)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"PostgreSQL Database require `host:port`, `database_name`, `username`, `password`"
"[, `schema` = "", `use_table_cache` = 0");
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name);
size_t max_addresses = args.context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432);
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
bool is_deprecated_syntax = false;
if (engine_args.size() >= 5)
{
auto arg_value = engine_args[4]->as<ASTLiteral>()->value;
if (arg_value.getType() == Field::Types::Which::String)
{
configuration.schema = safeGetLiteralValue<String>(engine_args[4], engine_name);
}
else
{
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[4], engine_name);
LOG_WARNING(&Poco::Logger::get("DatabaseFactory"), "A deprecated syntax of PostgreSQL database engine is used");
is_deprecated_syntax = true;
}
}
if (!is_deprecated_syntax && engine_args.size() >= 6)
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name);
}
const auto & settings = args.context->getSettingsRef();
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration,
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
return std::make_shared<DatabasePostgreSQL>(
args.context,
args.metadata_path,
engine_define,
args.database_name,
configuration,
pool,
use_table_cache);
};
factory.registerDatabase("PostgreSQL", create_fn);
}
}
#endif

View File

@ -5,11 +5,11 @@
#include <Common/logger_useful.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/SQLite/fetchSQLiteTableStructure.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/Context.h>
#include <Storages/StorageSQLite.h>
#include <Databases/SQLite/SQLiteUtils.h>
@ -21,6 +21,7 @@ namespace ErrorCodes
{
extern const int SQLITE_ENGINE_ERROR;
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
}
DatabaseSQLite::DatabaseSQLite(
@ -201,6 +202,24 @@ ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, Contex
return create_table_query;
}
void registerDatabaseSQLite(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
auto * engine_define = args.create_query.storage;
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "SQLite database requires 1 argument: database path");
const auto & arguments = engine->arguments->children;
String database_path = safeGetLiteralValue<String>(arguments[0], "SQLite");
return std::make_shared<DatabaseSQLite>(args.context, engine_define, args.create_query.attach, database_path);
};
factory.registerDatabase("SQLite", create_fn);
}
}
#endif

View File

@ -0,0 +1,72 @@
#include <Databases/DatabaseFactory.h>
#include <Databases/registerDatabases.h>
namespace DB
{
void registerDatabaseAtomic(DatabaseFactory & factory);
void registerDatabaseOrdinary(DatabaseFactory & factory);
void registerDatabaseDictionary(DatabaseFactory & factory);
void registerDatabaseMemory(DatabaseFactory & factory);
void registerDatabaseLazy(DatabaseFactory & factory);
void registerDatabaseFilesystem(DatabaseFactory & factory);
void registerDatabaseReplicated(DatabaseFactory & factory);
#if USE_MYSQL
void registerDatabaseMySQL(DatabaseFactory & factory);
void registerDatabaseMaterializedMySQL(DatabaseFactory & factory);
#endif
#if USE_LIBPQXX
void registerDatabasePostgreSQL(DatabaseFactory & factory);
void registerDatabaseMaterializedPostgreSQL(DatabaseFactory & factory);
#endif
#if USE_SQLITE
void registerDatabaseSQLite(DatabaseFactory & factory);
#endif
#if USE_AWS_S3
void registerDatabaseS3(DatabaseFactory & factory);
#endif
#if USE_HDFS
void registerDatabaseHDFS(DatabaseFactory & factory);
#endif
void registerDatabases()
{
auto & factory = DatabaseFactory::instance();
registerDatabaseAtomic(factory);
registerDatabaseOrdinary(factory);
registerDatabaseDictionary(factory);
registerDatabaseMemory(factory);
registerDatabaseLazy(factory);
registerDatabaseFilesystem(factory);
registerDatabaseReplicated(factory);
#if USE_MYSQL
registerDatabaseMySQL(factory);
registerDatabaseMaterializedMySQL(factory);
#endif
#if USE_LIBPQXX
registerDatabasePostgreSQL(factory);
registerDatabaseMaterializedPostgreSQL(factory);
#endif
#if USE_SQLITE
registerDatabaseSQLite(factory);
#endif
#if USE_AWS_S3
registerDatabaseS3(factory);
#endif
#if USE_HDFS
registerDatabaseHDFS(factory);
#endif
}
}

View File

@ -0,0 +1,6 @@
#pragma once
namespace DB
{
void registerDatabases();
}

View File

@ -282,7 +282,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
else if (create.uuid != UUIDHelpers::Nil && !DatabaseCatalog::instance().hasUUIDMapping(create.uuid))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find UUID mapping for {}, it's a bug", create.uuid);
DatabasePtr database = DatabaseFactory::get(create, metadata_path / "", getContext());
DatabasePtr database = DatabaseFactory::instance().get(create, metadata_path / "", getContext());
if (create.uuid != UUIDHelpers::Nil)
create.setDatabase(TABLE_WITH_UUID_NAME_PLACEHOLDER);

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context.h>
#include "Processors/Executors/PullingPipelineExecutor.h"
#include <Functions/registerDatabases.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
@ -31,6 +32,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerDatabases();
registerStorages();
registerDictionaries();
registerDisks(/* global_skip_access_check= */ true);