Some other

This commit is contained in:
kssenii 2021-09-03 14:16:32 +03:00
parent 6efbee320b
commit a5f56bb588
26 changed files with 313 additions and 215 deletions

View File

@ -7,7 +7,6 @@
#endif
#include <mysqlxx/Pool.h>
#include <iostream>
#include <common/sleep.h>
@ -37,32 +36,46 @@ void Pool::Entry::decrementRefCount()
mysql_thread_end();
}
Pool::Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & config_name,
const ConnectionConfiguration & configuration,
unsigned default_connections_, unsigned max_connections_,
const char * parent_config_name_)
: logger(Poco::Logger::get("mysqlxx::Pool"))
, default_connections(default_connections_)
, max_connections(max_connections_)
{
server = cfg.getString(config_name + ".host", configuration.server);
server = cfg.getString(config_name + ".host");
if (parent_config_name_)
{
const std::string parent_config_name(parent_config_name_);
db = cfg.getString(config_name + ".db", cfg.getString(parent_config_name + ".db", ""));
user = cfg.getString(config_name + ".user", cfg.getString(parent_config_name + ".user"));
password = cfg.getString(config_name + ".password", cfg.getString(parent_config_name + ".password"));
user = cfg.has(config_name + ".user")
? cfg.getString(config_name + ".user")
: cfg.getString(parent_config_name + ".user");
password = cfg.has(config_name + ".password")
? cfg.getString(config_name + ".password")
: cfg.getString(parent_config_name + ".password");
if (!cfg.has(config_name + ".port") && !cfg.has(config_name + ".socket")
&& !cfg.has(parent_config_name + ".port") && !cfg.has(parent_config_name + ".socket"))
throw Poco::Exception("mysqlxx::Pool configuration: expected port or socket");
port = cfg.getInt(config_name + ".port", cfg.getInt(parent_config_name + ".port", 0));
socket = cfg.getString(config_name + ".socket", cfg.getString(parent_config_name + ".socket", ""));
ssl_ca = cfg.getString(config_name + ".ssl_ca", cfg.getString(parent_config_name + ".ssl_ca", ""));
ssl_cert = cfg.getString(config_name + ".ssl_cert", cfg.getString(parent_config_name + ".ssl_cert", ""));
ssl_key = cfg.getString(config_name + ".ssl_key", cfg.getString(parent_config_name + ".ssl_key", ""));
port = cfg.has(config_name + ".port")
? cfg.getInt(config_name + ".port")
: cfg.getInt(parent_config_name + ".port", 0);
socket = cfg.has(config_name + ".socket")
? cfg.getString(config_name + ".socket")
: cfg.getString(parent_config_name + ".socket", "");
ssl_ca = cfg.has(config_name + ".ssl_ca")
? cfg.getString(config_name + ".ssl_ca")
: cfg.getString(parent_config_name + ".ssl_ca", "");
ssl_cert = cfg.has(config_name + ".ssl_cert")
? cfg.getString(config_name + ".ssl_cert")
: cfg.getString(parent_config_name + ".ssl_cert", "");
ssl_key = cfg.has(config_name + ".ssl_key")
? cfg.getString(config_name + ".ssl_key")
: cfg.getString(parent_config_name + ".ssl_key", "");
enable_local_infile = cfg.getBool(config_name + ".enable_local_infile",
cfg.getBool(parent_config_name + ".enable_local_infile", MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE));
@ -72,16 +85,15 @@ Pool::Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & co
}
else
{
db = cfg.getString(config_name + ".db", configuration.db);
user = cfg.getString(config_name + ".user", configuration.user);
password = cfg.getString(config_name + ".password", configuration.password);
port = cfg.getInt(config_name + ".port", configuration.port);
db = cfg.getString(config_name + ".db", "");
user = cfg.getString(config_name + ".user");
password = cfg.getString(config_name + ".password");
socket = cfg.getString(config_name + ".socket", "");
if (port == 0 && socket.empty())
if (!cfg.has(config_name + ".port") && !cfg.has(config_name + ".socket"))
throw Poco::Exception("mysqlxx::Pool configuration: expected port or socket");
port = cfg.getInt(config_name + ".port", 0);
socket = cfg.getString(config_name + ".socket", "");
ssl_ca = cfg.getString(config_name + ".ssl_ca", "");
ssl_cert = cfg.getString(config_name + ".ssl_cert", "");
ssl_key = cfg.getString(config_name + ".ssl_key", "");
@ -92,8 +104,14 @@ Pool::Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & co
opt_reconnect = cfg.getBool(config_name + ".opt_reconnect", MYSQLXX_DEFAULT_MYSQL_OPT_RECONNECT);
}
connect_timeout = cfg.getInt(config_name + ".connect_timeout", cfg.getInt("mysql_connect_timeout", connect_timeout));
rw_timeout = cfg.getInt(config_name + ".rw_timeout", cfg.getInt("mysql_rw_timeout", rw_timeout));
connect_timeout = cfg.getInt(config_name + ".connect_timeout",
cfg.getInt("mysql_connect_timeout",
MYSQLXX_DEFAULT_TIMEOUT));
rw_timeout =
cfg.getInt(config_name + ".rw_timeout",
cfg.getInt("mysql_rw_timeout",
MYSQLXX_DEFAULT_RW_TIMEOUT));
}
@ -152,7 +170,6 @@ Pool::Entry Pool::tryGet()
/// Try to pick an idle connection from already allocated
for (auto connection_it = connections.cbegin(); connection_it != connections.cend();)
{
std::cerr << "try get KSSENII \n\n\n";
Connection * connection_ptr = *connection_it;
/// Fixme: There is a race condition here b/c we do not synchronize with Pool::Entry's copy-assignment operator
if (connection_ptr->ref_count == 0)
@ -215,7 +232,6 @@ void Pool::Entry::forceConnected() const
bool first = true;
while (!tryForceConnected())
{
std::cerr << "while loop KSSENII\n\n";
if (first)
first = false;
else

View File

@ -19,23 +19,6 @@
namespace mysqlxx
{
struct ConnectionConfiguration
{
std::string db;
std::string server;
std::string user;
std::string password;
unsigned port;
std::string socket;
unsigned connect_timeout;
unsigned rw_timeout;
std::string ssl_ca;
std::string ssl_cert;
std::string ssl_key;
bool enable_local_infile;
bool opt_reconnect;
};
/** MySQL connections pool.
* This class is poorly connected with mysqlxx and is made in different style (was taken from old code).
* Usage:
@ -152,13 +135,21 @@ public:
void decrementRefCount();
};
Pool(const std::string & config_name,
unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
const char * parent_config_name_ = nullptr)
: Pool{Poco::Util::Application::instance().config(), config_name,
default_connections_, max_connections_, parent_config_name_}
{}
/**
* @param config_name Setting name in configuration file
* @param default_connections_ Number of default connections
* @param max_connections_ Maximum number of connections
*/
Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & config_name,
const ConnectionConfiguration & configuration,
unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
const char * parent_config_name_ = nullptr);
@ -233,6 +224,7 @@ private:
/// Description of connection.
std::string description;
/// Connection settings.
std::string db;
std::string server;
std::string user;

View File

@ -17,11 +17,10 @@ struct PoolFactory::Impl
std::mutex mutex;
};
PoolWithFailover PoolFactory::get(const std::string & config_name,
const ConnectionConfiguration & configuration,
unsigned default_connections, unsigned max_connections, size_t max_tries)
PoolWithFailover PoolFactory::get(const std::string & config_name, unsigned default_connections,
unsigned max_connections, size_t max_tries)
{
return get(Poco::Util::Application::instance().config(), config_name, configuration, default_connections, max_connections, max_tries);
return get(Poco::Util::Application::instance().config(), config_name, default_connections, max_connections, max_tries);
}
/// Duplicate of code from StringUtils.h. Copied here for less dependencies.
@ -73,9 +72,10 @@ static std::string getPoolEntryName(const Poco::Util::AbstractConfiguration & co
return entry_name;
}
PoolWithFailover PoolFactory::get(const Poco::Util::AbstractConfiguration & config, const std::string & config_name,
const ConnectionConfiguration & configuration, unsigned default_connections, unsigned max_connections, size_t max_tries)
PoolWithFailover PoolFactory::get(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name, unsigned default_connections, unsigned max_connections, size_t max_tries)
{
std::lock_guard<std::mutex> lock(impl->mutex);
if (auto entry = impl->pools.find(config_name); entry != impl->pools.end())
{
@ -92,7 +92,7 @@ PoolWithFailover PoolFactory::get(const Poco::Util::AbstractConfiguration & conf
return *pool;
}
auto pool = std::make_shared<PoolWithFailover>(config, config_name, configuration, default_connections, max_connections, max_tries);
auto pool = std::make_shared<PoolWithFailover>(config, config_name, default_connections, max_connections, max_tries);
// Check the pool will be shared
if (!entry_name.empty())
{

View File

@ -28,7 +28,6 @@ public:
/** Allocates a PoolWithFailover to connect to MySQL. */
PoolWithFailover get(const std::string & config_name,
const ConnectionConfiguration & configuration,
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
@ -36,7 +35,6 @@ public:
/** Allocates a PoolWithFailover to connect to MySQL. */
PoolWithFailover get(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name,
const ConnectionConfiguration & configuration,
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);

View File

@ -17,7 +17,6 @@ using namespace mysqlxx;
PoolWithFailover::PoolWithFailover(
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_name_,
const ConnectionConfiguration & configuration,
const unsigned default_connections_,
const unsigned max_connections_,
const size_t max_tries_)
@ -34,9 +33,11 @@ PoolWithFailover::PoolWithFailover(
if (startsWith(replica_config_key, "replica"))
{
std::string replica_name = config_name_ + "." + replica_config_key;
int priority = config_.getInt(replica_name + ".priority", 0);
replicas_by_priority[priority].emplace_back(
std::make_shared<Pool>(config_, replica_name, configuration, default_connections_, max_connections_, config_name_.c_str()));
std::make_shared<Pool>(config_, replica_name, default_connections_, max_connections_, config_name_.c_str()));
}
}
@ -56,11 +57,22 @@ PoolWithFailover::PoolWithFailover(
else
{
replicas_by_priority[0].emplace_back(
std::make_shared<Pool>(config_, config_name_, configuration, default_connections_, max_connections_));
std::make_shared<Pool>(config_, config_name_, default_connections_, max_connections_));
}
}
PoolWithFailover::PoolWithFailover(
const std::string & config_name_,
const unsigned default_connections_,
const unsigned max_connections_,
const size_t max_tries_)
: PoolWithFailover{Poco::Util::Application::instance().config(),
config_name_, default_connections_, max_connections_, max_tries_}
{
}
PoolWithFailover::PoolWithFailover(
const std::string & database,
const RemoteDescription & addresses,

View File

@ -97,11 +97,15 @@ namespace mysqlxx
* max_connections Maximum number of connections in pool to each replica.
* max_tries_ Max number of connection tries.
*/
PoolWithFailover(
const std::string & config_name_,
unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_name_,
const ConnectionConfiguration & configuration,
unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);

View File

@ -136,7 +136,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
{
const ASTFunction * engine = engine_define->engine;
ASTs & engine_args = engine->arguments->children;
auto [common_configuration, storage_specific_args, with_named_collection] = tryGetConfigurationAsNamedCollection(engine_args, context, true);
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(engine_args, context, true);
StorageMySQLConfiguration configuration(common_configuration);
if (with_named_collection)
@ -259,7 +259,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const ASTFunction * engine = engine_define->engine;
ASTs & engine_args = engine->arguments->children;
auto [common_configuration, storage_specific_args, with_named_collection] = tryGetConfigurationAsNamedCollection(engine_args, context, true);
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(engine_args, context, true);
StoragePostgreSQLConfiguration configuration(common_configuration);
if (with_named_collection)
@ -313,34 +313,48 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
{
const ASTFunction * engine = engine_define->engine;
ASTs & engine_args = engine->arguments->children;
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(engine_args, context, true);
StoragePostgreSQLConfiguration configuration(common_configuration);
if (with_named_collection)
{
if (!storage_specific_args.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MaterializedPostgreSQL Database requires only `host`, `port`, `database_name`, `username`, `password`.");
}
}
else
{
if (!engine->arguments || engine->arguments->children.size() != 4)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{} Database require `host:port`, `database_name`, `username`, `password`.",
engine_name);
"MaterializedPostgreSQL Database require `host:port`, `database_name`, `username`, `password`.");
}
ASTs & engine_args = engine->arguments->children;
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name);
const auto & postgres_database_name = safeGetLiteralValue<String>(engine_args[1], engine_name);
const auto & username = safeGetLiteralValue<String>(engine_args[2], engine_name);
const auto & password = safeGetLiteralValue<String>(engine_args[3], engine_name);
auto parsed_host_port = parseAddress(safeGetLiteralValue<String>(engine_args[0], engine_name), 5432);
auto parsed_host_port = parseAddress(host_port, 5432);
auto connection_info = postgres::formatConnectionString(postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password);
configuration.host = parsed_host_port.first;
configuration.port = parsed_host_port.second;
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
}
auto connection_info = postgres::formatConnectionString(
configuration.database, configuration.host, configuration.port, configuration.username, configuration.password);
auto postgresql_replica_settings = std::make_unique<MaterializedPostgreSQLSettings>();
if (engine_define->settings)
postgresql_replica_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializedPostgreSQL>(
context, metadata_path, uuid, engine_define, create.attach,
database_name, postgres_database_name, connection_info,
database_name, configuration.database, connection_info,
std::move(postgresql_replica_settings));
}

View File

@ -2,6 +2,8 @@
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
#include "registerDictionaries.h"
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB
{
@ -13,19 +15,20 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & root_config_prefix,
Block & sample_block,
ContextPtr,
ContextPtr context,
const std::string & /* default_database */,
bool /* created_from_ddl */)
{
const auto config_prefix = root_config_prefix + ".mongodb";
auto configuration = getExternalDataSourceConfiguration(config, config_prefix, context);
return std::make_unique<MongoDBDictionarySource>(dict_struct,
config.getString(config_prefix + ".uri", ""),
config.getString(config_prefix + ".host", ""),
config.getUInt(config_prefix + ".port", 0),
config.getString(config_prefix + ".user", ""),
config.getString(config_prefix + ".password", ""),
configuration.host,
configuration.port,
configuration.username,
configuration.password,
config.getString(config_prefix + ".method", ""),
config.getString(config_prefix + ".db", ""),
configuration.database,
config.getString(config_prefix + ".collection"),
sample_block);
};

View File

@ -35,7 +35,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
[[maybe_unused]] Block & sample_block,
[[maybe_unused]] ContextPtr global_context,
const std::string & /* default_database */,
bool /* created_from_ddl */) -> DictionarySourcePtr {
bool created_from_ddl) -> DictionarySourcePtr {
#if USE_MYSQL
StreamSettings mysql_input_stream_settings(
global_context->getSettingsRef(),
@ -44,7 +44,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
config.getBool(config_prefix + ".mysql.fail_on_connection_loss", false) ? 1 : default_num_tries_on_connection_loss);
auto settings_config_prefix = config_prefix + ".mysql";
auto configuration = tryGetConfigurationAsNamedCollection(config, settings_config_prefix, global_context);
auto configuration = getExternalDataSourceConfiguration(config, settings_config_prefix, global_context);
auto query = config.getString(settings_config_prefix + ".query", "");
if (query.empty() && configuration.table.empty())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "MySQL dictionary source configuration must contain table or query field");
@ -61,16 +61,15 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
.dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false)
};
mysqlxx::ConnectionConfiguration mysql_configuration
std::shared_ptr<mysqlxx::PoolWithFailover> pool;
if (created_from_ddl)
{
.db = configuration.database,
.server = configuration.host,
.user = configuration.username,
.password = configuration.password,
.port = configuration.port
};
std::vector<std::pair<String, UInt16>> addresses{std::make_pair(configuration.host, configuration.port)};
pool = std::make_shared<mysqlxx::PoolWithFailover>(configuration.database, addresses, configuration.username, configuration.password);
}
else
pool = std::make_shared<mysqlxx::PoolWithFailover>(mysqlxx::PoolFactory::instance().get(config, settings_config_prefix));
auto pool = std::make_shared<mysqlxx::PoolWithFailover>(mysqlxx::PoolFactory::instance().get(config, settings_config_prefix, mysql_configuration));
return std::make_unique<MySQLDictionarySource>(dict_struct, dictionary_configuration, std::move(pool), sample_block, mysql_input_stream_settings);
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,

