Delete old code of named collections

This commit is contained in:
kssenii 2024-08-15 16:01:13 +02:00
parent 275de04b8f
commit 7d01c31312
11 changed files with 170 additions and 445 deletions

View File

@ -23,7 +23,7 @@ namespace postgres
{
PoolWithFailover::PoolWithFailover(
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
const ReplicasConfigurationByPriority & configurations_by_priority,
size_t pool_size,
size_t pool_wait_timeout_,
size_t max_tries_,

View File

@ -8,7 +8,6 @@
#include "ConnectionHolder.h"
#include <mutex>
#include <Poco/Util/AbstractConfiguration.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/StoragePostgreSQL.h>
@ -20,12 +19,12 @@ namespace postgres
class PoolWithFailover
{
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
public:
using ReplicasConfigurationByPriority = std::map<size_t, std::vector<DB::StoragePostgreSQL::Configuration>>;
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
PoolWithFailover(
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
const ReplicasConfigurationByPriority & configurations_by_priority,
size_t pool_size,
size_t pool_wait_timeout,
size_t max_tries_,

View File

@ -8,12 +8,12 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Processors/Formats/IInputFormat.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Poco/Net/HTTPRequest.h>
#include <Common/logger_useful.h>
#include "DictionarySourceFactory.h"
#include "DictionarySourceHelpers.h"
#include "DictionaryStructure.h"
#include <Storages/NamedCollectionsHelpers.h>
#include "registerDictionaries.h"
@ -223,21 +223,23 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
String endpoint;
String format;
auto named_collection = created_from_ddl
? getURLBasedDataSourceConfiguration(config, settings_config_prefix, global_context)
: std::nullopt;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, global_context) : nullptr;
if (named_collection)
{
url = named_collection->configuration.url;
endpoint = named_collection->configuration.endpoint;
format = named_collection->configuration.format;
validateNamedCollection(
*named_collection,
/* required_keys */{},
/* optional_keys */ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>{
"url", "endpoint", "user", "credentials.user", "password", "credentials.password", "format", "compression_method", "structure", "name"});
credentials.setUsername(named_collection->configuration.user);
credentials.setPassword(named_collection->configuration.password);
url = named_collection->getOrDefault<String>("url", "");
endpoint = named_collection->getOrDefault<String>("endpoint", "");
format = named_collection->getOrDefault<String>("format", "");
header_entries.reserve(named_collection->configuration.headers.size());
for (const auto & [key, value] : named_collection->configuration.headers)
header_entries.emplace_back(key, value);
credentials.setUsername(named_collection->getAnyOrDefault<String>({"user", "credentials.user"}, ""));
credentials.setPassword(named_collection->getAnyOrDefault<String>({"password", "credentials.password"}, ""));
header_entries = getHeadersFromNamedCollection(*named_collection);
}
else
{

View File

@ -1,15 +1,12 @@
#include "MongoDBDictionarySource.h"
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/StorageMongoDBSocketFactory.h>
#include <Storages/NamedCollectionsHelpers.h>
namespace DB
{
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db", "database", "uri", "collection", "name", "method", "options"};
void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
{
auto create_mongo_db_dictionary = [](
@ -22,35 +19,53 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
bool created_from_ddl)
{
const auto config_prefix = root_config_prefix + ".mongodb";
ExternalDataSourceConfiguration configuration;
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key); };
auto named_collection = getExternalDataSourceConfiguration(config, config_prefix, context, has_config_key);
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, config_prefix, context) : nullptr;
String host, username, password, database, method, options, collection;
UInt16 port;
if (named_collection)
{
configuration = named_collection->configuration;
validateNamedCollection(
*named_collection,
/* required_keys */{"collection"},
/* optional_keys */ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>{
"host", "port", "user", "password", "db", "database", "uri", "name", "method", "options"});
host = named_collection->getOrDefault<String>("host", "");
port = static_cast<UInt16>(named_collection->getOrDefault<UInt64>("port", 0));
username = named_collection->getOrDefault<String>("user", "");
password = named_collection->getOrDefault<String>("password", "");
database = named_collection->getAnyOrDefault<String>({"db", "database"}, "");
method = named_collection->getOrDefault<String>("method", "");
collection = named_collection->getOrDefault<String>("collection", "");
options = named_collection->getOrDefault<String>("options", "");
}
else
{
configuration.host = config.getString(config_prefix + ".host", "");
configuration.port = config.getUInt(config_prefix + ".port", 0);
configuration.username = config.getString(config_prefix + ".user", "");
configuration.password = config.getString(config_prefix + ".password", "");
configuration.database = config.getString(config_prefix + ".db", "");
host = config.getString(config_prefix + ".host", "");
port = config.getUInt(config_prefix + ".port", 0);
username = config.getString(config_prefix + ".user", "");
password = config.getString(config_prefix + ".password", "");
database = config.getString(config_prefix + ".db", "");
method = config.getString(config_prefix + ".method", "");
collection = config.getString(config_prefix + ".collection");
options = config.getString(config_prefix + ".options", "");
}
if (created_from_ddl)
context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
context->getRemoteHostFilter().checkHostAndPort(host, toString(port));
return std::make_unique<MongoDBDictionarySource>(dict_struct,
return std::make_unique<MongoDBDictionarySource>(
dict_struct,
config.getString(config_prefix + ".uri", ""),
configuration.host,
configuration.port,
configuration.username,
configuration.password,
config.getString(config_prefix + ".method", ""),
configuration.database,
config.getString(config_prefix + ".collection"),
config.getString(config_prefix + ".options", ""),
host,
port,
username,
password,
method,
database,
collection,
options,
sample_block);
};

View File

@ -13,7 +13,7 @@
#include "readInvalidateQuery.h"
#include <Interpreters/Context.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Common/logger_useful.h>
#endif
@ -30,7 +30,7 @@ namespace ErrorCodes
static const UInt64 max_block_size = 8192;
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
static const ValidateKeysMultiset<ExternalDatabaseEqualKeysSet> dictionary_allowed_keys = {
"host", "port", "user", "password", "db", "database", "table", "schema",
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"};
@ -179,6 +179,19 @@ std::string PostgreSQLDictionarySource::toString() const
#endif
static void validateConfigKeys(
const Poco::Util::AbstractConfiguration & dict_config, const String & config_prefix)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
dict_config.keys(config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (dictionary_allowed_keys.contains(config_key) || startsWith(config_key, "replica"))
continue;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected key `{}` in dictionary source configuration", config_key);
}
}
void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
{
auto create_table_source = [=](const DictionaryStructure & dict_struct,
@ -191,38 +204,118 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
{
#if USE_LIBPQXX
const auto settings_config_prefix = config_prefix + ".postgresql";
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key) || key.starts_with("replica"); };
auto configuration = getExternalDataSourceConfigurationByPriority(config, settings_config_prefix, context, has_config_key);
const auto & settings = context->getSettingsRef();
std::optional<PostgreSQLDictionarySource::Configuration> dictionary_configuration;
String database, schema, table;
postgres::PoolWithFailover::ReplicasConfigurationByPriority replicas_by_priority;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, context) : nullptr;
if (named_collection)
{
validateNamedCollection<ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>>(*named_collection, {}, dictionary_allowed_keys);
StoragePostgreSQL::Configuration common_configuration;
common_configuration.host = named_collection->getOrDefault<String>("host", "");
common_configuration.port = named_collection->getOrDefault<UInt64>("port", 0);
common_configuration.username = named_collection->getOrDefault<String>("user", "");
common_configuration.password = named_collection->getOrDefault<String>("password", "");
common_configuration.database = named_collection->getAnyOrDefault<String>({"database", "db"}, "");
common_configuration.schema = named_collection->getOrDefault<String>("schema", "");
common_configuration.table = named_collection->getOrDefault<String>("table", "");
dictionary_configuration.emplace(PostgreSQLDictionarySource::Configuration{
.db = common_configuration.database,
.schema = common_configuration.schema,
.table = common_configuration.table,
.query = named_collection->getOrDefault<String>("query", ""),
.where = named_collection->getOrDefault<String>("where", ""),
.invalidate_query = named_collection->getOrDefault<String>("invalidate_query", ""),
.update_field = named_collection->getOrDefault<String>("update_field", ""),
.update_lag = named_collection->getOrDefault<UInt64>("update_lag", 1),
});
replicas_by_priority[0].emplace_back(common_configuration);
}
else
{
validateConfigKeys(config, settings_config_prefix);
StoragePostgreSQL::Configuration common_configuration;
common_configuration.host = config.getString(settings_config_prefix + ".host", "");
common_configuration.port = config.getUInt(settings_config_prefix + ".port", 0);
common_configuration.username = config.getString(settings_config_prefix + ".user", "");
common_configuration.password = config.getString(settings_config_prefix + ".password", "");
common_configuration.database = config.getString(fmt::format("{}.database", settings_config_prefix), config.getString(fmt::format("{}.db", settings_config_prefix), ""));
common_configuration.schema = config.getString(fmt::format("{}.schema", settings_config_prefix), "");
common_configuration.table = config.getString(fmt::format("{}.table", settings_config_prefix), "");
dictionary_configuration.emplace(PostgreSQLDictionarySource::Configuration
{
.db = common_configuration.database,
.schema = common_configuration.schema,
.table = common_configuration.table,
.query = config.getString(fmt::format("{}.query", settings_config_prefix), ""),
.where = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
.invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""),
.update_field = config.getString(fmt::format("{}.update_field", settings_config_prefix), ""),
.update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1)
});
if (config.has(settings_config_prefix + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(settings_config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (config_key.starts_with("replica"))
{
String replica_name = settings_config_prefix + "." + config_key;
StoragePostgreSQL::Configuration replica_configuration{common_configuration};
size_t priority = config.getInt(replica_name + ".priority", 0);
replica_configuration.host = config.getString(replica_name + ".host", common_configuration.host);
replica_configuration.port = config.getUInt(replica_name + ".port", common_configuration.port);
replica_configuration.username = config.getString(replica_name + ".user", common_configuration.username);
replica_configuration.password = config.getString(replica_name + ".password", common_configuration.password);
if (replica_configuration.host.empty() || replica_configuration.port == 0
|| replica_configuration.username.empty() || replica_configuration.password.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some "
"of the parameters and no other dictionary parameters are added");
}
replicas_by_priority[priority].emplace_back(replica_configuration);
}
}
}
else
{
replicas_by_priority[0].emplace_back(common_configuration);
}
}
if (created_from_ddl)
{
for (const auto & replicas : configuration.replicas_configurations)
for (const auto & replica : replicas.second)
for (const auto & [_, replicas] : replicas_by_priority)
for (const auto & replica : replicas)
context->getRemoteHostFilter().checkHostAndPort(replica.host, toString(replica.port));
}
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration.replicas_configurations,
replicas_by_priority,
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
settings.postgresql_connection_pool_retries,
settings.postgresql_connection_pool_auto_close_connection,
settings.postgresql_connection_attempt_timeout);
PostgreSQLDictionarySource::Configuration dictionary_configuration
{
.db = configuration.database,
.schema = configuration.schema,
.table = configuration.table,
.query = config.getString(fmt::format("{}.query", settings_config_prefix), ""),
.where = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
.invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""),
.update_field = config.getString(fmt::format("{}.update_field", settings_config_prefix), ""),
.update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1)
};
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, dictionary_configuration, pool, sample_block);
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, dictionary_configuration.value(), pool, sample_block);
#else
(void)dict_struct;
(void)config;

