Merge pull request #22479 from kitaisreal/clickhouse-dictionary-source-loop-fix

ClickHouseDictionarySource loop fix
This commit is contained in:
Maksim Kita 2021-04-04 13:29:17 +03:00 committed by GitHub
commit 0bfb429c42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 166 additions and 97 deletions

View File

@ -18,97 +18,72 @@
namespace DB
{
static const size_t MAX_CONNECTIONS = 16;
inline static UInt16 getPortFromContext(const Context & context, bool secure)
namespace ErrorCodes
{
return secure ? context.getTCPPortSecure().value_or(0) : context.getTCPPort();
extern const int BAD_ARGUMENTS;
}
static ConnectionPoolWithFailoverPtr createPool(
const std::string & host,
UInt16 port,
bool secure,
const std::string & db,
const std::string & user,
const std::string & password)
namespace
{
ConnectionPoolPtrs pools;
pools.emplace_back(std::make_shared<ConnectionPool>(
MAX_CONNECTIONS,
host,
port,
db,
user,
password,
"", /* cluster */
"", /* cluster_secret */
"ClickHouseDictionarySource",
Protocol::Compression::Enable,
secure ? Protocol::Secure::Enable : Protocol::Secure::Disable));
return std::make_shared<ConnectionPoolWithFailover>(pools, LoadBalancing::RANDOM);
}
constexpr size_t MAX_CONNECTIONS = 16;
/// Returns the TCP port reported by `context`: the secure port when `secure` is set
/// (0 if the context has no secure port), the plain TCP port otherwise.
inline UInt16 getPortFromContext(const Context & context, bool secure)
{
    if (!secure)
        return context.getTCPPort();

    return context.getTCPPortSecure().value_or(0);
}
/// Creates the connection pool used to query the server described by `configuration`.
/// Returns nullptr when the source is local: no pool is created in that case.
ConnectionPoolWithFailoverPtr createPool(const ClickHouseDictionarySource::Configuration & configuration)
{
    if (!configuration.is_local)
    {
        const auto secure_mode = configuration.secure ? Protocol::Secure::Enable : Protocol::Secure::Disable;

        /// The failover wrapper is given exactly one underlying pool.
        ConnectionPoolPtrs single_pool;
        single_pool.emplace_back(std::make_shared<ConnectionPool>(
            MAX_CONNECTIONS,
            configuration.host,
            configuration.port,
            configuration.db,
            configuration.user,
            configuration.password,
            "", /* cluster */
            "", /* cluster_secret */
            "ClickHouseDictionarySource",
            Protocol::Compression::Enable,
            secure_mode));

        return std::make_shared<ConnectionPoolWithFailover>(single_pool, LoadBalancing::RANDOM);
    }

    return nullptr;
}
}
ClickHouseDictionarySource::ClickHouseDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & path_to_settings,
const std::string & config_prefix,
const Configuration & configuration_,
const Block & sample_block_,
const Context & context_,
const std::string & default_database)
const Context & context_)
: update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_}
, secure(config.getBool(config_prefix + ".secure", false))
, host{config.getString(config_prefix + ".host", "localhost")}
, port(config.getInt(config_prefix + ".port", getPortFromContext(context_, secure)))
, user{config.getString(config_prefix + ".user", "default")}
, password{config.getString(config_prefix + ".password", "")}
, db{config.getString(config_prefix + ".db", default_database)}
, table{config.getString(config_prefix + ".table")}
, where{config.getString(config_prefix + ".where", "")}
, update_field{config.getString(config_prefix + ".update_field", "")}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, configuration{configuration_}
, query_builder{dict_struct, configuration.db, "", configuration.table, configuration.where, IdentifierQuotingStyle::Backticks}
, sample_block{sample_block_}
, context(context_)
, is_local{isLocalAddress({host, port}, getPortFromContext(context_, secure))}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
, context{context_}
, pool{createPool(configuration)}
, load_all_query{query_builder.composeLoadAllQuery()}
{
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
if (is_local)
{
context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0));
context = copyContextAndApplySettings(path_to_settings, context, config);
}
/// Query context is needed because some code in executeQuery function may assume it exists.
/// Current example is Context::getSampleBlockCache from InterpreterSelectWithUnionQuery::getSampleBlock.
context.makeQueryContext();
}
ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
: update_time{other.update_time}
, dict_struct{other.dict_struct}
, secure{other.secure}
, host{other.host}
, port{other.port}
, user{other.user}
, password{other.password}
, db{other.db}
, table{other.table}
, where{other.where}
, update_field{other.update_field}
, invalidate_query{other.invalidate_query}
, configuration{other.configuration}
, invalidate_query_response{other.invalidate_query_response}
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, configuration.db, "", configuration.table, configuration.where, IdentifierQuotingStyle::Backticks}
, sample_block{other.sample_block}
, context(other.context)
, is_local{other.is_local}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
, context{other.context}
, pool{createPool(configuration)}
, load_all_query{other.load_all_query}
{
context.makeQueryContext();
@ -121,7 +96,7 @@ std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - 1;
std::string str_time = DateLUT::instance().timeToString(hr_time);
update_time = std::chrono::system_clock::now();
return query_builder.composeUpdateQuery(update_field, str_time);
return query_builder.composeUpdateQuery(configuration.update_field, str_time);
}
else
{
@ -155,9 +130,9 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(const Columns & key_col
bool ClickHouseDictionarySource::isModified() const
{
if (!invalidate_query.empty())
if (!configuration.invalidate_query.empty())
{
auto response = doInvalidateQuery(invalidate_query);
auto response = doInvalidateQuery(configuration.invalidate_query);
LOG_TRACE(log, "Invalidate query has returned: {}, previous value: {}", response, invalidate_query_response);
if (invalidate_query_response == response)
return false;
@ -168,21 +143,21 @@ bool ClickHouseDictionarySource::isModified() const
bool ClickHouseDictionarySource::hasUpdateField() const
{
return !update_field.empty();
return !configuration.update_field.empty();
}
std::string ClickHouseDictionarySource::toString() const
{
return "ClickHouse: " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
const std::string & where = configuration.where;
return "ClickHouse: " + configuration.db + '.' + configuration.table + (where.empty() ? "" : ", where: " + where);
}
BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const String & query)
{
/// Sample block should not contain first row default values
auto empty_sample_block = sample_block.cloneEmpty();
if (is_local)
if (configuration.is_local)
{
auto stream = executeQuery(query, context, true).getInputStream();
stream = std::make_shared<ConvertingBlockInputStream>(stream, empty_sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
@ -195,7 +170,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const Strin
std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const
{
LOG_TRACE(log, "Performing invalidate query");
if (is_local)
if (configuration.is_local)
{
Context query_context = context;
auto input_block = executeQuery(request, query_context, true).getInputStream();
@ -210,7 +185,6 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re
}
}
void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
{
auto create_table_source = [=](const DictionaryStructure & dict_struct,
@ -218,12 +192,48 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
const std::string & config_prefix,
Block & sample_block,
const Context & context,
const std::string & default_database,
const std::string & default_database [[maybe_unused]],
bool /* check_config */) -> DictionarySourcePtr
{
return std::make_unique<ClickHouseDictionarySource>(
dict_struct, config, config_prefix, config_prefix + ".clickhouse", sample_block, context, default_database);
/// All options of the source, including `secure`, are read from the `clickhouse` subsection.
std::string settings_config_prefix = config_prefix + ".clickhouse";

/// `secure` has to come from the same subsection as the other options: it selects which port
/// is used as the default for `port` and as the reference port of the local-address check.
bool secure = config.getBool(settings_config_prefix + ".secure", false);

Context context_copy = context;
UInt16 default_port = getPortFromContext(context_copy, secure);

std::string host = config.getString(settings_config_prefix + ".host", "localhost");
UInt16 port = static_cast<UInt16>(config.getUInt(settings_config_prefix + ".port", default_port));

/// Members are listed in declaration order, as designated initializers require.
ClickHouseDictionarySource::Configuration configuration {
    .secure = secure,
    .host = host,
    .port = port,
    .user = config.getString(settings_config_prefix + ".user", "default"),
    .password = config.getString(settings_config_prefix + ".password", ""),
    .db = config.getString(settings_config_prefix + ".db", default_database),
    .table = config.getString(settings_config_prefix + ".table"),
    .where = config.getString(settings_config_prefix + ".where", ""),
    .update_field = config.getString(settings_config_prefix + ".update_field", ""),
    .invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
    .is_local = isLocalAddress({host, port}, default_port)
};

/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
if (configuration.is_local)
{
    context_copy.setUser(configuration.user, configuration.password, Poco::Net::SocketAddress("127.0.0.1", 0));
    context_copy = copyContextAndApplySettings(config_prefix, context_copy, config);
}

/// A dictionary whose source table is the dictionary itself would load from itself in a loop,
/// so such a configuration is rejected up front.
String dictionary_name = config.getString(".dictionary.name", "");
String dictionary_database = config.getString(".dictionary.database", "");

if (dictionary_name == configuration.table && dictionary_database == configuration.db)
    throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouseDictionarySource table cannot be dictionary table");

return std::make_unique<ClickHouseDictionarySource>(dict_struct, configuration, sample_block, context_copy);
};
factory.registerSource("clickhouse", create_table_source);
}

View File

@ -18,14 +18,26 @@ namespace DB
class ClickHouseDictionarySource final : public IDictionarySource
{
public:
/// Settings of the source, parsed from the dictionary config by the factory and passed
/// to the constructor as a single immutable value.
/// NOTE: the factory fills this struct with designated initializers, so the order of
/// the members below must not change.
struct Configuration
{
/// Whether the connection to the server uses the secure protocol.
const bool secure;
/// Host of the server that holds the source table.
const std::string host;
/// TCP port of the server that holds the source table.
const UInt16 port;
/// Credentials used for the connection (and for the in-process context when the source is local).
const std::string user;
const std::string password;
/// Database and table the dictionary is loaded from.
const std::string db;
const std::string table;
/// Optional filter passed to the query builder; empty means no filter.
const std::string where;
/// Field used to compose the update query; empty means the source has no update field.
const std::string update_field;
/// Query whose result is compared between checks to detect modification; empty disables the check.
const std::string invalidate_query;
/// True when {host, port} is a local address: queries are then executed in-process
/// and no connection pool is created.
const bool is_local;
};
ClickHouseDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & path_to_settings,
const std::string & config_prefix,
const Configuration & configuration_,
const Block & sample_block_,
const Context & context,
const std::string & default_database);
const Context & context);
/// copy-constructor is provided in order to support cloneability
ClickHouseDictionarySource(const ClickHouseDictionarySource & other);
@ -50,7 +62,7 @@ public:
/// Used for detection whether the hashtable should be preallocated
/// (since if there is WHERE then it can filter out too much)
bool hasWhere() const { return !where.empty(); }
bool hasWhere() const
{
    /// A non-empty WHERE clause in the source configuration counts as "has where".
    const bool where_is_empty = configuration.where.empty();
    return !where_is_empty;
}
private:
std::string getUpdateFieldAndDate();
@ -61,21 +73,11 @@ private:
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const bool secure;
const std::string host;
const UInt16 port;
const std::string user;
const std::string password;
const std::string db;
const std::string table;
const std::string where;
const std::string update_field;
std::string invalidate_query;
const Configuration configuration;
mutable std::string invalidate_query_response;
ExternalQueryBuilder query_builder;
Block sample_block;
Context context;
const bool is_local;
ConnectionPoolWithFailoverPtr pool;
const std::string load_all_query;
Poco::Logger * log = &Poco::Logger::get("ClickHouseDictionarySource");

View File

@ -0,0 +1,3 @@
1 1
2 2
3 3

View File

@ -0,0 +1,53 @@
-- Checks that a dictionary with a ClickHouse source cannot use itself as the source table,
-- and that a dictionary over a regular table still loads.
DROP DATABASE IF EXISTS 01780_db;
CREATE DATABASE 01780_db;

-- Case 1: the source table is the dictionary itself (database not specified in the source).
DROP DICTIONARY IF EXISTS dict1;
CREATE DICTIONARY dict1
(
    id UInt64,
    value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 TABLE 'dict1'))
LAYOUT(DIRECT());

SELECT * FROM dict1; --{serverError 36}

DROP DICTIONARY dict1;

-- Case 2: the same loop, with the database given explicitly in the source.
-- The cleanup targets the qualified name, matching the dictionary created below.
DROP DICTIONARY IF EXISTS 01780_db.dict2;
CREATE DICTIONARY 01780_db.dict2
(
    id UInt64,
    value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 DATABASE '01780_db' TABLE 'dict2'))
LAYOUT(DIRECT());

SELECT * FROM 01780_db.dict2; --{serverError 36}

DROP DICTIONARY 01780_db.dict2;

-- Case 3: a dictionary over a regular table must load and return the table rows.
DROP TABLE IF EXISTS 01780_db.dict3_source;
CREATE TABLE 01780_db.dict3_source
(
    id UInt64,
    value String
) ENGINE = TinyLog;

INSERT INTO 01780_db.dict3_source VALUES (1, '1'), (2, '2'), (3, '3');

DROP DICTIONARY IF EXISTS 01780_db.dict3;
CREATE DICTIONARY 01780_db.dict3
(
    id UInt64,
    value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 TABLE 'dict3_source' DATABASE '01780_db'))
LAYOUT(DIRECT());

SELECT * FROM 01780_db.dict3;

DROP DICTIONARY 01780_db.dict3;

DROP DATABASE 01780_db;

View File

@ -695,6 +695,7 @@
"01685_ssd_cache_dictionary_complex_key",
"01760_system_dictionaries",
"01760_polygon_dictionaries",
"01778_hierarchical_dictionaries"
"01778_hierarchical_dictionaries",
"01780_clickhouse_dictionary_source_loop"
]
}