Complete postgres

This commit is contained in:
kssenii 2021-09-02 02:17:15 +03:00
parent 3423f8c984
commit 7a45775f4f
15 changed files with 328 additions and 136 deletions

View File

@ -20,7 +20,7 @@ namespace postgres
{ {
PoolWithFailover::PoolWithFailover( PoolWithFailover::PoolWithFailover(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_) size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_)
: pool_wait_timeout(pool_wait_timeout_) : pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_) , max_tries(max_tries_)
@ -28,45 +28,19 @@ PoolWithFailover::PoolWithFailover(
LOG_TRACE(&Poco::Logger::get("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}", LOG_TRACE(&Poco::Logger::get("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
pool_size, pool_wait_timeout, max_tries_); pool_size, pool_wait_timeout, max_tries_);
auto db = config.getString(config_prefix + ".db", ""); for (const auto & [priority, configurations] : configurations_by_priority)
auto host = config.getString(config_prefix + ".host", "");
auto port = config.getUInt(config_prefix + ".port", 0);
auto user = config.getString(config_prefix + ".user", "");
auto password = config.getString(config_prefix + ".password", "");
if (config.has(config_prefix + ".replica"))
{ {
Poco::Util::AbstractConfiguration::Keys config_keys; for (const auto & replica_configuration : configurations)
config.keys(config_prefix, config_keys);
for (const auto & config_key : config_keys)
{ {
if (config_key.starts_with("replica")) auto connection_string = formatConnectionString(replica_configuration.database,
{ replica_configuration.host, replica_configuration.port, replica_configuration.username, replica_configuration.password).first;
std::string replica_name = config_prefix + "." + config_key; replicas_with_priority[priority].emplace_back(connection_string, pool_size, getConnectionForLog(replica_configuration.host, replica_configuration.port));
size_t priority = config.getInt(replica_name + ".priority", 0);
auto replica_host = config.getString(replica_name + ".host", host);
auto replica_port = config.getUInt(replica_name + ".port", port);
auto replica_user = config.getString(replica_name + ".user", user);
auto replica_password = config.getString(replica_name + ".password", password);
auto connection_string = formatConnectionString(db, replica_host, replica_port, replica_user, replica_password).first;
replicas_with_priority[priority].emplace_back(connection_string, pool_size, getConnectionForLog(replica_host, replica_port));
}
} }
} }
else
{
auto connection_string = formatConnectionString(db, host, port, user, password).first;
replicas_with_priority[0].emplace_back(connection_string, pool_size, getConnectionForLog(host, port));
}
} }
PoolWithFailover::PoolWithFailover( PoolWithFailover::PoolWithFailover(
const std::string & database, const DB::StoragePostgreSQLConfiguration & configuration,
const RemoteDescription & addresses,
const std::string & user, const std::string & password,
size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_) size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_)
: pool_wait_timeout(pool_wait_timeout_) : pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_) , max_tries(max_tries_)
@ -75,11 +49,11 @@ PoolWithFailover::PoolWithFailover(
pool_size, pool_wait_timeout, max_tries_); pool_size, pool_wait_timeout, max_tries_);
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue. /// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
for (const auto & [host, port] : addresses) for (const auto & [host, port] : configuration.addresses)
{ {
LOG_DEBUG(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address host: {}, port: {} to connection pool", host, port); LOG_DEBUG(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address host: {}, port: {} to connection pool", host, port);
auto connection_string = formatConnectionString(database, host, port, user, password).first; auto connection_string = formatConnectionString(configuration.database, configuration.host, configuration.port, configuration.username, configuration.password).first;
replicas_with_priority[0].emplace_back(connection_string, pool_size, getConnectionForLog(host, port)); replicas_with_priority[0].emplace_back(connection_string, pool_size, getConnectionForLog(configuration.host, configuration.port));
} }
} }
@ -118,7 +92,7 @@ ConnectionHolderPtr PoolWithFailover::get()
catch (const pqxx::broken_connection & pqxx_error) catch (const pqxx::broken_connection & pqxx_error)
{ {
LOG_ERROR(log, "Connection error: {}", pqxx_error.what()); LOG_ERROR(log, "Connection error: {}", pqxx_error.what());
error_message << "Try " << try_idx << ". Connection to `" << replica.name_for_log << "` failed: " << pqxx_error.what() << "\n"; error_message << "Try " << try_idx + 1 << ". Connection to `" << replica.name_for_log << "` failed: " << pqxx_error.what() << "\n";
replica.pool->returnObject(std::move(connection)); replica.pool->returnObject(std::move(connection));
continue; continue;

View File

@ -11,6 +11,7 @@
#include <mutex> #include <mutex>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace postgres namespace postgres
@ -27,17 +28,13 @@ public:
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5; static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
PoolWithFailover( PoolWithFailover(
const Poco::Util::AbstractConfiguration & config, const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
const std::string & config_prefix,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE, size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT, size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover( PoolWithFailover(
const std::string & database, const DB::StoragePostgreSQLConfiguration & configuration,
const RemoteDescription & addresses,
const std::string & user,
const std::string & password,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE, size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT, size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);

View File

@ -38,6 +38,7 @@
#include <Databases/PostgreSQL/DatabasePostgreSQL.h> // Y_IGNORE #include <Databases/PostgreSQL/DatabasePostgreSQL.h> // Y_IGNORE
#include <Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h> #include <Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h>
#include <Storages/PostgreSQL/MaterializedPostgreSQLSettings.h> #include <Storages/PostgreSQL/MaterializedPostgreSQLSettings.h>
#include <Storages/StoragePostgreSQL.h>
#endif #endif
#if USE_SQLITE #if USE_SQLITE
@ -236,43 +237,56 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
{ {
const ASTFunction * engine = engine_define->engine; const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() < 4 || engine->arguments->children.size() > 6)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{} Database require `host:port`, `database_name`, `username`, `password` [, `schema` = "", `use_table_cache` = 0].",
engine_name);
ASTs & engine_args = engine->arguments->children; ASTs & engine_args = engine->arguments->children;
auto [common_configuration, storage_specific_args, with_named_collection] = tryGetConfigurationAsNamedCollection(engine_args, context, true);
StoragePostgreSQLConfiguration configuration(common_configuration);
for (auto & engine_arg : engine_args) if (with_named_collection)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); {
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "on_conflict")
configuration.on_conflict = arg_value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unxpected argument name for key-value defined argument."
"Got: {}, but expected one of:"
"host, port, username, password, database, schema, on_conflict, use_table_cache.", arg_name);
}
}
else
{
if (!engine->arguments || engine->arguments->children.size() < 4 || engine->arguments->children.size() > 7)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"PostgreSQL Database require `host:port`, `database_name`, `username`, `password`"
"[, `schema` = "", `use_table_cache` = 0, on_conflict='ON CONFLICT TO NOTHING'");
const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name); for (auto & engine_arg : engine_args)
const auto & postgres_database_name = safeGetLiteralValue<String>(engine_args[1], engine_name); engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
const auto & username = safeGetLiteralValue<String>(engine_args[2], engine_name);
const auto & password = safeGetLiteralValue<String>(engine_args[3], engine_name);
String schema; const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name);
if (engine->arguments->children.size() >= 5) size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
schema = safeGetLiteralValue<String>(engine_args[4], engine_name);
configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432);
configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name);
configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name);
configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name);
if (engine_args.size() >= 5)
configuration.schema = safeGetLiteralValue<String>(engine_args[4], engine_name);
}
auto use_table_cache = 0; auto use_table_cache = 0;
if (engine->arguments->children.size() >= 6) if (engine_args.size() >= 6)
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name); use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name);
/// Split into replicas if needed. auto pool = std::make_shared<postgres::PoolWithFailover>(configuration,
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432);
/// no connection is made here
auto connection_pool = std::make_shared<postgres::PoolWithFailover>(
postgres_database_name,
addresses,
username, password,
context->getSettingsRef().postgresql_connection_pool_size, context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout); context->getSettingsRef().postgresql_connection_pool_wait_timeout);
return std::make_shared<DatabasePostgreSQL>( return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, postgres_database_name, schema, connection_pool, use_table_cache); context, metadata_path, engine_define, database_name, configuration, pool, use_table_cache);
} }
else if (engine_name == "MaterializedPostgreSQL") else if (engine_name == "MaterializedPostgreSQL")
{ {

View File

@ -39,16 +39,14 @@ DatabasePostgreSQL::DatabasePostgreSQL(
const String & metadata_path_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const ASTStorage * database_engine_define_,
const String & dbname_, const String & dbname_,
const String & postgres_dbname_, const StoragePostgreSQLConfiguration & configuration_,
const String & postgres_schema_,
postgres::PoolWithFailoverPtr pool_, postgres::PoolWithFailoverPtr pool_,
bool cache_tables_) bool cache_tables_)
: IDatabase(dbname_) : IDatabase(dbname_)
, WithContext(context_->getGlobalContext()) , WithContext(context_->getGlobalContext())
, metadata_path(metadata_path_) , metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone()) , database_engine_define(database_engine_define_->clone())
, postgres_dbname(postgres_dbname_) , configuration(configuration_)
, postgres_schema(postgres_schema_)
, pool(std::move(pool_)) , pool(std::move(pool_))
, cache_tables(cache_tables_) , cache_tables(cache_tables_)
{ {
@ -59,17 +57,17 @@ DatabasePostgreSQL::DatabasePostgreSQL(
String DatabasePostgreSQL::getTableNameForLogs(const String & table_name) const String DatabasePostgreSQL::getTableNameForLogs(const String & table_name) const
{ {
if (postgres_schema.empty()) if (configuration.schema.empty())
return fmt::format("{}.{}", postgres_dbname, table_name); return fmt::format("{}.{}", configuration.database, table_name);
return fmt::format("{}.{}.{}", postgres_dbname, postgres_schema, table_name); return fmt::format("{}.{}.{}", configuration.database, configuration.schema, table_name);
} }
String DatabasePostgreSQL::formatTableName(const String & table_name) const String DatabasePostgreSQL::formatTableName(const String & table_name) const
{ {
if (postgres_schema.empty()) if (configuration.schema.empty())
return doubleQuoteString(table_name); return doubleQuoteString(table_name);
return fmt::format("{}.{}", doubleQuoteString(postgres_schema), doubleQuoteString(table_name)); return fmt::format("{}.{}", doubleQuoteString(configuration.schema), doubleQuoteString(table_name));
} }
@ -78,7 +76,7 @@ bool DatabasePostgreSQL::empty() const
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
auto connection_holder = pool->get(); auto connection_holder = pool->get();
auto tables_list = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema); auto tables_list = fetchPostgreSQLTablesList(connection_holder->get(), configuration.schema);
for (const auto & table_name : tables_list) for (const auto & table_name : tables_list)
if (!detached_or_dropped.count(table_name)) if (!detached_or_dropped.count(table_name))
@ -94,7 +92,7 @@ DatabaseTablesIteratorPtr DatabasePostgreSQL::getTablesIterator(ContextPtr local
Tables tables; Tables tables;
auto connection_holder = pool->get(); auto connection_holder = pool->get();
auto table_names = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema); auto table_names = fetchPostgreSQLTablesList(connection_holder->get(), configuration.schema);
for (const auto & table_name : table_names) for (const auto & table_name : table_names)
if (!detached_or_dropped.count(table_name)) if (!detached_or_dropped.count(table_name))
@ -125,7 +123,7 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
"WHERE schemaname != 'pg_catalog' AND {} " "WHERE schemaname != 'pg_catalog' AND {} "
"AND tablename = '{}'", "AND tablename = '{}'",
formatTableName(table_name), formatTableName(table_name),
(postgres_schema.empty() ? "schemaname != 'information_schema'" : "schemaname = " + quoteString(postgres_schema)), (configuration.schema.empty() ? "schemaname != 'information_schema'" : "schemaname = " + quoteString(configuration.schema)),
formatTableName(table_name))); formatTableName(table_name)));
} }
catch (pqxx::undefined_table const &) catch (pqxx::undefined_table const &)
@ -179,7 +177,7 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr,
auto storage = StoragePostgreSQL::create( auto storage = StoragePostgreSQL::create(
StorageID(database_name, table_name), pool, table_name, StorageID(database_name, table_name), pool, table_name,
ColumnsDescription{*columns}, ConstraintsDescription{}, String{}, postgres_schema); ColumnsDescription{*columns}, ConstraintsDescription{}, String{}, configuration.schema, configuration.on_conflict);
if (cache_tables) if (cache_tables)
cached_tables[table_name] = storage; cached_tables[table_name] = storage;
@ -306,7 +304,7 @@ void DatabasePostgreSQL::removeOutdatedTables()
{ {
std::lock_guard<std::mutex> lock{mutex}; std::lock_guard<std::mutex> lock{mutex};
auto connection_holder = pool->get(); auto connection_holder = pool->get();
auto actual_tables = fetchPostgreSQLTablesList(connection_holder->get(), postgres_schema); auto actual_tables = fetchPostgreSQLTablesList(connection_holder->get(), configuration.schema);
if (cache_tables) if (cache_tables)
{ {

View File

@ -10,7 +10,7 @@
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Core/PostgreSQL/PoolWithFailover.h> #include <Core/PostgreSQL/PoolWithFailover.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB namespace DB
{ {
@ -32,8 +32,7 @@ public:
const String & metadata_path_, const String & metadata_path_,
const ASTStorage * database_engine_define, const ASTStorage * database_engine_define,
const String & dbname_, const String & dbname_,
const String & postgres_dbname_, const StoragePostgreSQLConfiguration & configuration,
const String & postgres_schema_,
postgres::PoolWithFailoverPtr pool_, postgres::PoolWithFailoverPtr pool_,
bool cache_tables_); bool cache_tables_);
@ -70,8 +69,7 @@ protected:
private: private:
String metadata_path; String metadata_path;
ASTPtr database_engine_define; ASTPtr database_engine_define;
String postgres_dbname; StoragePostgreSQLConfiguration configuration;
String postgres_schema;
postgres::PoolWithFailoverPtr pool; postgres::PoolWithFailoverPtr pool;
const bool cache_tables; const bool cache_tables;

View File

@ -10,6 +10,7 @@
#include <DataStreams/PostgreSQLSource.h> #include <DataStreams/PostgreSQLSource.h>
#include "readInvalidateQuery.h" #include "readInvalidateQuery.h"
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#endif #endif
@ -182,22 +183,27 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, const std::string & config_prefix,
Block & sample_block, Block & sample_block,
ContextPtr global_context, ContextPtr context,
const std::string & /* default_database */, const std::string & /* default_database */,
bool /* created_from_ddl */) -> DictionarySourcePtr bool /* created_from_ddl */) -> DictionarySourcePtr
{ {
#if USE_LIBPQXX #if USE_LIBPQXX
const auto settings_config_prefix = config_prefix + ".postgresql"; const auto settings_config_prefix = config_prefix + ".postgresql";
auto pool = std::make_shared<postgres::PoolWithFailover>(
config, settings_config_prefix,
global_context->getSettingsRef().postgresql_connection_pool_size,
global_context->getSettingsRef().postgresql_connection_pool_wait_timeout);
PostgreSQLDictionarySource::Configuration configuration auto configurations = tryGetConfigurationsByPriorityAsNamedCollection(config, settings_config_prefix, context);
if (configurations.empty() || configurations[0].empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Having no configuration options");
auto pool = std::make_shared<postgres::PoolWithFailover>(
configurations,
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
PostgreSQLDictionarySource::Configuration dictionary_configuration
{ {
.db = config.getString(fmt::format("{}.db", settings_config_prefix), ""), .db = configurations[0][0].database,
.schema = config.getString(fmt::format("{}.schema", settings_config_prefix), ""), .schema = configurations[0][0].schema,
.table = config.getString(fmt::format("{}.table", settings_config_prefix), ""), .table = configurations[0][0].table,
.query = config.getString(fmt::format("{}.query", settings_config_prefix), ""), .query = config.getString(fmt::format("{}.query", settings_config_prefix), ""),
.where = config.getString(fmt::format("{}.where", settings_config_prefix), ""), .where = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
.invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""), .invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""),
@ -205,13 +211,13 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
.update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1) .update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1)
}; };
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, configuration, pool, sample_block); return std::make_unique<PostgreSQLDictionarySource>(dict_struct, dictionary_configuration, pool, sample_block);
#else #else
(void)dict_struct; (void)dict_struct;
(void)config; (void)config;
(void)config_prefix; (void)config_prefix;
(void)sample_block; (void)sample_block;
(void)global_context; (void)context;
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Dictionary source of type `postgresql` is disabled because ClickHouse was built without postgresql support."); "Dictionary source of type `postgresql` is disabled because ClickHouse was built without postgresql support.");
#endif #endif

View File

@ -6,6 +6,7 @@
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <IO/WriteBufferFromString.h>
namespace DB namespace DB
@ -16,8 +17,18 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
} }
String ExternalDataSourceConfiguration::toString() const
{
WriteBufferFromOwnString configuration_info;
configuration_info << "host: " << host << "\t";
configuration_info << "port: " << port << "\t";
configuration_info << "username: " << username;
return configuration_info.str();
}
std::tuple<ExternalDataSourceConfiguration, EngineArgs, bool> std::tuple<ExternalDataSourceConfiguration, EngineArgs, bool>
tryGetConfigurationAsNamedCollection(ASTs args, ContextPtr context) tryGetConfigurationAsNamedCollection(ASTs args, ContextPtr context, bool is_database_engine)
{ {
ExternalDataSourceConfiguration configuration; ExternalDataSourceConfiguration configuration;
EngineArgs non_common_args; EngineArgs non_common_args;
@ -36,10 +47,11 @@ tryGetConfigurationAsNamedCollection(ASTs args, ContextPtr context)
configuration.password = config.getString(config_prefix + ".password", ""); configuration.password = config.getString(config_prefix + ".password", "");
configuration.database = config.getString(config_prefix + ".database", ""); configuration.database = config.getString(config_prefix + ".database", "");
configuration.table = config.getString(config_prefix + ".table", ""); configuration.table = config.getString(config_prefix + ".table", "");
configuration.schema = config.getString(config_prefix + ".schema", "");
if ((args.size() == 1) && (configuration.host.empty() || configuration.port == 0 if ((args.size() == 1) && (configuration.host.empty() || configuration.port == 0
|| configuration.username.empty() || configuration.password.empty() || configuration.username.empty() || configuration.password.empty()
|| configuration.database.empty() || configuration.table.empty())) || configuration.database.empty() || (configuration.table.empty() && !is_database_engine)))
{ {
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some of the parameters and no key-value arguments are added"); "Named collection of connection parameters is missing some of the parameters and no key-value arguments are added");
@ -69,6 +81,8 @@ tryGetConfigurationAsNamedCollection(ASTs args, ContextPtr context)
configuration.database = arg_value.safeGet<String>(); configuration.database = arg_value.safeGet<String>();
else if (arg_name == "table") else if (arg_name == "table")
configuration.table = arg_value.safeGet<String>(); configuration.table = arg_value.safeGet<String>();
else if (arg_name == "schema")
configuration.schema = arg_value.safeGet<String>();
else else
non_common_args.emplace_back(std::make_pair(arg_name, arg_value)); non_common_args.emplace_back(std::make_pair(arg_name, arg_value));
} }
@ -83,4 +97,91 @@ tryGetConfigurationAsNamedCollection(ASTs args, ContextPtr context)
return std::make_tuple(configuration, non_common_args, false); return std::make_tuple(configuration, non_common_args, false);
} }
ExternalDataSourceConfiguration tryGetConfigurationAsNamedCollection(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context)
{
ExternalDataSourceConfiguration configuration;
auto collection_name = dict_config.getString(dict_config_prefix + ".name", "");
if (!collection_name.empty())
{
const auto & config = context->getConfigRef();
const auto & config_prefix = fmt::format("named_collections.{}", collection_name);
if (!config.has(config_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
configuration.host = dict_config.getString(dict_config_prefix + ".host", config.getString(config_prefix + ".host", ""));
configuration.port = dict_config.getInt(dict_config_prefix + ".port", config.getUInt(config_prefix + ".port", 0));
configuration.username = dict_config.getString(dict_config_prefix + ".user", config.getString(config_prefix + ".user", ""));
configuration.password = dict_config.getString(dict_config_prefix + ".password", config.getString(config_prefix + ".password", ""));
configuration.database = dict_config.getString(dict_config_prefix + ".db", config.getString(config_prefix + ".database", ""));
configuration.table = dict_config.getString(dict_config_prefix + ".table", config.getString(config_prefix + ".table", ""));
configuration.schema = dict_config.getString(dict_config_prefix + ".schema", config.getString(config_prefix + ".schema", ""));
if (configuration.host.empty() || configuration.port == 0 || configuration.username.empty() || configuration.password.empty()
|| configuration.database.empty() || configuration.table.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some of the parameters and dictionary parameters are added");
}
}
else
{
configuration.host = dict_config.getString(dict_config_prefix + ".host", "");
configuration.port = dict_config.getUInt(dict_config_prefix + ".port", 0);
configuration.username = dict_config.getString(dict_config_prefix + ".user", "");
configuration.password = dict_config.getString(dict_config_prefix + ".password", "");
configuration.database = dict_config.getString(dict_config_prefix + ".db", "");
configuration.table = dict_config.getString(fmt::format("{}.table", dict_config_prefix), "");
configuration.schema = dict_config.getString(fmt::format("{}.schema", dict_config_prefix), "");
}
return configuration;
}
ExternalDataSourcesConfigurationByPriority tryGetConfigurationsByPriorityAsNamedCollection(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context)
{
auto common_configuration = tryGetConfigurationAsNamedCollection(dict_config, dict_config_prefix, context);
ExternalDataSourcesConfigurationByPriority configurations;
if (dict_config.has(dict_config_prefix + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
dict_config.keys(dict_config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (config_key.starts_with("replica"))
{
ExternalDataSourceConfiguration replica_configuration(common_configuration);
String replica_name = dict_config_prefix + "." + config_key;
size_t priority = dict_config.getInt(replica_name + ".priority", 0);
replica_configuration.host = dict_config.getString(replica_name + ".host", common_configuration.host);
replica_configuration.port = dict_config.getUInt(replica_name + ".port", common_configuration.port);
replica_configuration.username = dict_config.getString(replica_name + ".user", common_configuration.username);
replica_configuration.password = dict_config.getString(replica_name + ".password", common_configuration.password);
if (replica_configuration.host.empty() || replica_configuration.port == 0
|| replica_configuration.username.empty() || replica_configuration.password.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some of the parameters and no other dictionary parameters are added");
}
configurations[priority].emplace_back(replica_configuration);
}
}
}
else
{
configurations[0].emplace_back(common_configuration);
}
return configurations;
}
} }

View File

@ -20,20 +20,28 @@ struct ExternalDataSourceConfiguration
String password; String password;
String database; String database;
String table; String table;
String schema;
ExternalDataSourceConfiguration() = default; ExternalDataSourceConfiguration() = default;
ExternalDataSourceConfiguration(const ExternalDataSourceConfiguration & configuration) = default; ExternalDataSourceConfiguration(const ExternalDataSourceConfiguration & configuration) = default;
String toString() const;
}; };
using ExternalDataSourceConfigurationPtr = std::shared_ptr<ExternalDataSourceConfiguration>;
/// Highest priority is 0, the bigger the number in map, the less the priority.
using ExternalDataSourcesConfigurationByPriority = std::map<size_t, std::vector<ExternalDataSourceConfiguration>>;
struct StoragePostgreSQLConfiguration : ExternalDataSourceConfiguration struct StoragePostgreSQLConfiguration : ExternalDataSourceConfiguration
{ {
explicit StoragePostgreSQLConfiguration( explicit StoragePostgreSQLConfiguration(
const ExternalDataSourceConfiguration & common_configuration = {}, const ExternalDataSourceConfiguration & common_configuration,
const String & schema_ = "", const String & on_conflict_ = "") const String & on_conflict_ = "")
: ExternalDataSourceConfiguration(common_configuration) : ExternalDataSourceConfiguration(common_configuration)
, schema(schema_), on_conflict(on_conflict_) {} , on_conflict(on_conflict_) {}
String schema;
String on_conflict; String on_conflict;
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas. std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
}; };
@ -52,6 +60,9 @@ using EngineArgs = std::vector<std::pair<String, DB::Field>>;
* i.e. storage-specific arguments, then return them back in a set: ExternalDataSource::EngineArgs. * i.e. storage-specific arguments, then return them back in a set: ExternalDataSource::EngineArgs.
*/ */
std::tuple<ExternalDataSourceConfiguration, EngineArgs, bool> std::tuple<ExternalDataSourceConfiguration, EngineArgs, bool>
tryGetConfigurationAsNamedCollection(ASTs args, ContextPtr context); tryGetConfigurationAsNamedCollection(ASTs args, ContextPtr context, bool is_database_engine = false);
ExternalDataSourcesConfigurationByPriority
tryGetConfigurationsByPriorityAsNamedCollection(const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);
} }

