ClickHouse dictionary support named collections

This commit is contained in:
kssenii 2021-11-25 01:45:23 +03:00
parent ac66433650
commit 20c59b66f9
5 changed files with 101 additions and 43 deletions

View File

@ -7,6 +7,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Session.h>
#include <Interpreters/executeQuery.h>
@ -221,39 +222,67 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
Block & sample_block,
ContextPtr global_context,
const std::string & default_database [[maybe_unused]],
bool /* created_from_ddl */) -> DictionarySourcePtr
bool created_from_ddl) -> DictionarySourcePtr
{
bool secure = config.getBool(config_prefix + ".secure", false);
UInt16 default_port = getPortFromContext(global_context, secure);
std::string settings_config_prefix = config_prefix + ".clickhouse";
std::string host = config.getString(settings_config_prefix + ".host", "localhost");
UInt16 port = static_cast<UInt16>(config.getUInt(settings_config_prefix + ".port", default_port));
ClickHouseDictionarySource::Configuration configuration
std::unique_ptr<ClickHouseDictionarySource::Configuration> configuration;
auto named_collection = created_from_ddl ?
getExternalDataSourceConfiguration(config, settings_config_prefix, global_context) : std::nullopt;
if (named_collection)
{
.host = host,
.user = config.getString(settings_config_prefix + ".user", "default"),
.password = config.getString(settings_config_prefix + ".password", ""),
.db = config.getString(settings_config_prefix + ".db", default_database),
.table = config.getString(settings_config_prefix + ".table", ""),
.query = config.getString(settings_config_prefix + ".query", ""),
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.port = port,
.is_local = isLocalAddress({host, port}, default_port),
.secure = config.getBool(settings_config_prefix + ".secure", false)
};
std::string host = named_collection->host;
UInt16 port = named_collection->port;
configuration = std::make_unique<ClickHouseDictionarySource::Configuration>(
ClickHouseDictionarySource::Configuration{
.host = host,
.user = named_collection->username,
.password = named_collection->password,
.db = named_collection->database,
.table = named_collection->table,
.query = config.getString(settings_config_prefix + ".query", ""),
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.port = port,
.is_local = isLocalAddress({host, port}, default_port),
.secure = config.getBool(settings_config_prefix + ".secure", false)
});
}
else
{
std::string host = config.getString(settings_config_prefix + ".host", "localhost");
UInt16 port = static_cast<UInt16>(config.getUInt(settings_config_prefix + ".port", default_port));
configuration = std::make_unique<ClickHouseDictionarySource::Configuration>(
ClickHouseDictionarySource::Configuration{
.host = host,
.user = config.getString(settings_config_prefix + ".user", "default"),
.password = config.getString(settings_config_prefix + ".password", ""),
.db = config.getString(settings_config_prefix + ".db", default_database),
.table = config.getString(settings_config_prefix + ".table", ""),
.query = config.getString(settings_config_prefix + ".query", ""),
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.port = port,
.is_local = isLocalAddress({host, port}, default_port),
.secure = config.getBool(settings_config_prefix + ".secure", false)
});
}
ContextMutablePtr context;
if (configuration.is_local)
if (configuration->is_local)
{
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
Session session(global_context, ClientInfo::Interface::LOCAL);
session.authenticate(configuration.user, configuration.password, {});
session.authenticate(configuration->user, configuration->password, {});
context = session.makeQueryContext();
}
else
@ -265,10 +294,10 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
String dictionary_name = config.getString(".dictionary.name", "");
String dictionary_database = config.getString(".dictionary.database", "");
if (dictionary_name == configuration.table && dictionary_database == configuration.db)
if (dictionary_name == configuration->table && dictionary_database == configuration->db)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouseDictionarySource table cannot be dictionary table");
return std::make_unique<ClickHouseDictionarySource>(dict_struct, configuration, sample_block, context);
return std::make_unique<ClickHouseDictionarySource>(dict_struct, *configuration, sample_block, context);
};
factory.registerSource("clickhouse", create_table_source);

View File

