mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge branch 'ClickHouse:master' into interval_type_conversion
This commit is contained in:
commit
7dc4ac915f
@ -114,7 +114,7 @@ private:
|
||||
{
|
||||
if (ind < first.size())
|
||||
return first[ind];
|
||||
return second[ind % first.size()];
|
||||
return second[ind - first.size()];
|
||||
}
|
||||
|
||||
size_t size() const
|
||||
|
@ -23,7 +23,7 @@ namespace postgres
|
||||
{
|
||||
|
||||
PoolWithFailover::PoolWithFailover(
|
||||
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
|
||||
const ReplicasConfigurationByPriority & configurations_by_priority,
|
||||
size_t pool_size,
|
||||
size_t pool_wait_timeout_,
|
||||
size_t max_tries_,
|
||||
|
@ -8,7 +8,6 @@
|
||||
#include "ConnectionHolder.h"
|
||||
#include <mutex>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
||||
#include <Storages/StoragePostgreSQL.h>
|
||||
|
||||
|
||||
@ -20,12 +19,12 @@ namespace postgres
|
||||
|
||||
class PoolWithFailover
|
||||
{
|
||||
|
||||
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
|
||||
|
||||
public:
|
||||
using ReplicasConfigurationByPriority = std::map<size_t, std::vector<DB::StoragePostgreSQL::Configuration>>;
|
||||
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
|
||||
|
||||
PoolWithFailover(
|
||||
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
|
||||
const ReplicasConfigurationByPriority & configurations_by_priority,
|
||||
size_t pool_size,
|
||||
size_t pool_wait_timeout,
|
||||
size_t max_tries_,
|
||||
|
@ -8,12 +8,12 @@
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Processors/Formats/IInputFormat.h>
|
||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
||||
#include <Poco/Net/HTTPRequest.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include "DictionarySourceFactory.h"
|
||||
#include "DictionarySourceHelpers.h"
|
||||
#include "DictionaryStructure.h"
|
||||
#include <Storages/NamedCollectionsHelpers.h>
|
||||
#include "registerDictionaries.h"
|
||||
|
||||
|
||||
@ -223,21 +223,23 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
|
||||
String endpoint;
|
||||
String format;
|
||||
|
||||
auto named_collection = created_from_ddl
|
||||
? getURLBasedDataSourceConfiguration(config, settings_config_prefix, global_context)
|
||||
: std::nullopt;
|
||||
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, global_context) : nullptr;
|
||||
if (named_collection)
|
||||
{
|
||||
url = named_collection->configuration.url;
|
||||
endpoint = named_collection->configuration.endpoint;
|
||||
format = named_collection->configuration.format;
|
||||
validateNamedCollection(
|
||||
*named_collection,
|
||||
/* required_keys */{},
|
||||
/* optional_keys */ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>{
|
||||
"url", "endpoint", "user", "credentials.user", "password", "credentials.password", "format", "compression_method", "structure", "name"});
|
||||
|
||||
credentials.setUsername(named_collection->configuration.user);
|
||||
credentials.setPassword(named_collection->configuration.password);
|
||||
url = named_collection->getOrDefault<String>("url", "");
|
||||
endpoint = named_collection->getOrDefault<String>("endpoint", "");
|
||||
format = named_collection->getOrDefault<String>("format", "");
|
||||
|
||||
header_entries.reserve(named_collection->configuration.headers.size());
|
||||
for (const auto & [key, value] : named_collection->configuration.headers)
|
||||
header_entries.emplace_back(key, value);
|
||||
credentials.setUsername(named_collection->getAnyOrDefault<String>({"user", "credentials.user"}, ""));
|
||||
credentials.setPassword(named_collection->getAnyOrDefault<String>({"password", "credentials.password"}, ""));
|
||||
|
||||
header_entries = getHeadersFromNamedCollection(*named_collection);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -1,15 +1,12 @@
|
||||
#include "MongoDBDictionarySource.h"
|
||||
#include "DictionarySourceFactory.h"
|
||||
#include "DictionaryStructure.h"
|
||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
||||
#include <Storages/StorageMongoDBSocketFactory.h>
|
||||
#include <Storages/NamedCollectionsHelpers.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
|
||||
"host", "port", "user", "password", "db", "database", "uri", "collection", "name", "method", "options"};
|
||||
|
||||
void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
|
||||
{
|
||||
auto create_mongo_db_dictionary = [](
|
||||
@ -22,35 +19,53 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
|
||||
bool created_from_ddl)
|
||||
{
|
||||
const auto config_prefix = root_config_prefix + ".mongodb";
|
||||
ExternalDataSourceConfiguration configuration;
|
||||
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key); };
|
||||
auto named_collection = getExternalDataSourceConfiguration(config, config_prefix, context, has_config_key);
|
||||
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, config_prefix, context) : nullptr;
|
||||
|
||||
String host, username, password, database, method, options, collection;
|
||||
UInt16 port;
|
||||
if (named_collection)
|
||||
{
|
||||
configuration = named_collection->configuration;
|
||||
validateNamedCollection(
|
||||
*named_collection,
|
||||
/* required_keys */{"collection"},
|
||||
/* optional_keys */ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>{
|
||||
"host", "port", "user", "password", "db", "database", "uri", "name", "method", "options"});
|
||||
|
||||
host = named_collection->getOrDefault<String>("host", "");
|
||||
port = static_cast<UInt16>(named_collection->getOrDefault<UInt64>("port", 0));
|
||||
username = named_collection->getOrDefault<String>("user", "");
|
||||
password = named_collection->getOrDefault<String>("password", "");
|
||||
database = named_collection->getAnyOrDefault<String>({"db", "database"}, "");
|
||||
method = named_collection->getOrDefault<String>("method", "");
|
||||
collection = named_collection->getOrDefault<String>("collection", "");
|
||||
options = named_collection->getOrDefault<String>("options", "");
|
||||
}
|
||||
else
|
||||
{
|
||||
configuration.host = config.getString(config_prefix + ".host", "");
|
||||
configuration.port = config.getUInt(config_prefix + ".port", 0);
|
||||
configuration.username = config.getString(config_prefix + ".user", "");
|
||||
configuration.password = config.getString(config_prefix + ".password", "");
|
||||
configuration.database = config.getString(config_prefix + ".db", "");
|
||||
host = config.getString(config_prefix + ".host", "");
|
||||
port = config.getUInt(config_prefix + ".port", 0);
|
||||
username = config.getString(config_prefix + ".user", "");
|
||||
password = config.getString(config_prefix + ".password", "");
|
||||
database = config.getString(config_prefix + ".db", "");
|
||||
method = config.getString(config_prefix + ".method", "");
|
||||
collection = config.getString(config_prefix + ".collection");
|
||||
options = config.getString(config_prefix + ".options", "");
|
||||
}
|
||||
|
||||
if (created_from_ddl)
|
||||
context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
|
||||
context->getRemoteHostFilter().checkHostAndPort(host, toString(port));
|
||||
|
||||
return std::make_unique<MongoDBDictionarySource>(dict_struct,
|
||||
return std::make_unique<MongoDBDictionarySource>(
|
||||
dict_struct,
|
||||
config.getString(config_prefix + ".uri", ""),
|
||||
configuration.host,
|
||||
configuration.port,
|
||||
configuration.username,
|
||||
configuration.password,
|
||||
config.getString(config_prefix + ".method", ""),
|
||||
configuration.database,
|
||||
config.getString(config_prefix + ".collection"),
|
||||
config.getString(config_prefix + ".options", ""),
|
||||
host,
|
||||
port,
|
||||
username,
|
||||
password,
|
||||
method,
|
||||
database,
|
||||
collection,
|
||||
options,
|
||||
sample_block);
|
||||
};
|
||||
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Core/QualifiedTableName.h>
|
||||
#include <Core/Settings.h>
|
||||
#include "DictionarySourceFactory.h"
|
||||
#include <Storages/NamedCollectionsHelpers.h>
|
||||
#include "registerDictionaries.h"
|
||||
|
||||
#if USE_LIBPQXX
|
||||
@ -13,7 +14,6 @@
|
||||
#include "readInvalidateQuery.h"
|
||||
#include <Interpreters/Context.h>
|
||||
#include <QueryPipeline/QueryPipeline.h>
|
||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#endif
|
||||
|
||||
@ -24,16 +24,17 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int SUPPORT_IS_DISABLED;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
static const ValidateKeysMultiset<ExternalDatabaseEqualKeysSet> dictionary_allowed_keys = {
|
||||
"host", "port", "user", "password", "db", "database", "table", "schema",
|
||||
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"};
|
||||
|
||||
#if USE_LIBPQXX
|
||||
|
||||
static const UInt64 max_block_size = 8192;
|
||||
|
||||
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
|
||||
"host", "port", "user", "password", "db", "database", "table", "schema",
|
||||
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"};
|
||||
|
||||
namespace
|
||||
{
|
||||
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct, const String & schema, const String & table, const String & query, const String & where)
|
||||
@ -177,6 +178,19 @@ std::string PostgreSQLDictionarySource::toString() const
|
||||
return "PostgreSQL: " + configuration.db + '.' + configuration.table + (where.empty() ? "" : ", where: " + where);
|
||||
}
|
||||
|
||||
static void validateConfigKeys(
|
||||
const Poco::Util::AbstractConfiguration & dict_config, const String & config_prefix)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys config_keys;
|
||||
dict_config.keys(config_prefix, config_keys);
|
||||
for (const auto & config_key : config_keys)
|
||||
{
|
||||
if (dictionary_allowed_keys.contains(config_key) || startsWith(config_key, "replica"))
|
||||
continue;
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected key `{}` in dictionary source configuration", config_key);
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
|
||||
@ -191,38 +205,117 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
|
||||
{
|
||||
#if USE_LIBPQXX
|
||||
const auto settings_config_prefix = config_prefix + ".postgresql";
|
||||
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key) || key.starts_with("replica"); };
|
||||
auto configuration = getExternalDataSourceConfigurationByPriority(config, settings_config_prefix, context, has_config_key);
|
||||
const auto & settings = context->getSettingsRef();
|
||||
|
||||
std::optional<PostgreSQLDictionarySource::Configuration> dictionary_configuration;
|
||||
postgres::PoolWithFailover::ReplicasConfigurationByPriority replicas_by_priority;
|
||||
|
||||
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, context) : nullptr;
|
||||
if (named_collection)
|
||||
{
|
||||
validateNamedCollection<ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>>(*named_collection, {}, dictionary_allowed_keys);
|
||||
|
||||
StoragePostgreSQL::Configuration common_configuration;
|
||||
common_configuration.host = named_collection->getOrDefault<String>("host", "");
|
||||
common_configuration.port = named_collection->getOrDefault<UInt64>("port", 0);
|
||||
common_configuration.username = named_collection->getOrDefault<String>("user", "");
|
||||
common_configuration.password = named_collection->getOrDefault<String>("password", "");
|
||||
common_configuration.database = named_collection->getAnyOrDefault<String>({"database", "db"}, "");
|
||||
common_configuration.schema = named_collection->getOrDefault<String>("schema", "");
|
||||
common_configuration.table = named_collection->getOrDefault<String>("table", "");
|
||||
|
||||
dictionary_configuration.emplace(PostgreSQLDictionarySource::Configuration{
|
||||
.db = common_configuration.database,
|
||||
.schema = common_configuration.schema,
|
||||
.table = common_configuration.table,
|
||||
.query = named_collection->getOrDefault<String>("query", ""),
|
||||
.where = named_collection->getOrDefault<String>("where", ""),
|
||||
.invalidate_query = named_collection->getOrDefault<String>("invalidate_query", ""),
|
||||
.update_field = named_collection->getOrDefault<String>("update_field", ""),
|
||||
.update_lag = named_collection->getOrDefault<UInt64>("update_lag", 1),
|
||||
});
|
||||
|
||||
replicas_by_priority[0].emplace_back(common_configuration);
|
||||
}
|
||||
else
|
||||
{
|
||||
validateConfigKeys(config, settings_config_prefix);
|
||||
|
||||
StoragePostgreSQL::Configuration common_configuration;
|
||||
common_configuration.host = config.getString(settings_config_prefix + ".host", "");
|
||||
common_configuration.port = config.getUInt(settings_config_prefix + ".port", 0);
|
||||
common_configuration.username = config.getString(settings_config_prefix + ".user", "");
|
||||
common_configuration.password = config.getString(settings_config_prefix + ".password", "");
|
||||
common_configuration.database = config.getString(fmt::format("{}.database", settings_config_prefix), config.getString(fmt::format("{}.db", settings_config_prefix), ""));
|
||||
common_configuration.schema = config.getString(fmt::format("{}.schema", settings_config_prefix), "");
|
||||
common_configuration.table = config.getString(fmt::format("{}.table", settings_config_prefix), "");
|
||||
|
||||
dictionary_configuration.emplace(PostgreSQLDictionarySource::Configuration
|
||||
{
|
||||
.db = common_configuration.database,
|
||||
.schema = common_configuration.schema,
|
||||
.table = common_configuration.table,
|
||||
.query = config.getString(fmt::format("{}.query", settings_config_prefix), ""),
|
||||
.where = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
|
||||
.invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""),
|
||||
.update_field = config.getString(fmt::format("{}.update_field", settings_config_prefix), ""),
|
||||
.update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1)
|
||||
});
|
||||
|
||||
|
||||
if (config.has(settings_config_prefix + ".replica"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys config_keys;
|
||||
config.keys(settings_config_prefix, config_keys);
|
||||
|
||||
for (const auto & config_key : config_keys)
|
||||
{
|
||||
if (config_key.starts_with("replica"))
|
||||
{
|
||||
String replica_name = settings_config_prefix + "." + config_key;
|
||||
StoragePostgreSQL::Configuration replica_configuration{common_configuration};
|
||||
|
||||
size_t priority = config.getInt(replica_name + ".priority", 0);
|
||||
replica_configuration.host = config.getString(replica_name + ".host", common_configuration.host);
|
||||
replica_configuration.port = config.getUInt(replica_name + ".port", common_configuration.port);
|
||||
replica_configuration.username = config.getString(replica_name + ".user", common_configuration.username);
|
||||
replica_configuration.password = config.getString(replica_name + ".password", common_configuration.password);
|
||||
|
||||
if (replica_configuration.host.empty() || replica_configuration.port == 0
|
||||
|| replica_configuration.username.empty() || replica_configuration.password.empty())
|
||||
{
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Named collection of connection parameters is missing some "
|
||||
"of the parameters and no other dictionary parameters are added");
|
||||
}
|
||||
|
||||
replicas_by_priority[priority].emplace_back(replica_configuration);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
replicas_by_priority[0].emplace_back(common_configuration);
|
||||
}
|
||||
}
|
||||
if (created_from_ddl)
|
||||
{
|
||||
for (const auto & replicas : configuration.replicas_configurations)
|
||||
for (const auto & replica : replicas.second)
|
||||
for (const auto & [_, replicas] : replicas_by_priority)
|
||||
for (const auto & replica : replicas)
|
||||
context->getRemoteHostFilter().checkHostAndPort(replica.host, toString(replica.port));
|
||||
}
|
||||
|
||||
|
||||
auto pool = std::make_shared<postgres::PoolWithFailover>(
|
||||
configuration.replicas_configurations,
|
||||
replicas_by_priority,
|
||||
settings.postgresql_connection_pool_size,
|
||||
settings.postgresql_connection_pool_wait_timeout,
|
||||
settings.postgresql_connection_pool_retries,
|
||||
settings.postgresql_connection_pool_auto_close_connection,
|
||||
settings.postgresql_connection_attempt_timeout);
|
||||
|
||||
PostgreSQLDictionarySource::Configuration dictionary_configuration
|
||||
{
|
||||
.db = configuration.database,
|
||||
.schema = configuration.schema,
|
||||
.table = configuration.table,
|
||||
.query = config.getString(fmt::format("{}.query", settings_config_prefix), ""),
|
||||
.where = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
|
||||
.invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""),
|
||||
.update_field = config.getString(fmt::format("{}.update_field", settings_config_prefix), ""),
|
||||
.update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1)
|
||||
};
|
||||
|
||||
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, dictionary_configuration, pool, sample_block);
|
||||
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, dictionary_configuration.value(), pool, sample_block);
|
||||
#else
|
||||
(void)dict_struct;
|
||||
(void)config;
|
||||
|
@ -1,288 +0,0 @@
|
||||
#include "ExternalDataSourceConfiguration.h"
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
IMPLEMENT_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
|
||||
|
||||
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
|
||||
"host", "port", "user", "password", "quota_key", "db",
|
||||
"database", "table", "schema", "replica",
|
||||
"update_field", "update_lag", "invalidate_query", "query",
|
||||
"where", "name", "secure", "uri", "collection"};
|
||||
|
||||
|
||||
template<typename T>
|
||||
SettingsChanges getSettingsChangesFromConfig(
|
||||
const BaseSettings<T> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
|
||||
{
|
||||
SettingsChanges config_settings;
|
||||
for (const auto & setting : settings.all())
|
||||
{
|
||||
const auto & setting_name = setting.getName();
|
||||
auto setting_value = config.getString(config_prefix + '.' + setting_name, "");
|
||||
if (!setting_value.empty())
|
||||
config_settings.emplace_back(setting_name, setting_value);
|
||||
}
|
||||
return config_settings;
|
||||
}
|
||||
|
||||
|
||||
String ExternalDataSourceConfiguration::toString() const
|
||||
{
|
||||
WriteBufferFromOwnString configuration_info;
|
||||
configuration_info << "username: " << username << "\t";
|
||||
if (addresses.empty())
|
||||
{
|
||||
configuration_info << "host: " << host << "\t";
|
||||
configuration_info << "port: " << port << "\t";
|
||||
}
|
||||
else
|
||||
{
|
||||
for (const auto & [replica_host, replica_port] : addresses)
|
||||
{
|
||||
configuration_info << "host: " << replica_host << "\t";
|
||||
configuration_info << "port: " << replica_port << "\t";
|
||||
}
|
||||
}
|
||||
return configuration_info.str();
|
||||
}
|
||||
|
||||
|
||||
void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration & conf)
|
||||
{
|
||||
host = conf.host;
|
||||
port = conf.port;
|
||||
username = conf.username;
|
||||
password = conf.password;
|
||||
quota_key = conf.quota_key;
|
||||
database = conf.database;
|
||||
table = conf.table;
|
||||
schema = conf.schema;
|
||||
addresses = conf.addresses;
|
||||
addresses_expr = conf.addresses_expr;
|
||||
}
|
||||
|
||||
|
||||
static void validateConfigKeys(
|
||||
const Poco::Util::AbstractConfiguration & dict_config, const String & config_prefix, HasConfigKeyFunc has_config_key_func)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys config_keys;
|
||||
dict_config.keys(config_prefix, config_keys);
|
||||
for (const auto & config_key : config_keys)
|
||||
{
|
||||
if (!has_config_key_func(config_key))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected key `{}` in dictionary source configuration", config_key);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
|
||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
|
||||
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<T> & settings)
|
||||
{
|
||||
validateConfigKeys(dict_config, dict_config_prefix, has_config_key);
|
||||
ExternalDataSourceConfiguration configuration;
|
||||
|
||||
auto collection_name = dict_config.getString(dict_config_prefix + ".name", "");
|
||||
if (!collection_name.empty())
|
||||
{
|
||||
const auto & config = context->getConfigRef();
|
||||
const auto & collection_prefix = fmt::format("named_collections.{}", collection_name);
|
||||
validateConfigKeys(dict_config, collection_prefix, has_config_key);
|
||||
auto config_settings = getSettingsChangesFromConfig(settings, config, collection_prefix);
|
||||
auto dict_settings = getSettingsChangesFromConfig(settings, dict_config, dict_config_prefix);
|
||||
/// dictionary config settings override collection settings.
|
||||
config_settings.insert(config_settings.end(), dict_settings.begin(), dict_settings.end());
|
||||
|
||||
if (!config.has(collection_prefix))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
|
||||
|
||||
configuration.host = dict_config.getString(dict_config_prefix + ".host", config.getString(collection_prefix + ".host", ""));
|
||||
configuration.port = dict_config.getInt(dict_config_prefix + ".port", config.getUInt(collection_prefix + ".port", 0));
|
||||
configuration.username = dict_config.getString(dict_config_prefix + ".user", config.getString(collection_prefix + ".user", ""));
|
||||
configuration.password = dict_config.getString(dict_config_prefix + ".password", config.getString(collection_prefix + ".password", ""));
|
||||
configuration.quota_key = dict_config.getString(dict_config_prefix + ".quota_key", config.getString(collection_prefix + ".quota_key", ""));
|
||||
configuration.database = dict_config.getString(dict_config_prefix + ".db", config.getString(dict_config_prefix + ".database",
|
||||
config.getString(collection_prefix + ".db", config.getString(collection_prefix + ".database", ""))));
|
||||
configuration.table = dict_config.getString(dict_config_prefix + ".table", config.getString(collection_prefix + ".table", ""));
|
||||
configuration.schema = dict_config.getString(dict_config_prefix + ".schema", config.getString(collection_prefix + ".schema", ""));
|
||||
|
||||
if (configuration.host.empty() || configuration.port == 0 || configuration.username.empty() || configuration.table.empty())
|
||||
{
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Named collection of connection parameters is missing some "
|
||||
"of the parameters and dictionary parameters are not added");
|
||||
}
|
||||
return ExternalDataSourceInfo{.configuration = configuration, .settings_changes = config_settings};
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
|
||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context)
|
||||
{
|
||||
URLBasedDataSourceConfiguration configuration;
|
||||
auto collection_name = dict_config.getString(dict_config_prefix + ".name", "");
|
||||
if (!collection_name.empty())
|
||||
{
|
||||
const auto & config = context->getConfigRef();
|
||||
const auto & collection_prefix = fmt::format("named_collections.{}", collection_name);
|
||||
|
||||
if (!config.has(collection_prefix))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
|
||||
|
||||
configuration.url =
|
||||
dict_config.getString(dict_config_prefix + ".url", config.getString(collection_prefix + ".url", ""));
|
||||
configuration.endpoint =
|
||||
dict_config.getString(dict_config_prefix + ".endpoint", config.getString(collection_prefix + ".endpoint", ""));
|
||||
configuration.format =
|
||||
dict_config.getString(dict_config_prefix + ".format", config.getString(collection_prefix + ".format", ""));
|
||||
configuration.compression_method =
|
||||
dict_config.getString(dict_config_prefix + ".compression", config.getString(collection_prefix + ".compression_method", ""));
|
||||
configuration.structure =
|
||||
dict_config.getString(dict_config_prefix + ".structure", config.getString(collection_prefix + ".structure", ""));
|
||||
configuration.user =
|
||||
dict_config.getString(dict_config_prefix + ".credentials.user", config.getString(collection_prefix + ".credentials.user", ""));
|
||||
configuration.password =
|
||||
dict_config.getString(dict_config_prefix + ".credentials.password", config.getString(collection_prefix + ".credentials.password", ""));
|
||||
|
||||
String headers_prefix;
|
||||
const Poco::Util::AbstractConfiguration *headers_config = nullptr;
|
||||
if (dict_config.has(dict_config_prefix + ".headers"))
|
||||
{
|
||||
headers_prefix = dict_config_prefix + ".headers";
|
||||
headers_config = &dict_config;
|
||||
}
|
||||
else
|
||||
{
|
||||
headers_prefix = collection_prefix + ".headers";
|
||||
headers_config = &config;
|
||||
}
|
||||
|
||||
if (headers_config)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys header_keys;
|
||||
headers_config->keys(headers_prefix, header_keys);
|
||||
headers_prefix += ".";
|
||||
for (const auto & header : header_keys)
|
||||
{
|
||||
const auto header_prefix = headers_prefix + header;
|
||||
configuration.headers.emplace_back(
|
||||
headers_config->getString(header_prefix + ".name"),
|
||||
headers_config->getString(header_prefix + ".value"));
|
||||
}
|
||||
}
|
||||
|
||||
return URLBasedDataSourceConfig{ .configuration = configuration };
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
ExternalDataSourcesByPriority getExternalDataSourceConfigurationByPriority(
|
||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context, HasConfigKeyFunc has_config_key)
|
||||
{
|
||||
validateConfigKeys(dict_config, dict_config_prefix, has_config_key);
|
||||
ExternalDataSourceConfiguration common_configuration;
|
||||
|
||||
auto named_collection = getExternalDataSourceConfiguration(dict_config, dict_config_prefix, context, has_config_key);
|
||||
if (named_collection)
|
||||
{
|
||||
common_configuration = named_collection->configuration;
|
||||
}
|
||||
else
|
||||
{
|
||||
common_configuration.host = dict_config.getString(dict_config_prefix + ".host", "");
|
||||
common_configuration.port = dict_config.getUInt(dict_config_prefix + ".port", 0);
|
||||
common_configuration.username = dict_config.getString(dict_config_prefix + ".user", "");
|
||||
common_configuration.password = dict_config.getString(dict_config_prefix + ".password", "");
|
||||
common_configuration.quota_key = dict_config.getString(dict_config_prefix + ".quota_key", "");
|
||||
common_configuration.database = dict_config.getString(dict_config_prefix + ".db", dict_config.getString(dict_config_prefix + ".database", ""));
|
||||
common_configuration.table = dict_config.getString(fmt::format("{}.table", dict_config_prefix), "");
|
||||
common_configuration.schema = dict_config.getString(fmt::format("{}.schema", dict_config_prefix), "");
|
||||
}
|
||||
|
||||
ExternalDataSourcesByPriority configuration
|
||||
{
|
||||
.database = common_configuration.database,
|
||||
.table = common_configuration.table,
|
||||
.schema = common_configuration.schema,
|
||||
.replicas_configurations = {}
|
||||
};
|
||||
|
||||
if (dict_config.has(dict_config_prefix + ".replica"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys config_keys;
|
||||
dict_config.keys(dict_config_prefix, config_keys);
|
||||
|
||||
for (const auto & config_key : config_keys)
|
||||
{
|
||||
if (config_key.starts_with("replica"))
|
||||
{
|
||||
ExternalDataSourceConfiguration replica_configuration(common_configuration);
|
||||
String replica_name = dict_config_prefix + "." + config_key;
|
||||
validateConfigKeys(dict_config, replica_name, has_config_key);
|
||||
|
||||
size_t priority = dict_config.getInt(replica_name + ".priority", 0);
|
||||
replica_configuration.host = dict_config.getString(replica_name + ".host", common_configuration.host);
|
||||
replica_configuration.port = dict_config.getUInt(replica_name + ".port", common_configuration.port);
|
||||
replica_configuration.username = dict_config.getString(replica_name + ".user", common_configuration.username);
|
||||
replica_configuration.password = dict_config.getString(replica_name + ".password", common_configuration.password);
|
||||
replica_configuration.quota_key = dict_config.getString(replica_name + ".quota_key", common_configuration.quota_key);
|
||||
|
||||
if (replica_configuration.host.empty() || replica_configuration.port == 0
|
||||
|| replica_configuration.username.empty() || replica_configuration.password.empty())
|
||||
{
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Named collection of connection parameters is missing some "
|
||||
"of the parameters and no other dictionary parameters are added");
|
||||
}
|
||||
|
||||
configuration.replicas_configurations[priority].emplace_back(replica_configuration);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
configuration.replicas_configurations[0].emplace_back(common_configuration);
|
||||
}
|
||||
|
||||
return configuration;
|
||||
}
|
||||
|
||||
|
||||
void URLBasedDataSourceConfiguration::set(const URLBasedDataSourceConfiguration & conf)
|
||||
{
|
||||
url = conf.url;
|
||||
format = conf.format;
|
||||
compression_method = conf.compression_method;
|
||||
structure = conf.structure;
|
||||
http_method = conf.http_method;
|
||||
headers = conf.headers;
|
||||
}
|
||||
|
||||
template
|
||||
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
|
||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
|
||||
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<EmptySettingsTraits> & settings);
|
||||
|
||||
template
|
||||
SettingsChanges getSettingsChangesFromConfig(
|
||||
const BaseSettings<EmptySettingsTraits> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
|
||||
|
||||
}
|
@ -1,92 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <IO/S3Settings.h>
|
||||
#include <IO/HTTPHeaderEntries.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
#define EMPTY_SETTINGS(M, ALIAS)
|
||||
DECLARE_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
|
||||
|
||||
struct EmptySettings : public BaseSettings<EmptySettingsTraits> {};
|
||||
|
||||
struct ExternalDataSourceConfiguration
|
||||
{
|
||||
String host;
|
||||
UInt16 port = 0;
|
||||
String username = "default";
|
||||
String password;
|
||||
String quota_key;
|
||||
String database;
|
||||
String table;
|
||||
String schema;
|
||||
|
||||
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
|
||||
String addresses_expr;
|
||||
|
||||
String toString() const;
|
||||
|
||||
void set(const ExternalDataSourceConfiguration & conf);
|
||||
};
|
||||
|
||||
|
||||
using StorageSpecificArgs = std::vector<std::pair<String, ASTPtr>>;
|
||||
|
||||
struct ExternalDataSourceInfo
|
||||
{
|
||||
ExternalDataSourceConfiguration configuration;
|
||||
SettingsChanges settings_changes;
|
||||
};
|
||||
|
||||
using HasConfigKeyFunc = std::function<bool(const String &)>;
|
||||
|
||||
template <typename T = EmptySettingsTraits>
|
||||
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
|
||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
|
||||
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<T> & settings = {});
|
||||
|
||||
|
||||
/// Highest priority is 0, the bigger the number in map, the less the priority.
|
||||
using ExternalDataSourcesConfigurationByPriority = std::map<size_t, std::vector<ExternalDataSourceConfiguration>>;
|
||||
|
||||
struct ExternalDataSourcesByPriority
|
||||
{
|
||||
String database;
|
||||
String table;
|
||||
String schema;
|
||||
ExternalDataSourcesConfigurationByPriority replicas_configurations;
|
||||
};
|
||||
|
||||
ExternalDataSourcesByPriority
|
||||
getExternalDataSourceConfigurationByPriority(const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context, HasConfigKeyFunc has_config_key);
|
||||
|
||||
struct URLBasedDataSourceConfiguration
|
||||
{
|
||||
String url;
|
||||
String endpoint;
|
||||
String format = "auto";
|
||||
String compression_method = "auto";
|
||||
String structure = "auto";
|
||||
|
||||
String user;
|
||||
String password;
|
||||
|
||||
HTTPHeaderEntries headers;
|
||||
String http_method;
|
||||
|
||||
void set(const URLBasedDataSourceConfiguration & conf);
|
||||
};
|
||||
|
||||
struct URLBasedDataSourceConfig
|
||||
{
|
||||
URLBasedDataSourceConfiguration configuration;
|
||||
};
|
||||
|
||||
std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
|
||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);
|
||||
|
||||
}
|
@ -133,7 +133,7 @@ void validateNamedCollection(
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Unexpected key {} in named collection. Required keys: {}, optional keys: {}",
|
||||
"Unexpected key `{}` in named collection. Required keys: {}, optional keys: {}",
|
||||
backQuoteIfNeed(key), fmt::join(required_keys, ", "), fmt::join(optional_keys, ", "));
|
||||
}
|
||||
}
|
||||
|
@ -8,8 +8,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct ExternalDataSourceConfiguration;
|
||||
|
||||
/// Storages MySQL and PostgreSQL use ConnectionPoolWithFailover and support multiple replicas.
|
||||
/// This class unites multiple storages with replicas into multiple shards with replicas.
|
||||
/// A query to external database is passed to one replica on each shard, the result is united.
|
||||
|
@ -1,5 +1,4 @@
|
||||
#include <Storages/StorageMongoDB.h>
|
||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
|
||||
|
@ -15,7 +15,6 @@
|
||||
|
||||
#include <Storages/StorageRedis.h>
|
||||
#include <TableFunctions/ITableFunction.h>
|
||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -163,6 +163,7 @@ class CI:
|
||||
tidy=True,
|
||||
comment="clang-tidy is used for static analysis",
|
||||
),
|
||||
timeout=10800,
|
||||
),
|
||||
BuildNames.BINARY_DARWIN: CommonJobConfigs.BUILD.with_properties(
|
||||
build_config=BuildConfig(
|
||||
|
@ -530,10 +530,61 @@ def test_bad_configuration(started_cluster):
|
||||
"""
|
||||
)
|
||||
|
||||
node1.query_and_get_error(
|
||||
assert "Unexpected key `dbbb`" in node1.query_and_get_error(
|
||||
"SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(1))"
|
||||
)
|
||||
assert node1.contains_in_log("Unexpected key `dbbb`")
|
||||
|
||||
|
||||
def test_named_collection_from_ddl(started_cluster):
|
||||
cursor = started_cluster.postgres_conn.cursor()
|
||||
cursor.execute("DROP TABLE IF EXISTS test_table")
|
||||
cursor.execute("CREATE TABLE test_table (id integer, value integer)")
|
||||
|
||||
node1.query(
|
||||
"""
|
||||
DROP NAMED COLLECTION IF EXISTS pg_conn;
|
||||
CREATE NAMED COLLECTION pg_conn
|
||||
AS user = 'postgres', password = 'mysecretpassword', host = 'postgres1', port = 5432, database = 'postgres', table = 'test_table';
|
||||
"""
|
||||
)
|
||||
|
||||
cursor.execute(
|
||||
"INSERT INTO test_table SELECT i, i FROM generate_series(0, 99) as t(i)"
|
||||
)
|
||||
|
||||
node1.query(
|
||||
"""
|
||||
DROP DICTIONARY IF EXISTS postgres_dict;
|
||||
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
|
||||
PRIMARY KEY id
|
||||
SOURCE(POSTGRESQL(NAME pg_conn))
|
||||
LIFETIME(MIN 1 MAX 2)
|
||||
LAYOUT(HASHED());
|
||||
"""
|
||||
)
|
||||
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
|
||||
assert int(result.strip()) == 99
|
||||
|
||||
node1.query(
|
||||
"""
|
||||
DROP NAMED COLLECTION IF EXISTS pg_conn_2;
|
||||
CREATE NAMED COLLECTION pg_conn_2
|
||||
AS user = 'postgres', password = 'mysecretpassword', host = 'postgres1', port = 5432, dbbb = 'postgres', table = 'test_table';
|
||||
"""
|
||||
)
|
||||
node1.query(
|
||||
"""
|
||||
DROP DICTIONARY IF EXISTS postgres_dict;
|
||||
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
|
||||
PRIMARY KEY id
|
||||
SOURCE(POSTGRESQL(NAME pg_conn_2))
|
||||
LIFETIME(MIN 1 MAX 2)
|
||||
LAYOUT(HASHED());
|
||||
"""
|
||||
)
|
||||
assert "Unexpected key `dbbb`" in node1.query_and_get_error(
|
||||
"SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -19,7 +19,13 @@ def test_and_check(name, a, b, t_stat, p_value):
|
||||
)
|
||||
client.query(
|
||||
"INSERT INTO mann_whitney VALUES {};".format(
|
||||
", ".join(["({},{}), ({},{})".format(i, 0, j, 1) for i, j in zip(a, b)])
|
||||
", ".join(["({},{})".format(i, 0) for i in a])
|
||||
)
|
||||
)
|
||||
|
||||
client.query(
|
||||
"INSERT INTO mann_whitney VALUES {};".format(
|
||||
", ".join(["({},{})".format(i, 1) for i in b])
|
||||
)
|
||||
)
|
||||
|
||||
@ -59,6 +65,15 @@ def test_mann_whitney():
|
||||
test_and_check("mannWhitneyUTest('greater')", rvs1, rvs2, s, p)
|
||||
|
||||
|
||||
def test_mann_whitney_skew():
|
||||
rvs1 = [1]
|
||||
rvs2 = [0, 2, 4]
|
||||
s, p = stats.mannwhitneyu(rvs1, rvs2, alternative="two-sided")
|
||||
test_and_check("mannWhitneyUTest", rvs1, rvs2, s, p)
|
||||
test_and_check("mannWhitneyUTest('two-sided')", rvs1, rvs2, s, p)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_mann_whitney()
|
||||
test_mann_whitney_skew()
|
||||
print("Ok.")
|
||||
|
Loading…
Reference in New Issue
Block a user