View File

@ -89,16 +89,15 @@ StorageExternalDistributed::StorageExternalDistributed(
case ExternalStorageEngine::PostgreSQL: case ExternalStorageEngine::PostgreSQL:
{ {
addresses = parseRemoteDescriptionForExternalDatabase(shard_description, max_addresses, 5432); // StoragePostgreSQLConfiguration configuration;
// configuration.addresses = parseRemoteDescriptionForExternalDatabase(shard_description, max_addresses, 5432);
auto pool = std::make_shared<postgres::PoolWithFailover>( // auto pool = std::make_shared<postgres::PoolWithFailover>(
remote_database, // configuration,
addresses, // context->getSettingsRef().postgresql_connection_pool_size,
username, password, // context->getSettingsRef().postgresql_connection_pool_wait_timeout);
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
shard = StoragePostgreSQL::create(table_id_, std::move(pool), remote_table, columns_, constraints_, String{}); // shard = StoragePostgreSQL::create(table_id_, std::move(pool), remote_table, columns_, constraints_, String{});
break; break;
} }
#endif #endif
@ -220,6 +219,7 @@ void registerStorageExternalDistributed(StorageFactory & factory)
const String & addresses_description = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(); const String & addresses_description = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
StorageExternalDistributed::ExternalStorageEngine table_engine; StorageExternalDistributed::ExternalStorageEngine table_engine;
ExternalDataSourceConfigurationPtr configuration;
if (engine_name == "URL") if (engine_name == "URL")
{ {
table_engine = StorageExternalDistributed::ExternalStorageEngine::URL; table_engine = StorageExternalDistributed::ExternalStorageEngine::URL;
@ -246,7 +246,10 @@ void registerStorageExternalDistributed(StorageFactory & factory)
if (engine_name == "MySQL") if (engine_name == "MySQL")
table_engine = StorageExternalDistributed::ExternalStorageEngine::MySQL; table_engine = StorageExternalDistributed::ExternalStorageEngine::MySQL;
else if (engine_name == "PostgreSQL") else if (engine_name == "PostgreSQL")
{
configuration = std::make_shared<StoragePostgreSQLConfiguration>(StoragePostgreSQL::getConfiguration(engine_args, args.getContext()));
table_engine = StorageExternalDistributed::ExternalStorageEngine::PostgreSQL; table_engine = StorageExternalDistributed::ExternalStorageEngine::PostgreSQL;
}
else else
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,
"External storage engine {} is not supported for StorageExternalDistributed. Supported engines are: MySQL, PostgreSQL, URL", "External storage engine {} is not supported for StorageExternalDistributed. Supported engines are: MySQL, PostgreSQL, URL",

View File

@ -393,9 +393,7 @@ StoragePostgreSQLConfiguration StoragePostgreSQL::getConfiguration(ASTs engine_a
configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
for (const auto & [arg_name, arg_value] : storage_specific_args) for (const auto & [arg_name, arg_value] : storage_specific_args)
{ {
if (arg_name == "schema") if (arg_name == "on_conflict")
configuration.schema = arg_value.safeGet<String>();
else if (arg_name == "on_conflict")
configuration.on_conflict = arg_value.safeGet<String>(); configuration.on_conflict = arg_value.safeGet<String>();
else else
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -438,11 +436,7 @@ void registerStoragePostgreSQL(StorageFactory & factory)
factory.registerStorage("PostgreSQL", [](const StorageFactory::Arguments & args) factory.registerStorage("PostgreSQL", [](const StorageFactory::Arguments & args)
{ {
auto configuration = StoragePostgreSQL::getConfiguration(args.engine_args, args.getContext()); auto configuration = StoragePostgreSQL::getConfiguration(args.engine_args, args.getContext());
auto pool = std::make_shared<postgres::PoolWithFailover>( auto pool = std::make_shared<postgres::PoolWithFailover>(configuration,
configuration.database,
configuration.addresses,
configuration.username,
configuration.password,
args.getContext()->getSettingsRef().postgresql_connection_pool_size, args.getContext()->getSettingsRef().postgresql_connection_pool_size,
args.getContext()->getSettingsRef().postgresql_connection_pool_wait_timeout); args.getContext()->getSettingsRef().postgresql_connection_pool_wait_timeout);

View File

@ -33,12 +33,12 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
auto result = std::make_shared<StoragePostgreSQL>( auto result = std::make_shared<StoragePostgreSQL>(
StorageID(getDatabaseName(), table_name), StorageID(getDatabaseName(), table_name),
connection_pool, connection_pool,
configuration.table, configuration->table,
columns, columns,
ConstraintsDescription{}, ConstraintsDescription{},
String{}, String{},
configuration.schema, configuration->schema,
configuration.on_conflict); configuration->on_conflict);
result->startup(); result->startup();
return result; return result;
@ -51,8 +51,8 @@ ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr c
auto connection_holder = connection_pool->get(); auto connection_holder = connection_pool->get();
auto columns = fetchPostgreSQLTableStructure( auto columns = fetchPostgreSQLTableStructure(
connection_holder->get(), connection_holder->get(),
configuration.schema.empty() ? doubleQuoteString(configuration.table) configuration->schema.empty() ? doubleQuoteString(configuration->table)
: doubleQuoteString(configuration.schema) + '.' + doubleQuoteString(configuration.table), : doubleQuoteString(configuration->schema) + '.' + doubleQuoteString(configuration->table),
use_nulls).columns; use_nulls).columns;
return ColumnsDescription{*columns}; return ColumnsDescription{*columns};
@ -65,12 +65,8 @@ void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, Contex
if (!func_args.arguments) if (!func_args.arguments)
throw Exception("Table function 'PostgreSQL' must have arguments.", ErrorCodes::BAD_ARGUMENTS); throw Exception("Table function 'PostgreSQL' must have arguments.", ErrorCodes::BAD_ARGUMENTS);
configuration = StoragePostgreSQL::getConfiguration(func_args.arguments->children, context); configuration.emplace(StoragePostgreSQL::getConfiguration(func_args.arguments->children, context));
connection_pool = std::make_shared<postgres::PoolWithFailover>( connection_pool = std::make_shared<postgres::PoolWithFailover>(*configuration,
configuration.database,
configuration.addresses,
configuration.username,
configuration.password,
context->getSettingsRef().postgresql_connection_pool_size, context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout); context->getSettingsRef().postgresql_connection_pool_wait_timeout);
} }

View File

@ -29,7 +29,7 @@ private:
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
postgres::PoolWithFailoverPtr connection_pool; postgres::PoolWithFailoverPtr connection_pool;
StoragePostgreSQLConfiguration configuration; std::optional<StoragePostgreSQLConfiguration> configuration;
}; };
} }

View File

@ -8,7 +8,7 @@ from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', node1 = cluster.add_instance('node1',
main_configs=['configs/config.xml', 'configs/dictionaries/postgres_dict.xml'], main_configs=['configs/config.xml', 'configs/dictionaries/postgres_dict.xml', 'configs/named_collections.xml'],
with_postgres=True, with_postgres_cluster=True) with_postgres=True, with_postgres_cluster=True)
postgres_dict_table_template = """ postgres_dict_table_template = """
@ -302,6 +302,72 @@ def test_postgres_schema(started_cluster):
node1.query("DROP DICTIONARY IF EXISTS postgres_dict") node1.query("DROP DICTIONARY IF EXISTS postgres_dict")
def test_predefined_connection_configuration(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor.execute('DROP TABLE IF EXISTS test_table')
cursor.execute('CREATE TABLE test_table (id integer, value integer)')
cursor.execute('INSERT INTO test_table SELECT i, i FROM generate_series(0, 99) as t(i)')
node1.query('''
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME postgres1))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert(int(result.strip()) == 99)
cursor.execute('CREATE SCHEMA test_schema')
cursor.execute('CREATE TABLE test_schema.test_table (id integer, value integer)')
cursor.execute('INSERT INTO test_schema.test_table SELECT i, 100 FROM generate_series(0, 99) as t(i)')
node1.query('''
DROP DICTIONARY postgres_dict;
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME postgres1 SCHEMA test_schema))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert(int(result.strip()) == 100)
node1.query('''
DROP DICTIONARY postgres_dict;
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME postgres2))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert(int(result.strip()) == 100)
node1.query('DROP DICTIONARY postgres_dict')
node1.query('''
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME postgres4))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
result = node1.query_and_get_error("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
node1.query('''
DROP DICTIONARY postgres_dict;
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(NAME postgres1 PORT 5432))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert(int(result.strip()) == 99)
if __name__ == '__main__': if __name__ == '__main__':
cluster.start() cluster.start()
input("Cluster created, press any key to destroy...") input("Cluster created, press any key to destroy...")

View File

@ -7,7 +7,7 @@ from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=[], with_postgres=True) node1 = cluster.add_instance('node1', main_configs=["configs/named_collections.xml"], with_postgres=True)
postgres_table_template = """ postgres_table_template = """
CREATE TABLE IF NOT EXISTS {} ( CREATE TABLE IF NOT EXISTS {} (
@ -208,6 +208,36 @@ def test_postgresql_database_with_schema(started_cluster):
node1.query("DROP DATABASE test_database") node1.query("DROP DATABASE test_database")
def test_predefined_connection_configuration(started_cluster):
cursor = started_cluster.postgres_conn.cursor()
cursor.execute(f'DROP TABLE IF EXISTS test_table')
cursor.execute(f'CREATE TABLE test_table (a integer PRIMARY KEY, b integer)')
node1.query("DROP DATABASE IF EXISTS postgres_database")
node1.query("CREATE DATABASE postgres_database ENGINE = PostgreSQL(postgres1)")
node1.query("INSERT INTO postgres_database.test_table SELECT number, number from numbers(100)")
assert (node1.query(f"SELECT count() FROM postgres_database.test_table").rstrip() == '100')
cursor.execute('DROP SCHEMA IF EXISTS test_schema')
cursor.execute('CREATE SCHEMA test_schema')
cursor.execute('CREATE TABLE test_schema.test_table (a integer)')
node1.query("DROP DATABASE IF EXISTS postgres_database")
node1.query("CREATE DATABASE postgres_database ENGINE = PostgreSQL(postgres1, schema='test_schema')")
node1.query("INSERT INTO postgres_database.test_table SELECT number from numbers(200)")
assert (node1.query(f"SELECT count() FROM postgres_database.test_table").rstrip() == '200')
node1.query("DROP DATABASE IF EXISTS postgres_database")
node1.query_and_get_error("CREATE DATABASE postgres_database ENGINE = PostgreSQL(postgres1, 'test_schema')")
node1.query_and_get_error("CREATE DATABASE postgres_database ENGINE = PostgreSQL(postgres2)")
node1.query_and_get_error("CREATE DATABASE postgres_database ENGINE = PostgreSQL(unknown_collection)")
node1.query("CREATE DATABASE postgres_database ENGINE = PostgreSQL(postgres3, port=5432)")
assert (node1.query(f"SELECT count() FROM postgres_database.test_table").rstrip() == '100')
node1.query("DROP DATABASE postgres_database")
cursor.execute(f'DROP TABLE test_table ')
if __name__ == '__main__': if __name__ == '__main__':
cluster.start() cluster.start()
input("Cluster created, press any key to destroy...") input("Cluster created, press any key to destroy...")

View File

@ -383,6 +383,10 @@ def test_predefined_connection_configuration(started_cluster):
CREATE TABLE test_table (a UInt32, b Int32) CREATE TABLE test_table (a UInt32, b Int32)
ENGINE PostgreSQL(postgres2); ENGINE PostgreSQL(postgres2);
''') ''')
node1.query_and_get_error('''
CREATE TABLE test_table (a UInt32, b Int32)
ENGINE PostgreSQL(unknown_collection);
''')
node1.query(''' node1.query('''
CREATE TABLE test_table (a UInt32, b Int32) CREATE TABLE test_table (a UInt32, b Int32)
@ -393,7 +397,7 @@ def test_predefined_connection_configuration(started_cluster):
node1.query(''' node1.query('''
DROP TABLE test_table; DROP TABLE test_table;
CREATE TABLE test_table (a UInt32, b Int32) CREATE TABLE test_table (a UInt32, b Int32)
ENGINE PostgreSQL(postgres1, port=5432); ENGINE PostgreSQL(postgres3, port=5432);
''') ''')
assert (node1.query(f"SELECT count() FROM test_table").rstrip() == '100') assert (node1.query(f"SELECT count() FROM test_table").rstrip() == '100')