@ -62,9 +62,9 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
if (const auto * collection = typeid_cast<const ASTIdentifier *>(args[0].get()))
{
const auto & config = context->getConfigRef();
const auto & config_prefix = fmt::format("named_collections.{}", collection->name());
const auto & collection_prefix = fmt::format("named_collections.{}", collection->name());
if (!config.has(config_prefix))
if (!config.has(collection_prefix))
{
/// For table function remote we do not throw on no collection, because then we consider first arg
/// as cluster definition from config.
@ -74,14 +74,14 @@ std::optional<ExternalDataSourceConfig> getExternalDataSourceConfiguration(const
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name());
}
configuration.host = config.getString(config_prefix + ".host", "");
configuration.port = config.getInt(config_prefix + ".port", 0);
configuration.username = config.getString(config_prefix + ".user", "");
configuration.password = config.getString(config_prefix + ".password", "");
configuration.database = config.getString(config_prefix + ".database", "");
configuration.table = config.getString(config_prefix + ".table", "");
configuration.schema = config.getString(config_prefix + ".schema", "");
configuration.addresses_expr = config.getString(config_prefix + ".addresses_expr", "");
configuration.host = config.getString(collection_prefix + ".host", "");
configuration.port = config.getInt(collection_prefix + ".port", 0);
configuration.username = config.getString(collection_prefix + ".user", "");
configuration.password = config.getString(collection_prefix + ".password", "");
configuration.database = config.getString(collection_prefix + ".database", "");
configuration.table = config.getString(collection_prefix + ".table", "");
configuration.schema = config.getString(collection_prefix + ".schema", "");
configuration.addresses_expr = config.getString(collection_prefix + ".addresses_expr", "");
if (!configuration.addresses_expr.empty() && !configuration.host.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot have `addresses_expr` and `host`, `port` in configuration at the same time");
@ -161,21 +161,20 @@ std::optional<ExternalDataSourceConfiguration> getExternalDataSourceConfiguratio
if (!collection_name.empty())
{
const auto & config = context->getConfigRef();
const auto & config_prefix = fmt::format("named_collections.{}", collection_name);
const auto & collection_prefix = fmt::format("named_collections.{}", collection_name);
if (!config.has(config_prefix))
if (!config.has(collection_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection_name);
configuration.host = dict_config.getString(dict_config_prefix + ".host", config.getString(config_prefix + ".host", ""));
configuration.port = dict_config.getInt(dict_config_prefix + ".port", config.getUInt(config_prefix + ".port", 0));
configuration.username = dict_config.getString(dict_config_prefix + ".user", config.getString(config_prefix + ".user", ""));
configuration.password = dict_config.getString(dict_config_prefix + ".password", config.getString(config_prefix + ".password", ""));
configuration.database = dict_config.getString(dict_config_prefix + ".db", config.getString(config_prefix + ".database", ""));
configuration.table = dict_config.getString(dict_config_prefix + ".table", config.getString(config_prefix + ".table", ""));
configuration.schema = dict_config.getString(dict_config_prefix + ".schema", config.getString(config_prefix + ".schema", ""));
configuration.host = dict_config.getString(dict_config_prefix + ".host", config.getString(collection_prefix + ".host", ""));
configuration.port = dict_config.getInt(dict_config_prefix + ".port", config.getUInt(collection_prefix + ".port", 0));
configuration.username = dict_config.getString(dict_config_prefix + ".user", config.getString(collection_prefix + ".user", ""));
configuration.password = dict_config.getString(dict_config_prefix + ".password", config.getString(collection_prefix + ".password", ""));
configuration.database = dict_config.getString(dict_config_prefix + ".db", config.getString(collection_prefix + ".database", ""));
configuration.table = dict_config.getString(dict_config_prefix + ".table", config.getString(collection_prefix + ".table", ""));
configuration.schema = dict_config.getString(dict_config_prefix + ".schema", config.getString(collection_prefix + ".schema", ""));
if (configuration.host.empty() || configuration.port == 0 || configuration.username.empty() || configuration.password.empty()
|| configuration.database.empty() || configuration.table.empty())
if (configuration.host.empty() || configuration.port == 0 || configuration.username.empty() || configuration.table.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Named collection of connection parameters is missing some of the parameters and dictionary parameters are added");

View File

@ -9,5 +9,10 @@
<host>127.0.0.{1,2}</host>
<table>remote_test</table>
</remote2>
<clickhouse_dictionary>
<host>localhost</host>
<user>default</user>
<table>s</table>
</clickhouse_dictionary>
</named_collections>
</clickhouse>

View File

@ -0,0 +1,24 @@
-- Tags: no-parallel
DROP DICTIONARY IF EXISTS dict;
DROP TABLE IF EXISTS s;
CREATE TABLE s
(
id UInt64,
value String
)
ENGINE = Memory;
INSERT INTO s VALUES(1, 'OK');
CREATE DICTIONARY dict
(
id UInt64,
value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(NAME clickhouse_dictionary PORT tcpPort()))
LIFETIME(MIN 1 MAX 1000)
LAYOUT(CACHE(SIZE_IN_CELLS 10));
SELECT dictGet('dict', 'value', toUInt64(1));