View File

@ -1,288 +0,0 @@
#include "ExternalDataSourceConfiguration.h"
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <IO/WriteBufferFromString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
IMPLEMENT_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "quota_key", "db",
"database", "table", "schema", "replica",
"update_field", "update_lag", "invalidate_query", "query",
"where", "name", "secure", "uri", "collection"};
template<typename T>
SettingsChanges getSettingsChangesFromConfig(
const BaseSettings<T> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
SettingsChanges config_settings;
for (const auto & setting : settings.all())
{
const auto & setting_name = setting.getName();
auto setting_value = config.getString(config_prefix + '.' + setting_name, "");
if (!setting_value.empty())
config_settings.emplace_back(setting_name, setting_value);
}
return config_settings;
}
String ExternalDataSourceConfiguration::toString() const
{
WriteBufferFromOwnString configuration_info;
configuration_info << "username: " << username << "\t";
if (addresses.empty())
{
configuration_info << "host: " << host << "\t";
configuration_info << "port: " << port << "\t";
}
else
{
for (const auto & [replica_host, replica_port] : addresses)
{
configuration_info << "host: " << replica_host << "\t";
configuration_info << "port: " << replica_port << "\t";
}
}
return configuration_info.str();
}
void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration & conf)
{
host = conf.host;
port = conf.port;
username = conf.username;
password = conf.password;
quota_key = conf.quota_key;
database = conf.database;
table = conf.table;
schema = conf.schema;
addresses = conf.addresses;
addresses_expr = conf.addresses_expr;
}
static void validateConfigKeys(
const Poco::Util::AbstractConfiguration & dict_config, const String & config_prefix, HasConfigKeyFunc has_config_key_func)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
dict_config.keys(config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (!has_config_key_func(config_key))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected key `{}` in dictionary source configuration", config_key);
}
}
template <typename T>
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<T> & settings)
{
validateConfigKeys(dict_config, dict_config_prefix, has_config_key);
ExternalDataSourceConfiguration configuration;
auto collection_name = dict_config.getString(dict_config_prefix + ".name", "");
if (!collection_name.empty())
{
const auto & config = context->getConfigRef();
const auto & collection_prefix = fmt::format("named_collections.{}", collection_name);
validateConfigKeys(dict_config, collection_prefix, has_config_key);
auto config_settings = getSettingsChangesFromConfig(settings, config, collection_prefix);
auto dict_settings = getSettingsChangesFromConfig(settings, dict_config, dict_config_prefix);
/// dictionary config settings override collection settings.
config_settings.insert(config_settings.end(), dict_settings.begin(), dict_settings.end());
if (!config.has(collection_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
configuration.host = dict_config.getString(dict_config_prefix + ".host", config.getString(collection_prefix + ".host", ""));
configuration.port = dict_config.getInt(dict_config_prefix + ".port", config.getUInt(collection_prefix + ".port", 0));
configuration.username = dict_config.getString(dict_config_prefix + ".user", config.getString(collection_prefix + ".user", ""));
configuration.password = dict_config.getString(dict_config_prefix + ".password", config.getString(collection_prefix + ".password", ""));
configuration.quota_key = dict_config.getString(dict_config_prefix + ".quota_key", config.getString(collection_prefix + ".quota_key", ""));
configuration.database = dict_config.getString(dict_config_prefix + ".db", config.getString(dict_config_prefix + ".database",
config.getString(collection_prefix + ".db", config.getString(collection_prefix + ".database", ""))));
configuration.table = dict_config.getString(dict_config_prefix + ".table", config.getString(collection_prefix + ".table", ""));
configuration.schema = dict_config.getString(dict_config_prefix + ".schema", config.getString(collection_prefix + ".schema", ""));
if (configuration.host.empty() || configuration.port == 0 || configuration.username.empty() || configuration.table.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some "
"of the parameters and dictionary parameters are not added");
}
return ExternalDataSourceInfo{.configuration = configuration, .settings_changes = config_settings};
}
return std::nullopt;
}
std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context)
{
URLBasedDataSourceConfiguration configuration;
auto collection_name = dict_config.getString(dict_config_prefix + ".name", "");
if (!collection_name.empty())
{
const auto & config = context->getConfigRef();
const auto & collection_prefix = fmt::format("named_collections.{}", collection_name);
if (!config.has(collection_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
configuration.url =
dict_config.getString(dict_config_prefix + ".url", config.getString(collection_prefix + ".url", ""));
configuration.endpoint =
dict_config.getString(dict_config_prefix + ".endpoint", config.getString(collection_prefix + ".endpoint", ""));
configuration.format =
dict_config.getString(dict_config_prefix + ".format", config.getString(collection_prefix + ".format", ""));
configuration.compression_method =
dict_config.getString(dict_config_prefix + ".compression", config.getString(collection_prefix + ".compression_method", ""));
configuration.structure =
dict_config.getString(dict_config_prefix + ".structure", config.getString(collection_prefix + ".structure", ""));
configuration.user =
dict_config.getString(dict_config_prefix + ".credentials.user", config.getString(collection_prefix + ".credentials.user", ""));
configuration.password =
dict_config.getString(dict_config_prefix + ".credentials.password", config.getString(collection_prefix + ".credentials.password", ""));
String headers_prefix;
const Poco::Util::AbstractConfiguration *headers_config = nullptr;
if (dict_config.has(dict_config_prefix + ".headers"))
{
headers_prefix = dict_config_prefix + ".headers";
headers_config = &dict_config;
}
else
{
headers_prefix = collection_prefix + ".headers";
headers_config = &config;
}
if (headers_config)
{
Poco::Util::AbstractConfiguration::Keys header_keys;
headers_config->keys(headers_prefix, header_keys);
headers_prefix += ".";
for (const auto & header : header_keys)
{
const auto header_prefix = headers_prefix + header;
configuration.headers.emplace_back(
headers_config->getString(header_prefix + ".name"),
headers_config->getString(header_prefix + ".value"));
}
}
return URLBasedDataSourceConfig{ .configuration = configuration };
}
return std::nullopt;
}
ExternalDataSourcesByPriority getExternalDataSourceConfigurationByPriority(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context, HasConfigKeyFunc has_config_key)
{
validateConfigKeys(dict_config, dict_config_prefix, has_config_key);
ExternalDataSourceConfiguration common_configuration;
auto named_collection = getExternalDataSourceConfiguration(dict_config, dict_config_prefix, context, has_config_key);
if (named_collection)
{
common_configuration = named_collection->configuration;
}
else
{
common_configuration.host = dict_config.getString(dict_config_prefix + ".host", "");
common_configuration.port = dict_config.getUInt(dict_config_prefix + ".port", 0);
common_configuration.username = dict_config.getString(dict_config_prefix + ".user", "");
common_configuration.password = dict_config.getString(dict_config_prefix + ".password", "");
common_configuration.quota_key = dict_config.getString(dict_config_prefix + ".quota_key", "");
common_configuration.database = dict_config.getString(dict_config_prefix + ".db", dict_config.getString(dict_config_prefix + ".database", ""));
common_configuration.table = dict_config.getString(fmt::format("{}.table", dict_config_prefix), "");
common_configuration.schema = dict_config.getString(fmt::format("{}.schema", dict_config_prefix), "");
}
ExternalDataSourcesByPriority configuration
{
.database = common_configuration.database,
.table = common_configuration.table,
.schema = common_configuration.schema,
.replicas_configurations = {}
};
if (dict_config.has(dict_config_prefix + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
dict_config.keys(dict_config_prefix, config_keys);
for (const auto & config_key : config_keys)
{
if (config_key.starts_with("replica"))
{
ExternalDataSourceConfiguration replica_configuration(common_configuration);
String replica_name = dict_config_prefix + "." + config_key;
validateConfigKeys(dict_config, replica_name, has_config_key);
size_t priority = dict_config.getInt(replica_name + ".priority", 0);
replica_configuration.host = dict_config.getString(replica_name + ".host", common_configuration.host);
replica_configuration.port = dict_config.getUInt(replica_name + ".port", common_configuration.port);
replica_configuration.username = dict_config.getString(replica_name + ".user", common_configuration.username);
replica_configuration.password = dict_config.getString(replica_name + ".password", common_configuration.password);
replica_configuration.quota_key = dict_config.getString(replica_name + ".quota_key", common_configuration.quota_key);
if (replica_configuration.host.empty() || replica_configuration.port == 0
|| replica_configuration.username.empty() || replica_configuration.password.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some "
"of the parameters and no other dictionary parameters are added");
}
configuration.replicas_configurations[priority].emplace_back(replica_configuration);
}
}
}
else
{
configuration.replicas_configurations[0].emplace_back(common_configuration);
}
return configuration;
}
void URLBasedDataSourceConfiguration::set(const URLBasedDataSourceConfiguration & conf)
{
url = conf.url;
format = conf.format;
compression_method = conf.compression_method;
structure = conf.structure;
http_method = conf.http_method;
headers = conf.headers;
}
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<EmptySettingsTraits> & settings);
template
SettingsChanges getSettingsChangesFromConfig(
const BaseSettings<EmptySettingsTraits> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
}

View File

@ -1,92 +0,0 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <IO/S3Settings.h>
#include <IO/HTTPHeaderEntries.h>
namespace DB
{
#define EMPTY_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
struct EmptySettings : public BaseSettings<EmptySettingsTraits> {};
struct ExternalDataSourceConfiguration
{
String host;
UInt16 port = 0;
String username = "default";
String password;
String quota_key;
String database;
String table;
String schema;
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
String addresses_expr;
String toString() const;
void set(const ExternalDataSourceConfiguration & conf);
};
using StorageSpecificArgs = std::vector<std::pair<String, ASTPtr>>;
struct ExternalDataSourceInfo
{
ExternalDataSourceConfiguration configuration;
SettingsChanges settings_changes;
};
using HasConfigKeyFunc = std::function<bool(const String &)>;
template <typename T = EmptySettingsTraits>
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<T> & settings = {});
/// Highest priority is 0, the bigger the number in map, the less the priority.
using ExternalDataSourcesConfigurationByPriority = std::map<size_t, std::vector<ExternalDataSourceConfiguration>>;
struct ExternalDataSourcesByPriority
{
String database;
String table;
String schema;
ExternalDataSourcesConfigurationByPriority replicas_configurations;
};
ExternalDataSourcesByPriority
getExternalDataSourceConfigurationByPriority(const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context, HasConfigKeyFunc has_config_key);
struct URLBasedDataSourceConfiguration
{
String url;
String endpoint;
String format = "auto";
String compression_method = "auto";
String structure = "auto";
String user;
String password;
HTTPHeaderEntries headers;
String http_method;
void set(const URLBasedDataSourceConfiguration & conf);
};
struct URLBasedDataSourceConfig
{
URLBasedDataSourceConfiguration configuration;
};
std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);
}

View File

@ -133,7 +133,7 @@ void validateNamedCollection(
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Unexpected key {} in named collection. Required keys: {}, optional keys: {}",
"Unexpected key `{}` in named collection. Required keys: {}, optional keys: {}",
backQuoteIfNeed(key), fmt::join(required_keys, ", "), fmt::join(optional_keys, ", "));
}
}

View File

@ -8,8 +8,6 @@
namespace DB
{
struct ExternalDataSourceConfiguration;
/// Storages MySQL and PostgreSQL use ConnectionPoolWithFailover and support multiple replicas.
/// This class unites multiple storages with replicas into multiple shards with replicas.
/// A query to external database is passed to one replica on each shard, the result is united.

View File

@ -1,5 +1,4 @@
#include <Storages/StorageMongoDB.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Common/Exception.h>

View File

@ -15,7 +15,6 @@
#include <Storages/StorageRedis.h>
#include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB