mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #68412 from ClickHouse/delete-old-named-collections-code-1
Delete old code of named collections
This commit is contained in:
commit
97cc2c365f
@ -23,7 +23,7 @@ namespace postgres
|
|||||||
{
|
{
|
||||||
|
|
||||||
PoolWithFailover::PoolWithFailover(
|
PoolWithFailover::PoolWithFailover(
|
||||||
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
|
const ReplicasConfigurationByPriority & configurations_by_priority,
|
||||||
size_t pool_size,
|
size_t pool_size,
|
||||||
size_t pool_wait_timeout_,
|
size_t pool_wait_timeout_,
|
||||||
size_t max_tries_,
|
size_t max_tries_,
|
||||||
|
@ -8,7 +8,6 @@
|
|||||||
#include "ConnectionHolder.h"
|
#include "ConnectionHolder.h"
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <Poco/Util/AbstractConfiguration.h>
|
#include <Poco/Util/AbstractConfiguration.h>
|
||||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
|
||||||
#include <Storages/StoragePostgreSQL.h>
|
#include <Storages/StoragePostgreSQL.h>
|
||||||
|
|
||||||
|
|
||||||
@ -20,12 +19,12 @@ namespace postgres
|
|||||||
|
|
||||||
class PoolWithFailover
|
class PoolWithFailover
|
||||||
{
|
{
|
||||||
|
|
||||||
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
using ReplicasConfigurationByPriority = std::map<size_t, std::vector<DB::StoragePostgreSQL::Configuration>>;
|
||||||
|
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
|
||||||
|
|
||||||
PoolWithFailover(
|
PoolWithFailover(
|
||||||
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
|
const ReplicasConfigurationByPriority & configurations_by_priority,
|
||||||
size_t pool_size,
|
size_t pool_size,
|
||||||
size_t pool_wait_timeout,
|
size_t pool_wait_timeout,
|
||||||
size_t max_tries_,
|
size_t max_tries_,
|
||||||
|
@ -8,12 +8,12 @@
|
|||||||
#include <IO/WriteHelpers.h>
|
#include <IO/WriteHelpers.h>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <Processors/Formats/IInputFormat.h>
|
#include <Processors/Formats/IInputFormat.h>
|
||||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
|
||||||
#include <Poco/Net/HTTPRequest.h>
|
#include <Poco/Net/HTTPRequest.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
#include "DictionarySourceFactory.h"
|
#include "DictionarySourceFactory.h"
|
||||||
#include "DictionarySourceHelpers.h"
|
#include "DictionarySourceHelpers.h"
|
||||||
#include "DictionaryStructure.h"
|
#include "DictionaryStructure.h"
|
||||||
|
#include <Storages/NamedCollectionsHelpers.h>
|
||||||
#include "registerDictionaries.h"
|
#include "registerDictionaries.h"
|
||||||
|
|
||||||
|
|
||||||
@ -223,21 +223,23 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
|
|||||||
String endpoint;
|
String endpoint;
|
||||||
String format;
|
String format;
|
||||||
|
|
||||||
auto named_collection = created_from_ddl
|
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, global_context) : nullptr;
|
||||||
? getURLBasedDataSourceConfiguration(config, settings_config_prefix, global_context)
|
|
||||||
: std::nullopt;
|
|
||||||
if (named_collection)
|
if (named_collection)
|
||||||
{
|
{
|
||||||
url = named_collection->configuration.url;
|
validateNamedCollection(
|
||||||
endpoint = named_collection->configuration.endpoint;
|
*named_collection,
|
||||||
format = named_collection->configuration.format;
|
/* required_keys */{},
|
||||||
|
/* optional_keys */ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>{
|
||||||
|
"url", "endpoint", "user", "credentials.user", "password", "credentials.password", "format", "compression_method", "structure", "name"});
|
||||||
|
|
||||||
credentials.setUsername(named_collection->configuration.user);
|
url = named_collection->getOrDefault<String>("url", "");
|
||||||
credentials.setPassword(named_collection->configuration.password);
|
endpoint = named_collection->getOrDefault<String>("endpoint", "");
|
||||||
|
format = named_collection->getOrDefault<String>("format", "");
|
||||||
|
|
||||||
header_entries.reserve(named_collection->configuration.headers.size());
|
credentials.setUsername(named_collection->getAnyOrDefault<String>({"user", "credentials.user"}, ""));
|
||||||
for (const auto & [key, value] : named_collection->configuration.headers)
|
credentials.setPassword(named_collection->getAnyOrDefault<String>({"password", "credentials.password"}, ""));
|
||||||
header_entries.emplace_back(key, value);
|
|
||||||
|
header_entries = getHeadersFromNamedCollection(*named_collection);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -1,15 +1,12 @@
|
|||||||
#include "MongoDBDictionarySource.h"
|
#include "MongoDBDictionarySource.h"
|
||||||
#include "DictionarySourceFactory.h"
|
#include "DictionarySourceFactory.h"
|
||||||
#include "DictionaryStructure.h"
|
#include "DictionaryStructure.h"
|
||||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
|
||||||
#include <Storages/StorageMongoDBSocketFactory.h>
|
#include <Storages/StorageMongoDBSocketFactory.h>
|
||||||
|
#include <Storages/NamedCollectionsHelpers.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
|
|
||||||
"host", "port", "user", "password", "db", "database", "uri", "collection", "name", "method", "options"};
|
|
||||||
|
|
||||||
void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
|
void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
|
||||||
{
|
{
|
||||||
auto create_mongo_db_dictionary = [](
|
auto create_mongo_db_dictionary = [](
|
||||||
@ -22,35 +19,53 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
|
|||||||
bool created_from_ddl)
|
bool created_from_ddl)
|
||||||
{
|
{
|
||||||
const auto config_prefix = root_config_prefix + ".mongodb";
|
const auto config_prefix = root_config_prefix + ".mongodb";
|
||||||
ExternalDataSourceConfiguration configuration;
|
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, config_prefix, context) : nullptr;
|
||||||
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key); };
|
|
||||||
auto named_collection = getExternalDataSourceConfiguration(config, config_prefix, context, has_config_key);
|
String host, username, password, database, method, options, collection;
|
||||||
|
UInt16 port;
|
||||||
if (named_collection)
|
if (named_collection)
|
||||||
{
|
{
|
||||||
configuration = named_collection->configuration;
|
validateNamedCollection(
|
||||||
|
*named_collection,
|
||||||
|
/* required_keys */{"collection"},
|
||||||
|
/* optional_keys */ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>{
|
||||||
|
"host", "port", "user", "password", "db", "database", "uri", "name", "method", "options"});
|
||||||
|
|
||||||
|
host = named_collection->getOrDefault<String>("host", "");
|
||||||
|
port = static_cast<UInt16>(named_collection->getOrDefault<UInt64>("port", 0));
|
||||||
|
username = named_collection->getOrDefault<String>("user", "");
|
||||||
|
password = named_collection->getOrDefault<String>("password", "");
|
||||||
|
database = named_collection->getAnyOrDefault<String>({"db", "database"}, "");
|
||||||
|
method = named_collection->getOrDefault<String>("method", "");
|
||||||
|
collection = named_collection->getOrDefault<String>("collection", "");
|
||||||
|
options = named_collection->getOrDefault<String>("options", "");
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
configuration.host = config.getString(config_prefix + ".host", "");
|
host = config.getString(config_prefix + ".host", "");
|
||||||
configuration.port = config.getUInt(config_prefix + ".port", 0);
|
port = config.getUInt(config_prefix + ".port", 0);
|
||||||
configuration.username = config.getString(config_prefix + ".user", "");
|
username = config.getString(config_prefix + ".user", "");
|
||||||
configuration.password = config.getString(config_prefix + ".password", "");
|
password = config.getString(config_prefix + ".password", "");
|
||||||
configuration.database = config.getString(config_prefix + ".db", "");
|
database = config.getString(config_prefix + ".db", "");
|
||||||
|
method = config.getString(config_prefix + ".method", "");
|
||||||
|
collection = config.getString(config_prefix + ".collection");
|
||||||
|
options = config.getString(config_prefix + ".options", "");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (created_from_ddl)
|
if (created_from_ddl)
|
||||||
context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
|
context->getRemoteHostFilter().checkHostAndPort(host, toString(port));
|
||||||
|
|
||||||
return std::make_unique<MongoDBDictionarySource>(dict_struct,
|
return std::make_unique<MongoDBDictionarySource>(
|
||||||
|
dict_struct,
|
||||||
config.getString(config_prefix + ".uri", ""),
|
config.getString(config_prefix + ".uri", ""),
|
||||||
configuration.host,
|
host,
|
||||||
configuration.port,
|
port,
|
||||||
configuration.username,
|
username,
|
||||||
configuration.password,
|
password,
|
||||||
config.getString(config_prefix + ".method", ""),
|
method,
|
||||||
configuration.database,
|
database,
|
||||||
config.getString(config_prefix + ".collection"),
|
collection,
|
||||||
config.getString(config_prefix + ".options", ""),
|
options,
|
||||||
sample_block);
|
sample_block);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
#include <Core/QualifiedTableName.h>
|
#include <Core/QualifiedTableName.h>
|
||||||
#include <Core/Settings.h>
|
#include <Core/Settings.h>
|
||||||
#include "DictionarySourceFactory.h"
|
#include "DictionarySourceFactory.h"
|
||||||
|
#include <Storages/NamedCollectionsHelpers.h>
|
||||||
#include "registerDictionaries.h"
|
#include "registerDictionaries.h"
|
||||||
|
|
||||||
#if USE_LIBPQXX
|
#if USE_LIBPQXX
|
||||||
@ -13,7 +14,6 @@
|
|||||||
#include "readInvalidateQuery.h"
|
#include "readInvalidateQuery.h"
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <QueryPipeline/QueryPipeline.h>
|
#include <QueryPipeline/QueryPipeline.h>
|
||||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
@ -24,16 +24,17 @@ namespace DB
|
|||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int SUPPORT_IS_DISABLED;
|
extern const int SUPPORT_IS_DISABLED;
|
||||||
|
extern const int BAD_ARGUMENTS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const ValidateKeysMultiset<ExternalDatabaseEqualKeysSet> dictionary_allowed_keys = {
|
||||||
|
"host", "port", "user", "password", "db", "database", "table", "schema",
|
||||||
|
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"};
|
||||||
|
|
||||||
#if USE_LIBPQXX
|
#if USE_LIBPQXX
|
||||||
|
|
||||||
static const UInt64 max_block_size = 8192;
|
static const UInt64 max_block_size = 8192;
|
||||||
|
|
||||||
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
|
|
||||||
"host", "port", "user", "password", "db", "database", "table", "schema",
|
|
||||||
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"};
|
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct, const String & schema, const String & table, const String & query, const String & where)
|
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct, const String & schema, const String & table, const String & query, const String & where)
|
||||||
@ -177,6 +178,19 @@ std::string PostgreSQLDictionarySource::toString() const
|
|||||||
return "PostgreSQL: " + configuration.db + '.' + configuration.table + (where.empty() ? "" : ", where: " + where);
|
return "PostgreSQL: " + configuration.db + '.' + configuration.table + (where.empty() ? "" : ", where: " + where);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void validateConfigKeys(
|
||||||
|
const Poco::Util::AbstractConfiguration & dict_config, const String & config_prefix)
|
||||||
|
{
|
||||||
|
Poco::Util::AbstractConfiguration::Keys config_keys;
|
||||||
|
dict_config.keys(config_prefix, config_keys);
|
||||||
|
for (const auto & config_key : config_keys)
|
||||||
|
{
|
||||||
|
if (dictionary_allowed_keys.contains(config_key) || startsWith(config_key, "replica"))
|
||||||
|
continue;
|
||||||
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected key `{}` in dictionary source configuration", config_key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
|
void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
|
||||||
@ -191,38 +205,117 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
|
|||||||
{
|
{
|
||||||
#if USE_LIBPQXX
|
#if USE_LIBPQXX
|
||||||
const auto settings_config_prefix = config_prefix + ".postgresql";
|
const auto settings_config_prefix = config_prefix + ".postgresql";
|
||||||
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key) || key.starts_with("replica"); };
|
|
||||||
auto configuration = getExternalDataSourceConfigurationByPriority(config, settings_config_prefix, context, has_config_key);
|
|
||||||
const auto & settings = context->getSettingsRef();
|
const auto & settings = context->getSettingsRef();
|
||||||
|
|
||||||
|
std::optional<PostgreSQLDictionarySource::Configuration> dictionary_configuration;
|
||||||
|
postgres::PoolWithFailover::ReplicasConfigurationByPriority replicas_by_priority;
|
||||||
|
|
||||||
|
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, context) : nullptr;
|
||||||
|
if (named_collection)
|
||||||
|
{
|
||||||
|
validateNamedCollection<ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>>(*named_collection, {}, dictionary_allowed_keys);
|
||||||
|
|
||||||
|
StoragePostgreSQL::Configuration common_configuration;
|
||||||
|
common_configuration.host = named_collection->getOrDefault<String>("host", "");
|
||||||
|
common_configuration.port = named_collection->getOrDefault<UInt64>("port", 0);
|
||||||
|
common_configuration.username = named_collection->getOrDefault<String>("user", "");
|
||||||
|
common_configuration.password = named_collection->getOrDefault<String>("password", "");
|
||||||
|
common_configuration.database = named_collection->getAnyOrDefault<String>({"database", "db"}, "");
|
||||||
|
common_configuration.schema = named_collection->getOrDefault<String>("schema", "");
|
||||||
|
common_configuration.table = named_collection->getOrDefault<String>("table", "");
|
||||||
|
|
||||||
|
dictionary_configuration.emplace(PostgreSQLDictionarySource::Configuration{
|
||||||
|
.db = common_configuration.database,
|
||||||
|
.schema = common_configuration.schema,
|
||||||
|
.table = common_configuration.table,
|
||||||
|
.query = named_collection->getOrDefault<String>("query", ""),
|
||||||
|
.where = named_collection->getOrDefault<String>("where", ""),
|
||||||
|
.invalidate_query = named_collection->getOrDefault<String>("invalidate_query", ""),
|
||||||
|
.update_field = named_collection->getOrDefault<String>("update_field", ""),
|
||||||
|
.update_lag = named_collection->getOrDefault<UInt64>("update_lag", 1),
|
||||||
|
});
|
||||||
|
|
||||||
|
replicas_by_priority[0].emplace_back(common_configuration);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
validateConfigKeys(config, settings_config_prefix);
|
||||||
|
|
||||||
|
StoragePostgreSQL::Configuration common_configuration;
|
||||||
|
common_configuration.host = config.getString(settings_config_prefix + ".host", "");
|
||||||
|
common_configuration.port = config.getUInt(settings_config_prefix + ".port", 0);
|
||||||
|
common_configuration.username = config.getString(settings_config_prefix + ".user", "");
|
||||||
|
common_configuration.password = config.getString(settings_config_prefix + ".password", "");
|
||||||
|
common_configuration.database = config.getString(fmt::format("{}.database", settings_config_prefix), config.getString(fmt::format("{}.db", settings_config_prefix), ""));
|
||||||
|
common_configuration.schema = config.getString(fmt::format("{}.schema", settings_config_prefix), "");
|
||||||
|
common_configuration.table = config.getString(fmt::format("{}.table", settings_config_prefix), "");
|
||||||
|
|
||||||
|
dictionary_configuration.emplace(PostgreSQLDictionarySource::Configuration
|
||||||
|
{
|
||||||
|
.db = common_configuration.database,
|
||||||
|
.schema = common_configuration.schema,
|
||||||
|
.table = common_configuration.table,
|
||||||
|
.query = config.getString(fmt::format("{}.query", settings_config_prefix), ""),
|
||||||
|
.where = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
|
||||||
|
.invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""),
|
||||||
|
.update_field = config.getString(fmt::format("{}.update_field", settings_config_prefix), ""),
|
||||||
|
.update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1)
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
if (config.has(settings_config_prefix + ".replica"))
|
||||||
|
{
|
||||||
|
Poco::Util::AbstractConfiguration::Keys config_keys;
|
||||||
|
config.keys(settings_config_prefix, config_keys);
|
||||||
|
|
||||||
|
for (const auto & config_key : config_keys)
|
||||||
|
{
|
||||||
|
if (config_key.starts_with("replica"))
|
||||||
|
{
|
||||||
|
String replica_name = settings_config_prefix + "." + config_key;
|
||||||
|
StoragePostgreSQL::Configuration replica_configuration{common_configuration};
|
||||||
|
|
||||||
|
size_t priority = config.getInt(replica_name + ".priority", 0);
|
||||||
|
replica_configuration.host = config.getString(replica_name + ".host", common_configuration.host);
|
||||||
|
replica_configuration.port = config.getUInt(replica_name + ".port", common_configuration.port);
|
||||||
|
replica_configuration.username = config.getString(replica_name + ".user", common_configuration.username);
|
||||||
|
replica_configuration.password = config.getString(replica_name + ".password", common_configuration.password);
|
||||||
|
|
||||||
|
if (replica_configuration.host.empty() || replica_configuration.port == 0
|
||||||
|
|| replica_configuration.username.empty() || replica_configuration.password.empty())
|
||||||
|
{
|
||||||
|
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||||
|
"Named collection of connection parameters is missing some "
|
||||||
|
"of the parameters and no other dictionary parameters are added");
|
||||||
|
}
|
||||||
|
|
||||||
|
replicas_by_priority[priority].emplace_back(replica_configuration);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
replicas_by_priority[0].emplace_back(common_configuration);
|
||||||
|
}
|
||||||
|
}
|
||||||
if (created_from_ddl)
|
if (created_from_ddl)
|
||||||
{
|
{
|
||||||
for (const auto & replicas : configuration.replicas_configurations)
|
for (const auto & [_, replicas] : replicas_by_priority)
|
||||||
for (const auto & replica : replicas.second)
|
for (const auto & replica : replicas)
|
||||||
context->getRemoteHostFilter().checkHostAndPort(replica.host, toString(replica.port));
|
context->getRemoteHostFilter().checkHostAndPort(replica.host, toString(replica.port));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
auto pool = std::make_shared<postgres::PoolWithFailover>(
|
auto pool = std::make_shared<postgres::PoolWithFailover>(
|
||||||
configuration.replicas_configurations,
|
replicas_by_priority,
|
||||||
settings.postgresql_connection_pool_size,
|
settings.postgresql_connection_pool_size,
|
||||||
settings.postgresql_connection_pool_wait_timeout,
|
settings.postgresql_connection_pool_wait_timeout,
|
||||||
settings.postgresql_connection_pool_retries,
|
settings.postgresql_connection_pool_retries,
|
||||||
settings.postgresql_connection_pool_auto_close_connection,
|
settings.postgresql_connection_pool_auto_close_connection,
|
||||||
settings.postgresql_connection_attempt_timeout);
|
settings.postgresql_connection_attempt_timeout);
|
||||||
|
|
||||||
PostgreSQLDictionarySource::Configuration dictionary_configuration
|
|
||||||
{
|
|
||||||
.db = configuration.database,
|
|
||||||
.schema = configuration.schema,
|
|
||||||
.table = configuration.table,
|
|
||||||
.query = config.getString(fmt::format("{}.query", settings_config_prefix), ""),
|
|
||||||
.where = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
|
|
||||||
.invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""),
|
|
||||||
.update_field = config.getString(fmt::format("{}.update_field", settings_config_prefix), ""),
|
|
||||||
.update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1)
|
|
||||||
};
|
|
||||||
|
|
||||||
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, dictionary_configuration, pool, sample_block);
|
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, dictionary_configuration.value(), pool, sample_block);
|
||||||
#else
|
#else
|
||||||
(void)dict_struct;
|
(void)dict_struct;
|
||||||
(void)config;
|
(void)config;
|
||||||
|
@ -1,288 +0,0 @@
|
|||||||
#include "ExternalDataSourceConfiguration.h"
|
|
||||||
|
|
||||||
#include <Interpreters/Context.h>
|
|
||||||
#include <Interpreters/evaluateConstantExpression.h>
|
|
||||||
#include <Parsers/ASTIdentifier.h>
|
|
||||||
#include <Parsers/ASTFunction.h>
|
|
||||||
#include <Parsers/ASTLiteral.h>
|
|
||||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
|
||||||
#include <Poco/Util/AbstractConfiguration.h>
|
|
||||||
#include <IO/WriteBufferFromString.h>
|
|
||||||
|
|
||||||
namespace DB
|
|
||||||
{
|
|
||||||
|
|
||||||
namespace ErrorCodes
|
|
||||||
{
|
|
||||||
extern const int BAD_ARGUMENTS;
|
|
||||||
}
|
|
||||||
|
|
||||||
IMPLEMENT_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
|
|
||||||
|
|
||||||
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
|
|
||||||
"host", "port", "user", "password", "quota_key", "db",
|
|
||||||
"database", "table", "schema", "replica",
|
|
||||||
"update_field", "update_lag", "invalidate_query", "query",
|
|
||||||
"where", "name", "secure", "uri", "collection"};
|
|
||||||
|
|
||||||
|
|
||||||
template<typename T>
|
|
||||||
SettingsChanges getSettingsChangesFromConfig(
|
|
||||||
const BaseSettings<T> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
|
|
||||||
{
|
|
||||||
SettingsChanges config_settings;
|
|
||||||
for (const auto & setting : settings.all())
|
|
||||||
{
|
|
||||||
const auto & setting_name = setting.getName();
|
|
||||||
auto setting_value = config.getString(config_prefix + '.' + setting_name, "");
|
|
||||||
if (!setting_value.empty())
|
|
||||||
config_settings.emplace_back(setting_name, setting_value);
|
|
||||||
}
|
|
||||||
return config_settings;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
String ExternalDataSourceConfiguration::toString() const
|
|
||||||
{
|
|
||||||
WriteBufferFromOwnString configuration_info;
|
|
||||||
configuration_info << "username: " << username << "\t";
|
|
||||||
if (addresses.empty())
|
|
||||||
{
|
|
||||||
configuration_info << "host: " << host << "\t";
|
|
||||||
configuration_info << "port: " << port << "\t";
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
for (const auto & [replica_host, replica_port] : addresses)
|
|
||||||
{
|
|
||||||
configuration_info << "host: " << replica_host << "\t";
|
|
||||||
configuration_info << "port: " << replica_port << "\t";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return configuration_info.str();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration & conf)
|
|
||||||
{
|
|
||||||
host = conf.host;
|
|
||||||
port = conf.port;
|
|
||||||
username = conf.username;
|
|
||||||
password = conf.password;
|
|
||||||
quota_key = conf.quota_key;
|
|
||||||
database = conf.database;
|
|
||||||
table = conf.table;
|
|
||||||
schema = conf.schema;
|
|
||||||
addresses = conf.addresses;
|
|
||||||
addresses_expr = conf.addresses_expr;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void validateConfigKeys(
|
|
||||||
const Poco::Util::AbstractConfiguration & dict_config, const String & config_prefix, HasConfigKeyFunc has_config_key_func)
|
|
||||||
{
|
|
||||||
Poco::Util::AbstractConfiguration::Keys config_keys;
|
|
||||||
dict_config.keys(config_prefix, config_keys);
|
|
||||||
for (const auto & config_key : config_keys)
|
|
||||||
{
|
|
||||||
if (!has_config_key_func(config_key))
|
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected key `{}` in dictionary source configuration", config_key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename T>
|
|
||||||
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
|
|
||||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
|
|
||||||
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<T> & settings)
|
|
||||||
{
|
|
||||||
validateConfigKeys(dict_config, dict_config_prefix, has_config_key);
|
|
||||||
ExternalDataSourceConfiguration configuration;
|
|
||||||
|
|
||||||
auto collection_name = dict_config.getString(dict_config_prefix + ".name", "");
|
|
||||||
if (!collection_name.empty())
|
|
||||||
{
|
|
||||||
const auto & config = context->getConfigRef();
|
|
||||||
const auto & collection_prefix = fmt::format("named_collections.{}", collection_name);
|
|
||||||
validateConfigKeys(dict_config, collection_prefix, has_config_key);
|
|
||||||
auto config_settings = getSettingsChangesFromConfig(settings, config, collection_prefix);
|
|
||||||
auto dict_settings = getSettingsChangesFromConfig(settings, dict_config, dict_config_prefix);
|
|
||||||
/// dictionary config settings override collection settings.
|
|
||||||
config_settings.insert(config_settings.end(), dict_settings.begin(), dict_settings.end());
|
|
||||||
|
|
||||||
if (!config.has(collection_prefix))
|
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
|
|
||||||
|
|
||||||
configuration.host = dict_config.getString(dict_config_prefix + ".host", config.getString(collection_prefix + ".host", ""));
|
|
||||||
configuration.port = dict_config.getInt(dict_config_prefix + ".port", config.getUInt(collection_prefix + ".port", 0));
|
|
||||||
configuration.username = dict_config.getString(dict_config_prefix + ".user", config.getString(collection_prefix + ".user", ""));
|
|
||||||
configuration.password = dict_config.getString(dict_config_prefix + ".password", config.getString(collection_prefix + ".password", ""));
|
|
||||||
configuration.quota_key = dict_config.getString(dict_config_prefix + ".quota_key", config.getString(collection_prefix + ".quota_key", ""));
|
|
||||||
configuration.database = dict_config.getString(dict_config_prefix + ".db", config.getString(dict_config_prefix + ".database",
|
|
||||||
config.getString(collection_prefix + ".db", config.getString(collection_prefix + ".database", ""))));
|
|
||||||
configuration.table = dict_config.getString(dict_config_prefix + ".table", config.getString(collection_prefix + ".table", ""));
|
|
||||||
configuration.schema = dict_config.getString(dict_config_prefix + ".schema", config.getString(collection_prefix + ".schema", ""));
|
|
||||||
|
|
||||||
if (configuration.host.empty() || configuration.port == 0 || configuration.username.empty() || configuration.table.empty())
|
|
||||||
{
|
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
|
||||||
"Named collection of connection parameters is missing some "
|
|
||||||
"of the parameters and dictionary parameters are not added");
|
|
||||||
}
|
|
||||||
return ExternalDataSourceInfo{.configuration = configuration, .settings_changes = config_settings};
|
|
||||||
}
|
|
||||||
return std::nullopt;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
|
|
||||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context)
|
|
||||||
{
|
|
||||||
URLBasedDataSourceConfiguration configuration;
|
|
||||||
auto collection_name = dict_config.getString(dict_config_prefix + ".name", "");
|
|
||||||
if (!collection_name.empty())
|
|
||||||
{
|
|
||||||
const auto & config = context->getConfigRef();
|
|
||||||
const auto & collection_prefix = fmt::format("named_collections.{}", collection_name);
|
|
||||||
|
|
||||||
if (!config.has(collection_prefix))
|
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
|
|
||||||
|
|
||||||
configuration.url =
|
|
||||||
dict_config.getString(dict_config_prefix + ".url", config.getString(collection_prefix + ".url", ""));
|
|
||||||
configuration.endpoint =
|
|
||||||
dict_config.getString(dict_config_prefix + ".endpoint", config.getString(collection_prefix + ".endpoint", ""));
|
|
||||||
configuration.format =
|
|
||||||
dict_config.getString(dict_config_prefix + ".format", config.getString(collection_prefix + ".format", ""));
|
|
||||||
configuration.compression_method =
|
|
||||||
dict_config.getString(dict_config_prefix + ".compression", config.getString(collection_prefix + ".compression_method", ""));
|
|
||||||
configuration.structure =
|
|
||||||
dict_config.getString(dict_config_prefix + ".structure", config.getString(collection_prefix + ".structure", ""));
|
|
||||||
configuration.user =
|
|
||||||
dict_config.getString(dict_config_prefix + ".credentials.user", config.getString(collection_prefix + ".credentials.user", ""));
|
|
||||||
configuration.password =
|
|
||||||
dict_config.getString(dict_config_prefix + ".credentials.password", config.getString(collection_prefix + ".credentials.password", ""));
|
|
||||||
|
|
||||||
String headers_prefix;
|
|
||||||
const Poco::Util::AbstractConfiguration *headers_config = nullptr;
|
|
||||||
if (dict_config.has(dict_config_prefix + ".headers"))
|
|
||||||
{
|
|
||||||
headers_prefix = dict_config_prefix + ".headers";
|
|
||||||
headers_config = &dict_config;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
headers_prefix = collection_prefix + ".headers";
|
|
||||||
headers_config = &config;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (headers_config)
|
|
||||||
{
|
|
||||||
Poco::Util::AbstractConfiguration::Keys header_keys;
|
|
||||||
headers_config->keys(headers_prefix, header_keys);
|
|
||||||
headers_prefix += ".";
|
|
||||||
for (const auto & header : header_keys)
|
|
||||||
{
|
|
||||||
const auto header_prefix = headers_prefix + header;
|
|
||||||
configuration.headers.emplace_back(
|
|
||||||
headers_config->getString(header_prefix + ".name"),
|
|
||||||
headers_config->getString(header_prefix + ".value"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return URLBasedDataSourceConfig{ .configuration = configuration };
|
|
||||||
}
|
|
||||||
|
|
||||||
return std::nullopt;
|
|
||||||
}
|
|
||||||
|
|
||||||
ExternalDataSourcesByPriority getExternalDataSourceConfigurationByPriority(
|
|
||||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context, HasConfigKeyFunc has_config_key)
|
|
||||||
{
|
|
||||||
validateConfigKeys(dict_config, dict_config_prefix, has_config_key);
|
|
||||||
ExternalDataSourceConfiguration common_configuration;
|
|
||||||
|
|
||||||
auto named_collection = getExternalDataSourceConfiguration(dict_config, dict_config_prefix, context, has_config_key);
|
|
||||||
if (named_collection)
|
|
||||||
{
|
|
||||||
common_configuration = named_collection->configuration;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
common_configuration.host = dict_config.getString(dict_config_prefix + ".host", "");
|
|
||||||
common_configuration.port = dict_config.getUInt(dict_config_prefix + ".port", 0);
|
|
||||||
common_configuration.username = dict_config.getString(dict_config_prefix + ".user", "");
|
|
||||||
common_configuration.password = dict_config.getString(dict_config_prefix + ".password", "");
|
|
||||||
common_configuration.quota_key = dict_config.getString(dict_config_prefix + ".quota_key", "");
|
|
||||||
common_configuration.database = dict_config.getString(dict_config_prefix + ".db", dict_config.getString(dict_config_prefix + ".database", ""));
|
|
||||||
common_configuration.table = dict_config.getString(fmt::format("{}.table", dict_config_prefix), "");
|
|
||||||
common_configuration.schema = dict_config.getString(fmt::format("{}.schema", dict_config_prefix), "");
|
|
||||||
}
|
|
||||||
|
|
||||||
ExternalDataSourcesByPriority configuration
|
|
||||||
{
|
|
||||||
.database = common_configuration.database,
|
|
||||||
.table = common_configuration.table,
|
|
||||||
.schema = common_configuration.schema,
|
|
||||||
.replicas_configurations = {}
|
|
||||||
};
|
|
||||||
|
|
||||||
if (dict_config.has(dict_config_prefix + ".replica"))
|
|
||||||
{
|
|
||||||
Poco::Util::AbstractConfiguration::Keys config_keys;
|
|
||||||
dict_config.keys(dict_config_prefix, config_keys);
|
|
||||||
|
|
||||||
for (const auto & config_key : config_keys)
|
|
||||||
{
|
|
||||||
if (config_key.starts_with("replica"))
|
|
||||||
{
|
|
||||||
ExternalDataSourceConfiguration replica_configuration(common_configuration);
|
|
||||||
String replica_name = dict_config_prefix + "." + config_key;
|
|
||||||
validateConfigKeys(dict_config, replica_name, has_config_key);
|
|
||||||
|
|
||||||
size_t priority = dict_config.getInt(replica_name + ".priority", 0);
|
|
||||||
replica_configuration.host = dict_config.getString(replica_name + ".host", common_configuration.host);
|
|
||||||
replica_configuration.port = dict_config.getUInt(replica_name + ".port", common_configuration.port);
|
|
||||||
replica_configuration.username = dict_config.getString(replica_name + ".user", common_configuration.username);
|
|
||||||
replica_configuration.password = dict_config.getString(replica_name + ".password", common_configuration.password);
|
|
||||||
replica_configuration.quota_key = dict_config.getString(replica_name + ".quota_key", common_configuration.quota_key);
|
|
||||||
|
|
||||||
if (replica_configuration.host.empty() || replica_configuration.port == 0
|
|
||||||
|| replica_configuration.username.empty() || replica_configuration.password.empty())
|
|
||||||
{
|
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
|
||||||
"Named collection of connection parameters is missing some "
|
|
||||||
"of the parameters and no other dictionary parameters are added");
|
|
||||||
}
|
|
||||||
|
|
||||||
configuration.replicas_configurations[priority].emplace_back(replica_configuration);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
configuration.replicas_configurations[0].emplace_back(common_configuration);
|
|
||||||
}
|
|
||||||
|
|
||||||
return configuration;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void URLBasedDataSourceConfiguration::set(const URLBasedDataSourceConfiguration & conf)
|
|
||||||
{
|
|
||||||
url = conf.url;
|
|
||||||
format = conf.format;
|
|
||||||
compression_method = conf.compression_method;
|
|
||||||
structure = conf.structure;
|
|
||||||
http_method = conf.http_method;
|
|
||||||
headers = conf.headers;
|
|
||||||
}
|
|
||||||
|
|
||||||
template
|
|
||||||
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
|
|
||||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
|
|
||||||
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<EmptySettingsTraits> & settings);
|
|
||||||
|
|
||||||
template
|
|
||||||
SettingsChanges getSettingsChangesFromConfig(
|
|
||||||
const BaseSettings<EmptySettingsTraits> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
|
|
||||||
|
|
||||||
}
|
|
@ -1,92 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <Interpreters/Context.h>
|
|
||||||
#include <Poco/Util/AbstractConfiguration.h>
|
|
||||||
#include <IO/S3Settings.h>
|
|
||||||
#include <IO/HTTPHeaderEntries.h>
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
|
||||||
{
|
|
||||||
|
|
||||||
#define EMPTY_SETTINGS(M, ALIAS)
|
|
||||||
DECLARE_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
|
|
||||||
|
|
||||||
struct EmptySettings : public BaseSettings<EmptySettingsTraits> {};
|
|
||||||
|
|
||||||
struct ExternalDataSourceConfiguration
|
|
||||||
{
|
|
||||||
String host;
|
|
||||||
UInt16 port = 0;
|
|
||||||
String username = "default";
|
|
||||||
String password;
|
|
||||||
String quota_key;
|
|
||||||
String database;
|
|
||||||
String table;
|
|
||||||
String schema;
|
|
||||||
|
|
||||||
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
|
|
||||||
String addresses_expr;
|
|
||||||
|
|
||||||
String toString() const;
|
|
||||||
|
|
||||||
void set(const ExternalDataSourceConfiguration & conf);
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
using StorageSpecificArgs = std::vector<std::pair<String, ASTPtr>>;
|
|
||||||
|
|
||||||
struct ExternalDataSourceInfo
|
|
||||||
{
|
|
||||||
ExternalDataSourceConfiguration configuration;
|
|
||||||
SettingsChanges settings_changes;
|
|
||||||
};
|
|
||||||
|
|
||||||
using HasConfigKeyFunc = std::function<bool(const String &)>;
|
|
||||||
|
|
||||||
template <typename T = EmptySettingsTraits>
|
|
||||||
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
|
|
||||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
|
|
||||||
ContextPtr context, HasConfigKeyFunc has_config_key, const BaseSettings<T> & settings = {});
|
|
||||||
|
|
||||||
|
|
||||||
/// Highest priority is 0, the bigger the number in map, the less the priority.
|
|
||||||
using ExternalDataSourcesConfigurationByPriority = std::map<size_t, std::vector<ExternalDataSourceConfiguration>>;
|
|
||||||
|
|
||||||
struct ExternalDataSourcesByPriority
|
|
||||||
{
|
|
||||||
String database;
|
|
||||||
String table;
|
|
||||||
String schema;
|
|
||||||
ExternalDataSourcesConfigurationByPriority replicas_configurations;
|
|
||||||
};
|
|
||||||
|
|
||||||
ExternalDataSourcesByPriority
|
|
||||||
getExternalDataSourceConfigurationByPriority(const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context, HasConfigKeyFunc has_config_key);
|
|
||||||
|
|
||||||
struct URLBasedDataSourceConfiguration
|
|
||||||
{
|
|
||||||
String url;
|
|
||||||
String endpoint;
|
|
||||||
String format = "auto";
|
|
||||||
String compression_method = "auto";
|
|
||||||
String structure = "auto";
|
|
||||||
|
|
||||||
String user;
|
|
||||||
String password;
|
|
||||||
|
|
||||||
HTTPHeaderEntries headers;
|
|
||||||
String http_method;
|
|
||||||
|
|
||||||
void set(const URLBasedDataSourceConfiguration & conf);
|
|
||||||
};
|
|
||||||
|
|
||||||
struct URLBasedDataSourceConfig
|
|
||||||
{
|
|
||||||
URLBasedDataSourceConfiguration configuration;
|
|
||||||
};
|
|
||||||
|
|
||||||
std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
|
|
||||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);
|
|
||||||
|
|
||||||
}
|
|
@ -133,7 +133,7 @@ void validateNamedCollection(
|
|||||||
{
|
{
|
||||||
throw Exception(
|
throw Exception(
|
||||||
ErrorCodes::BAD_ARGUMENTS,
|
ErrorCodes::BAD_ARGUMENTS,
|
||||||
"Unexpected key {} in named collection. Required keys: {}, optional keys: {}",
|
"Unexpected key `{}` in named collection. Required keys: {}, optional keys: {}",
|
||||||
backQuoteIfNeed(key), fmt::join(required_keys, ", "), fmt::join(optional_keys, ", "));
|
backQuoteIfNeed(key), fmt::join(required_keys, ", "), fmt::join(optional_keys, ", "));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,8 +8,6 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
struct ExternalDataSourceConfiguration;
|
|
||||||
|
|
||||||
/// Storages MySQL and PostgreSQL use ConnectionPoolWithFailover and support multiple replicas.
|
/// Storages MySQL and PostgreSQL use ConnectionPoolWithFailover and support multiple replicas.
|
||||||
/// This class unites multiple storages with replicas into multiple shards with replicas.
|
/// This class unites multiple storages with replicas into multiple shards with replicas.
|
||||||
/// A query to external database is passed to one replica on each shard, the result is united.
|
/// A query to external database is passed to one replica on each shard, the result is united.
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
#include <Storages/StorageMongoDB.h>
|
#include <Storages/StorageMongoDB.h>
|
||||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
|
||||||
|
|
||||||
#include <Common/Exception.h>
|
#include <Common/Exception.h>
|
||||||
|
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
|
|
||||||
#include <Storages/StorageRedis.h>
|
#include <Storages/StorageRedis.h>
|
||||||
#include <TableFunctions/ITableFunction.h>
|
#include <TableFunctions/ITableFunction.h>
|
||||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
|
@ -530,10 +530,61 @@ def test_bad_configuration(started_cluster):
|
|||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
node1.query_and_get_error(
|
assert "Unexpected key `dbbb`" in node1.query_and_get_error(
|
||||||
"SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(1))"
|
"SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(1))"
|
||||||
)
|
)
|
||||||
assert node1.contains_in_log("Unexpected key `dbbb`")
|
|
||||||
|
|
||||||
|
def test_named_collection_from_ddl(started_cluster):
|
||||||
|
cursor = started_cluster.postgres_conn.cursor()
|
||||||
|
cursor.execute("DROP TABLE IF EXISTS test_table")
|
||||||
|
cursor.execute("CREATE TABLE test_table (id integer, value integer)")
|
||||||
|
|
||||||
|
node1.query(
|
||||||
|
"""
|
||||||
|
DROP NAMED COLLECTION IF EXISTS pg_conn;
|
||||||
|
CREATE NAMED COLLECTION pg_conn
|
||||||
|
AS user = 'postgres', password = 'mysecretpassword', host = 'postgres1', port = 5432, database = 'postgres', table = 'test_table';
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
cursor.execute(
|
||||||
|
"INSERT INTO test_table SELECT i, i FROM generate_series(0, 99) as t(i)"
|
||||||
|
)
|
||||||
|
|
||||||
|
node1.query(
|
||||||
|
"""
|
||||||
|
DROP DICTIONARY IF EXISTS postgres_dict;
|
||||||
|
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
|
||||||
|
PRIMARY KEY id
|
||||||
|
SOURCE(POSTGRESQL(NAME pg_conn))
|
||||||
|
LIFETIME(MIN 1 MAX 2)
|
||||||
|
LAYOUT(HASHED());
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
|
||||||
|
assert int(result.strip()) == 99
|
||||||
|
|
||||||
|
node1.query(
|
||||||
|
"""
|
||||||
|
DROP NAMED COLLECTION IF EXISTS pg_conn_2;
|
||||||
|
CREATE NAMED COLLECTION pg_conn_2
|
||||||
|
AS user = 'postgres', password = 'mysecretpassword', host = 'postgres1', port = 5432, dbbb = 'postgres', table = 'test_table';
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
node1.query(
|
||||||
|
"""
|
||||||
|
DROP DICTIONARY IF EXISTS postgres_dict;
|
||||||
|
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
|
||||||
|
PRIMARY KEY id
|
||||||
|
SOURCE(POSTGRESQL(NAME pg_conn_2))
|
||||||
|
LIFETIME(MIN 1 MAX 2)
|
||||||
|
LAYOUT(HASHED());
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
assert "Unexpected key `dbbb`" in node1.query_and_get_error(
|
||||||
|
"SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
Loading…
Reference in New Issue
Block a user