View File

@ -190,10 +190,7 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
#if USE_LIBPQXX
const auto settings_config_prefix = config_prefix + ".postgresql";
auto configuration = tryGetConfigurationsByPriorityAsNamedCollection(config, settings_config_prefix, context);
if (configuration.replicas_configurations.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Having no configuration options");
auto configuration = getExternalDataSourceConfigurationByPriority(config, settings_config_prefix, context);
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration.replicas_configurations,
context->getSettingsRef().postgresql_connection_pool_size,

View File

@ -28,7 +28,7 @@ String ExternalDataSourceConfiguration::toString() const
std::tuple<ExternalDataSourceConfiguration, EngineArgs, bool>
tryGetConfigurationAsNamedCollection(ASTs args, ContextPtr context, bool is_database_engine)
getExternalDataSourceConfiguration(ASTs args, ContextPtr context, bool is_database_engine)
{
ExternalDataSourceConfiguration configuration;
EngineArgs non_common_args;
@ -98,7 +98,7 @@ tryGetConfigurationAsNamedCollection(ASTs args, ContextPtr context, bool is_data
}
ExternalDataSourceConfiguration tryGetConfigurationAsNamedCollection(
ExternalDataSourceConfiguration getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context)
{
ExternalDataSourceConfiguration configuration;
@ -141,10 +141,10 @@ ExternalDataSourceConfiguration tryGetConfigurationAsNamedCollection(
}
ExternalDataSourcesByPriority tryGetConfigurationsByPriorityAsNamedCollection(
ExternalDataSourcesByPriority getExternalDataSourceConfigurationByPriority(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context)
{
auto common_configuration = tryGetConfigurationAsNamedCollection(dict_config, dict_config_prefix, context);
auto common_configuration = getExternalDataSourceConfiguration(dict_config, dict_config_prefix, context);
ExternalDataSourcesByPriority configuration
{
.database = common_configuration.database,

View File

@ -22,9 +22,6 @@ struct ExternalDataSourceConfiguration
String table;
String schema;
ExternalDataSourceConfiguration() = default;
ExternalDataSourceConfiguration(const ExternalDataSourceConfiguration & configuration) = default;
String toString() const;
};
@ -33,8 +30,11 @@ using ExternalDataSourceConfigurationPtr = std::shared_ptr<ExternalDataSourceCon
struct StoragePostgreSQLConfiguration : ExternalDataSourceConfiguration
{
explicit StoragePostgreSQLConfiguration(const ExternalDataSourceConfiguration & common_configuration)
: ExternalDataSourceConfiguration(common_configuration) {}
explicit StoragePostgreSQLConfiguration(
const ExternalDataSourceConfiguration & common_configuration,
const std::vector<std::pair<String, UInt16>> & addresses_ = {})
: ExternalDataSourceConfiguration(common_configuration)
, addresses(addresses_) {}
String on_conflict;
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
@ -51,6 +51,15 @@ struct StorageMySQLConfiguration : ExternalDataSourceConfiguration
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
};
struct StorageMongoDBConfiguration : ExternalDataSourceConfiguration
{
explicit StorageMongoDBConfiguration(const ExternalDataSourceConfiguration & common_configuration)
: ExternalDataSourceConfiguration(common_configuration) {}
String collection;
String options;
};
using EngineArgs = std::vector<std::pair<String, DB::Field>>;
@ -65,9 +74,9 @@ using EngineArgs = std::vector<std::pair<String, DB::Field>>;
* i.e. storage-specific arguments, then return them back in a set: ExternalDataSource::EngineArgs.
*/
std::tuple<ExternalDataSourceConfiguration, EngineArgs, bool>
tryGetConfigurationAsNamedCollection(ASTs args, ContextPtr context, bool is_database_engine = false);
getExternalDataSourceConfiguration(ASTs args, ContextPtr context, bool is_database_engine = false);
ExternalDataSourceConfiguration tryGetConfigurationAsNamedCollection(
ExternalDataSourceConfiguration getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);
@ -83,6 +92,6 @@ struct ExternalDataSourcesByPriority
};
ExternalDataSourcesByPriority
tryGetConfigurationsByPriorityAsNamedCollection(const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);
getExternalDataSourceConfigurationByPriority(const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);
}

View File

@ -1,10 +1,11 @@
#include "StorageMaterializedPostgreSQL.h"
#if USE_LIBPQXX
#include <common/logger_useful.h>
#include <Common/Macros.h>
#include <Core/Settings.h>
#include <Common/parseAddress.h>
#include <Common/assert_cast.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
@ -20,8 +21,8 @@
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Storages/StorageFactory.h>
#include <common/logger_useful.h>
#include <Storages/ReadFinalForExternalReplicaStorage.h>
#include <Storages/StoragePostgreSQL.h>
#include <Core/PostgreSQL/Connection.h>
@ -455,21 +456,6 @@ void registerStorageMaterializedPostgreSQL(StorageFactory & factory)
{
auto creator_fn = [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
bool has_settings = args.storage_def->settings;
auto postgresql_replication_settings = std::make_unique<MaterializedPostgreSQLSettings>();
if (has_settings)
postgresql_replication_settings->loadFromQuery(*args.storage_def);
if (engine_args.size() != 5)
throw Exception("Storage MaterializedPostgreSQL requires 5 parameters: "
"PostgreSQL('host:port', 'database', 'table', 'username', 'password'",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getContext());
StorageInMemoryMetadata metadata;
metadata.setColumns(args.columns);
metadata.setConstraints(args.constraints);
@ -485,20 +471,19 @@ void registerStorageMaterializedPostgreSQL(StorageFactory & factory)
else
metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->order_by->ptr(), metadata.columns, args.getContext());
auto parsed_host_port = parseAddress(engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(), 5432);
const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
/// No connection is made here, see Storages/PostgreSQL/PostgreSQLConnection.cpp
auto configuration = StoragePostgreSQL::getConfiguration(args.engine_args, args.getContext());
auto connection_info = postgres::formatConnectionString(
remote_database,
parsed_host_port.first,
parsed_host_port.second,
engine_args[3]->as<ASTLiteral &>().value.safeGet<String>(),
engine_args[4]->as<ASTLiteral &>().value.safeGet<String>());
configuration.database, configuration.host, configuration.port,
configuration.username, configuration.password);
bool has_settings = args.storage_def->settings;
auto postgresql_replication_settings = std::make_unique<MaterializedPostgreSQLSettings>();
if (has_settings)
postgresql_replication_settings->loadFromQuery(*args.storage_def);
return StorageMaterializedPostgreSQL::create(
args.table_id, args.attach, remote_database, remote_table, connection_info,
args.table_id, args.attach, configuration.database, configuration.table, connection_info,
metadata, args.getContext(),
std::move(postgresql_replication_settings));
};

View File

@ -15,6 +15,7 @@
#include <Storages/MySQL/MySQLSettings.h>
#include <Storages/StoragePostgreSQL.h>
#include <Storages/StorageURL.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <common/logger_useful.h>
@ -89,15 +90,20 @@ StorageExternalDistributed::StorageExternalDistributed(
case ExternalStorageEngine::PostgreSQL:
{
// StoragePostgreSQLConfiguration configuration;
// configuration.addresses = parseRemoteDescriptionForExternalDatabase(shard_description, max_addresses, 5432);
addresses = parseRemoteDescriptionForExternalDatabase(shard_description, max_addresses, 5432);
ExternalDataSourceConfiguration configuration
{
.username = username,
.password = password,
.database = remote_database,
};
// auto pool = std::make_shared<postgres::PoolWithFailover>(
// configuration,
// context->getSettingsRef().postgresql_connection_pool_size,
// context->getSettingsRef().postgresql_connection_pool_wait_timeout);
auto pool = std::make_shared<postgres::PoolWithFailover>(
StoragePostgreSQLConfiguration(configuration, addresses),
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
// shard = StoragePostgreSQL::create(table_id_, std::move(pool), remote_table, columns_, constraints_, String{});
shard = StoragePostgreSQL::create(table_id_, std::move(pool), remote_table, columns_, constraints_, String{});
break;
}
#endif
@ -219,7 +225,6 @@ void registerStorageExternalDistributed(StorageFactory & factory)
const String & addresses_description = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
StorageExternalDistributed::ExternalStorageEngine table_engine;
ExternalDataSourceConfigurationPtr configuration;
if (engine_name == "URL")
{
table_engine = StorageExternalDistributed::ExternalStorageEngine::URL;
@ -246,10 +251,7 @@ void registerStorageExternalDistributed(StorageFactory & factory)
if (engine_name == "MySQL")
table_engine = StorageExternalDistributed::ExternalStorageEngine::MySQL;
else if (engine_name == "PostgreSQL")
{
configuration = std::make_shared<StoragePostgreSQLConfiguration>(StoragePostgreSQL::getConfiguration(engine_args, args.getContext()));
table_engine = StorageExternalDistributed::ExternalStorageEngine::PostgreSQL;
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"External storage engine {} is not supported for StorageExternalDistributed. Supported engines are: MySQL, PostgreSQL, URL",

View File

@ -102,42 +102,71 @@ Pipe StorageMongoDB::read(
return Pipe(std::make_shared<MongoDBSource>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size, true));
}
void registerStorageMongoDB(StorageFactory & factory)
{
factory.registerStorage("MongoDB", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, ContextPtr context)
{
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(engine_args, context);
StorageMongoDBConfiguration configuration(common_configuration);
if (with_named_collection)
{
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "collection")
configuration.collection = arg_value.safeGet<String>();
else if (arg_name == "options")
configuration.options = arg_value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unxpected argument name for key-value defined argument."
"Got: {}, but expected one of:"
"host, port, username, password, database, table, options.", arg_name);
}
}
else
{
if (engine_args.size() < 5 || engine_args.size() > 6)
throw Exception(
"Storage MongoDB requires from 5 to 6 parameters: MongoDB('host:port', database, collection, 'user', 'password' [, 'options']).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext());
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
/// 27017 is the default MongoDB port.
auto parsed_host_port = parseAddress(engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(), 27017);
const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
const String & collection = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
configuration.database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
configuration.collection = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
configuration.username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
configuration.password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
String options;
if (engine_args.size() >= 6)
options = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>();
}
return configuration;
}
void registerStorageMongoDB(StorageFactory & factory)
{
factory.registerStorage("MongoDB", [](const StorageFactory::Arguments & args)
{
auto configuration = StorageMongoDB::getConfiguration(args.engine_args, args.getContext());
return StorageMongoDB::create(
args.table_id,
parsed_host_port.first,
parsed_host_port.second,
remote_database,
collection,
username,
password,
options,
configuration.host,
configuration.port,
configuration.database,
configuration.collection,
configuration.username,
configuration.password,
configuration.options,
args.columns,
args.constraints,
args.comment);

View File

@ -3,6 +3,7 @@
#include <common/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Poco/MongoDB/Connection.h>
@ -42,6 +43,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
static StorageMongoDBConfiguration getConfiguration(ASTs engine_args, ContextPtr context);
private:
void connectIfNotConnected();

View File

@ -237,7 +237,7 @@ SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMeta
StorageMySQLConfiguration StorageMySQL::getConfiguration(ASTs engine_args, ContextPtr context_)
{
auto [common_configuration, storage_specific_args, with_named_collection] = tryGetConfigurationAsNamedCollection(engine_args, context_);
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(engine_args, context_);
StorageMySQLConfiguration configuration(common_configuration);
if (with_named_collection)

View File

@ -385,7 +385,7 @@ SinkToStoragePtr StoragePostgreSQL::write(
StoragePostgreSQLConfiguration StoragePostgreSQL::getConfiguration(ASTs engine_args, ContextPtr context)
{
auto [common_configuration, storage_specific_args, with_named_collection] = tryGetConfigurationAsNamedCollection(engine_args, context);
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(engine_args, context);
StoragePostgreSQLConfiguration configuration(common_configuration);
if (with_named_collection)
@ -416,6 +416,12 @@ StoragePostgreSQLConfiguration StoragePostgreSQL::getConfiguration(ASTs engine_a
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432);
if (configuration.addresses.size() == 1)
{
configuration.host = configuration.addresses[0].first;
configuration.port = configuration.addresses[0].second;
}
configuration.database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
configuration.table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
configuration.username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();

View File

@ -46,46 +46,19 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
if (!args_func.arguments)
throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR);
ASTs & args = args_func.arguments->children;
if (args.size() < 5 || args.size() > 7)
throw Exception("Table function 'mysql' requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
String host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>();
remote_database_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
user_name = args[3]->as<ASTLiteral &>().value.safeGet<String>();
password = args[4]->as<ASTLiteral &>().value.safeGet<String>();
/// Split into replicas if needed. 3306 is the default MySQL port number
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
pool.emplace(remote_database_name, addresses, user_name, password);
if (args.size() >= 6)
replace_query = args[5]->as<ASTLiteral &>().value.safeGet<UInt64>() > 0;
if (args.size() == 7)
on_duplicate_clause = args[6]->as<ASTLiteral &>().value.safeGet<String>();
if (replace_query && !on_duplicate_clause.empty())
throw Exception(
"Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them",
ErrorCodes::BAD_ARGUMENTS);
configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context);
pool.emplace(configuration->database, configuration->addresses, configuration->username, configuration->password);
}
ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context) const
{
const auto & settings = context->getSettingsRef();
const auto tables_and_columns = fetchTablesColumnsList(*pool, remote_database_name, {remote_table_name}, settings, settings.mysql_datatypes_support_level);
const auto tables_and_columns = fetchTablesColumnsList(*pool, configuration->database, {configuration->table}, settings, settings.mysql_datatypes_support_level);
const auto columns = tables_and_columns.find(remote_table_name);
const auto columns = tables_and_columns.find(configuration->table);
if (columns == tables_and_columns.end())
throw Exception("MySQL table " + (remote_database_name.empty() ? "" : (backQuote(remote_database_name) + "."))
+ backQuote(remote_table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception("MySQL table " + (configuration->database.empty() ? "" : (backQuote(configuration->database) + "."))
+ backQuote(configuration->table) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return columns->second;
}
@ -101,10 +74,10 @@ StoragePtr TableFunctionMySQL::executeImpl(
auto res = StorageMySQL::create(
StorageID(getDatabaseName(), table_name),
std::move(*pool),
remote_database_name,
remote_table_name,
replace_query,
on_duplicate_clause,
configuration->database,
configuration->table,
configuration->replace_query,
configuration->on_duplicate_clause,
columns,
ConstraintsDescription{},
String{},

View File

@ -5,6 +5,7 @@
#if USE_MYSQL
#include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <mysqlxx/Pool.h>
@ -30,14 +31,8 @@ private:
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
String remote_database_name;
String remote_table_name;
String user_name;
String password;
bool replace_query = false;
String on_duplicate_clause;
mutable std::optional<mysqlxx::PoolWithFailover> pool;
std::optional<StorageMySQLConfiguration> configuration;
};
}

View File

@ -0,0 +1,19 @@
<yandex>
<named_collections>
<postgres1>
<user>postgres</user>
<password>mysecretpassword</password>
<host>postgres1</host>
<port>5432</port>
<database>postgres_database</database>
<table>test_table</table>
</postgres1>
<user>postgres</user>
<password>mysecretpassword</password>
<host>postgres1</host>
<port>1111</port>
<database>postgres_database</database>
<table>test_table</table>
<postgres2>
</named_collections>
</yandex>

View File

@ -14,7 +14,7 @@ import threading
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs = ['configs/log_conf.xml'],
main_configs = ['configs/log_conf.xml', 'configs/named_collections.xml'],
user_configs = ['configs/users.xml'],
with_postgres=True, stay_alive=True)
@ -923,6 +923,18 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster):
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
def test_predefined_connection_configuration(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor.execute(f'DROP TABLE IF EXISTS test_table')
cursor.execute(f'CREATE TABLE test_table (a integer PRIMARY KEY, b integer)')
instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1)")
check_tables_are_synchronized("test_table");
drop_materialized_db()
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -0,0 +1,12 @@
<yandex>
<named_collections>
<mongo1>
<user>root</user>
<password>clickhouse</password>
<host>mongo1</host>
<port>27017</port>
<database>test</database>
<table>simple_table</table>
</mongo1>
</named_collections>
</yandex>

View File

@ -11,9 +11,9 @@ def started_cluster(request):
try:
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node',
main_configs=["configs_secure/config.d/ssl_conf.xml"],
with_mongo=True,
with_mongo_secure=request.param)
main_configs=["configs_secure/config.d/ssl_conf.xml", 'configs/named_collections.xml'],
with_mongo=True)
#with_mongo_secure=request.param)
cluster.start()
yield cluster
finally:
@ -40,10 +40,10 @@ def test_simple_select(started_cluster):
node = started_cluster.instances['node']
node.query(
"CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')")
"create table simple_mongo_table(key uint64, data string) engine = mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')")
assert node.query("SELECT COUNT() FROM simple_mongo_table") == '100\n'
assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + '\n'
assert node.query("select count() from simple_mongo_table") == '100\n'
assert node.query("select sum(key) from simple_mongo_table") == str(sum(range(0, 100))) + '\n'
assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n'
node.query("DROP TABLE simple_mongo_table")
@ -124,3 +124,17 @@ def test_secure_connection(started_cluster):
assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n'
node.query("DROP TABLE simple_mongo_table")
simple_mongo_table.drop()
def test_predefined_connection_configuration(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection['test']
db.add_user('root', 'clickhouse')
simple_mongo_table = db['simple_table']
data = []
for i in range(0, 100):
data.append({'key': i, 'data': hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances['node']
node.query("create table simple_mongo_table(key UInt64, data String) engine = MongoDB(mongo1)")
simple_mongo_table.drop()

View File

@ -370,6 +370,8 @@ def test_predefined_connection_configuration(started_cluster):
''')
assert (node1.query(f"SELECT count() FROM test_table").rstrip() == '100')
assert (node1.query(f"SELECT count() FROM mysql(mysql1)").rstrip() == '100')
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:

View File

@ -252,7 +252,7 @@ def test_postgres_distributed(started_cluster):
node2.query('''
CREATE TABLE test_replicas
(id UInt32, name String)
ENGINE = PostgreSQL(`postgres{2|3|4}:5432`, 'postgres', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
ENGINE = PostgreSQL('postgres{2|3|4}:5432', 'postgres', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
# Check all replicas are traversed
query = "SELECT name FROM ("
@ -359,6 +359,7 @@ def test_predefined_connection_configuration(started_cluster):
cursor.execute(f'CREATE TABLE test_table (a integer PRIMARY KEY, b integer)')
node1.query('''
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table (a UInt32, b Int32)
ENGINE PostgreSQL(postgres1);
''')
@ -405,12 +406,13 @@ def test_predefined_connection_configuration(started_cluster):
node1.query("INSERT INTO TABLE FUNCTION postgresql(postgres1, on_conflict='ON CONFLICT DO NOTHING') SELECT number, number from numbers(100)")
assert (node1.query(f"SELECT count() FROM postgresql(postgres1)").rstrip() == '100')
cursor.execute('DROP SCHEMA IF EXISTS test_schema')
cursor.execute('DROP SCHEMA IF EXISTS test_schema CASCADE')
cursor.execute('CREATE SCHEMA test_schema')
cursor.execute('CREATE TABLE test_schema.test_table (a integer)')
node1.query("INSERT INTO TABLE FUNCTION postgresql(postgres1, schema='test_schema', on_conflict='ON CONFLICT DO NOTHING') SELECT number from numbers(200)")
assert (node1.query(f"SELECT count() FROM postgresql(postgres1, schema='test_schema')").rstrip() == '200')
cursor.execute('DROP SCHEMA test_schema CASCADE')
cursor.execute(f'DROP TABLE test_table ')