Database dictionaries custom query support

This commit is contained in:
Maksim Kita 2021-07-29 23:57:06 +03:00 committed by Maksim Kita
parent 2d4030f98b
commit 30a95e77a7
12 changed files with 390 additions and 248 deletions

View File

@ -49,7 +49,7 @@ namespace ErrorCodes
extern const int INVALID_CONFIG_PARAMETER;
}
CassandraSettings::CassandraSettings(
CassandraDictionarySource::Configuration::Configuration(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix)
: host(config.getString(config_prefix + ".host"))
@ -66,7 +66,7 @@ CassandraSettings::CassandraSettings(
setConsistency(config.getString(config_prefix + ".consistency", "One"));
}
void CassandraSettings::setConsistency(const String & config_str)
void CassandraDictionarySource::Configuration::setConsistency(const String & config_str)
{
if (config_str == "One")
consistency = CASS_CONSISTENCY_ONE;
@ -96,19 +96,19 @@ static const size_t max_block_size = 8192;
CassandraDictionarySource::CassandraDictionarySource(
const DictionaryStructure & dict_struct_,
const CassandraSettings & settings_,
const Configuration & configuration_,
const Block & sample_block_)
: log(&Poco::Logger::get("CassandraDictionarySource"))
, dict_struct(dict_struct_)
, settings(settings_)
, configuration(configuration_)
, sample_block(sample_block_)
, query_builder(dict_struct, settings.db, "", settings.table, settings.where, IdentifierQuotingStyle::DoubleQuotes)
, query_builder(dict_struct, configuration.db, "", configuration.table, configuration.query, configuration.where, IdentifierQuotingStyle::DoubleQuotes)
{
cassandraCheck(cass_cluster_set_contact_points(cluster, settings.host.c_str()));
if (settings.port)
cassandraCheck(cass_cluster_set_port(cluster, settings.port));
cass_cluster_set_credentials(cluster, settings.user.c_str(), settings.password.c_str());
cassandraCheck(cass_cluster_set_consistency(cluster, settings.consistency));
cassandraCheck(cass_cluster_set_contact_points(cluster, configuration.host.c_str()));
if (configuration.port)
cassandraCheck(cass_cluster_set_port(cluster, configuration.port));
cass_cluster_set_credentials(cluster, configuration.user.c_str(), configuration.password.c_str());
cassandraCheck(cass_cluster_set_consistency(cluster, configuration.consistency));
}
CassandraDictionarySource::CassandraDictionarySource(
@ -118,14 +118,14 @@ CassandraDictionarySource::CassandraDictionarySource(
Block & sample_block_)
: CassandraDictionarySource(
dict_struct_,
CassandraSettings(config, config_prefix),
Configuration(config, config_prefix),
sample_block_)
{
}
void CassandraDictionarySource::maybeAllowFiltering(String & query) const
{
if (!settings.allow_filtering)
if (!configuration.allow_filtering)
return;
query.pop_back(); /// remove semicolon
query += " ALLOW FILTERING;";
@ -141,7 +141,7 @@ Pipe CassandraDictionarySource::loadAll()
std::string CassandraDictionarySource::toString() const
{
return "Cassandra: " + settings.db + '.' + settings.table;
return "Cassandra: " + configuration.db + '.' + configuration.table;
}
Pipe CassandraDictionarySource::loadIds(const std::vector<UInt64> & ids)
@ -162,7 +162,7 @@ Pipe CassandraDictionarySource::loadKeys(const Columns & key_columns, const std:
for (const auto & row : requested_rows)
{
SipHash partition_key;
for (size_t i = 0; i < settings.partition_key_prefix; ++i)
for (size_t i = 0; i < configuration.partition_key_prefix; ++i)
key_columns[i]->updateHashWithValue(row, partition_key);
partitions[partition_key.get64()].push_back(row);
}
@ -170,7 +170,7 @@ Pipe CassandraDictionarySource::loadKeys(const Columns & key_columns, const std:
Pipes pipes;
for (const auto & partition : partitions)
{
String query = query_builder.composeLoadKeysQuery(key_columns, partition.second, ExternalQueryBuilder::CASSANDRA_SEPARATE_PARTITION_KEY, settings.partition_key_prefix);
String query = query_builder.composeLoadKeysQuery(key_columns, partition.second, ExternalQueryBuilder::CASSANDRA_SEPARATE_PARTITION_KEY, configuration.partition_key_prefix);
maybeAllowFiltering(query);
LOG_INFO(log, "Loading keys for partition hash {} using query: {}", partition.first, query);
pipes.push_back(Pipe(std::make_shared<CassandraSource>(getSession(), query, sample_block, max_block_size)));

View File

@ -14,33 +14,35 @@
namespace DB
{
struct CassandraSettings
{
String host;
UInt16 port;
String user;
String password;
String db;
String table;
CassConsistency consistency;
bool allow_filtering;
/// TODO get information about key from the driver
size_t partition_key_prefix;
size_t max_threads;
String where;
CassandraSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
void setConsistency(const String & config_str);
};
class CassandraDictionarySource final : public IDictionarySource
{
public:
struct Configuration
{
String host;
UInt16 port;
String user;
String password;
String db;
String table;
String query;
CassConsistency consistency;
bool allow_filtering;
/// TODO get information about key from the driver
size_t partition_key_prefix;
size_t max_threads;
String where;
Configuration(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
void setConsistency(const String & config_str);
};
CassandraDictionarySource(
const DictionaryStructure & dict_struct,
const CassandraSettings & settings_,
const Configuration & configuration,
const Block & sample_block);
CassandraDictionarySource(
@ -59,7 +61,7 @@ public:
DictionarySourcePtr clone() const override
{
return std::make_unique<CassandraDictionarySource>(dict_struct, settings, sample_block);
return std::make_unique<CassandraDictionarySource>(dict_struct, configuration, sample_block);
}
Pipe loadIds(const std::vector<UInt64> & ids) override;
@ -76,7 +78,7 @@ private:
Poco::Logger * log;
const DictionaryStructure dict_struct;
const CassandraSettings settings;
const Configuration configuration;
Block sample_block;
ExternalQueryBuilder query_builder;

View File

@ -67,7 +67,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
: update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_}
, configuration{configuration_}
, query_builder{dict_struct, configuration.db, "", configuration.table, configuration.where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, configuration.db, "", configuration.table, configuration.query, configuration.where, IdentifierQuotingStyle::Backticks}
, sample_block{sample_block_}
, context(Context::createCopy(context_))
, pool{createPool(configuration)}
@ -83,7 +83,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
, dict_struct{other.dict_struct}
, configuration{other.configuration}
, invalidate_query_response{other.invalidate_query_response}
, query_builder{dict_struct, configuration.db, "", configuration.table, configuration.where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, configuration.db, "", configuration.table, configuration.query, configuration.where, IdentifierQuotingStyle::Backticks}
, sample_block{other.sample_block}
, context(Context::createCopy(other.context))
, pool{createPool(configuration)}
@ -241,7 +241,8 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
.user = config.getString(settings_config_prefix + ".user", "default"),
.password = config.getString(settings_config_prefix + ".password", ""),
.db = config.getString(settings_config_prefix + ".db", default_database),
.table = config.getString(settings_config_prefix + ".table"),
.table = config.getString(settings_config_prefix + ".table", ""),
.query = config.getString(settings_config_prefix + ".query", ""),
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),

View File

@ -25,6 +25,7 @@ public:
const std::string password;
const std::string db;
const std::string table;
const std::string query;
const std::string where;
const std::string invalidate_query;
const std::string update_field;

View File

@ -21,10 +21,23 @@ ExternalQueryBuilder::ExternalQueryBuilder(
const std::string & db_,
const std::string & schema_,
const std::string & table_,
const std::string & query_,
const std::string & where_,
IdentifierQuotingStyle quoting_style_)
: dict_struct(dict_struct_), db(db_), schema(schema_), table(table_), where(where_), quoting_style(quoting_style_)
{}
: dict_struct(dict_struct_)
, db(db_)
, schema(schema_)
, table(table_)
, query(query_)
, where(where_)
, quoting_style(quoting_style_)
{
if (table.empty() && query.empty())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Setting `table` or `query` must be non empty");
if (!query.empty() && !where.empty())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Setting `where` is not supported with `query` parameter");
}
void ExternalQueryBuilder::writeQuoted(const std::string & s, WriteBuffer & out) const
@ -152,74 +165,314 @@ void ExternalQueryBuilder::composeLoadAllQuery(WriteBuffer & out) const
std::string ExternalQueryBuilder::composeUpdateQuery(const std::string & update_field, const std::string & time_point) const
{
WriteBufferFromOwnString out;
composeLoadAllQuery(out);
if (!where.empty())
writeString(" AND ", out);
if (query.empty())
{
composeLoadAllQuery(out);
if (!where.empty())
writeString(" AND ", out);
else
writeString(" WHERE ", out);
composeUpdateCondition(update_field, time_point, out);
writeChar(';', out);
return out.str();
}
else
writeString(" WHERE ", out);
{
writeString(query, out);
writeString(update_field, out);
writeString(" >= '", out);
writeString(time_point, out);
writeChar('\'', out);
auto condition_position = query.find("{condition}");
if (condition_position == std::string::npos)
{
writeString(" WHERE ", out);
composeUpdateCondition(update_field, time_point, out);
writeString(";", out);
writeChar(';', out);
return out.str();
return out.str();
}
WriteBufferFromOwnString condition_value_buffer;
composeUpdateCondition(update_field, time_point, condition_value_buffer);
const auto & condition_value = condition_value_buffer.str();
auto query_copy = query;
query_copy.replace(condition_position, condition_value.size(), condition_value);
return query_copy;
}
}
std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64> & ids)
std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64> & ids) const
{
if (!dict_struct.id)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Simple key required for method");
WriteBufferFromOwnString out;
writeString("SELECT ", out);
if (!dict_struct.id->expression.empty())
if (query.empty())
{
writeParenthesisedString(dict_struct.id->expression, out);
writeString(" AS ", out);
}
writeString("SELECT ", out);
writeQuoted(dict_struct.id->name, out);
for (const auto & attr : dict_struct.attributes)
{
writeString(", ", out);
if (!attr.expression.empty())
if (!dict_struct.id->expression.empty())
{
writeParenthesisedString(attr.expression, out);
writeParenthesisedString(dict_struct.id->expression, out);
writeString(" AS ", out);
}
writeQuoted(attr.name, out);
}
writeQuoted(dict_struct.id->name, out);
writeString(" FROM ", out);
if (!db.empty())
for (const auto & attr : dict_struct.attributes)
{
writeString(", ", out);
if (!attr.expression.empty())
{
writeParenthesisedString(attr.expression, out);
writeString(" AS ", out);
}
writeQuoted(attr.name, out);
}
writeString(" FROM ", out);
if (!db.empty())
{
writeQuoted(db, out);
writeChar('.', out);
}
if (!schema.empty())
{
writeQuoted(schema, out);
writeChar('.', out);
}
writeQuoted(table, out);
writeString(" WHERE ", out);
if (!where.empty())
{
writeString(where, out);
writeString(" AND ", out);
}
composeIdsCondition(ids, out);
writeString(";", out);
return out.str();
}
else
{
writeQuoted(db, out);
writeChar('.', out);
writeString(query, out);
auto condition_position = query.find("{condition}");
if (condition_position == std::string::npos)
{
writeString(" WHERE ", out);
composeIdsCondition(ids, out);
writeString(";", out);
return out.str();
}
WriteBufferFromOwnString condition_value_buffer;
composeIdsCondition(ids, condition_value_buffer);
const auto & condition_value = condition_value_buffer.str();
auto query_copy = query;
query_copy.replace(condition_position, condition_value.size(), condition_value);
return query_copy;
}
if (!schema.empty())
}
std::string ExternalQueryBuilder::composeLoadKeysQuery(
const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix) const
{
if (!dict_struct.key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Composite key required for method");
if (key_columns.size() != dict_struct.key->size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "The size of key_columns does not equal to the size of dictionary key");
WriteBufferFromOwnString out;
if (query.empty())
{
writeQuoted(schema, out);
writeChar('.', out);
writeString("SELECT ", out);
auto first = true;
for (const auto & key_or_attribute : boost::join(*dict_struct.key, dict_struct.attributes))
{
if (!first)
writeString(", ", out);
first = false;
if (!key_or_attribute.expression.empty())
{
writeParenthesisedString(key_or_attribute.expression, out);
writeString(" AS ", out);
}
writeQuoted(key_or_attribute.name, out);
}
writeString(" FROM ", out);
if (!db.empty())
{
writeQuoted(db, out);
writeChar('.', out);
}
if (!schema.empty())
{
writeQuoted(schema, out);
writeChar('.', out);
}
writeQuoted(table, out);
writeString(" WHERE ", out);
if (!where.empty())
{
if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
writeString("(", out);
writeString(where, out);
if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
writeString(") AND (", out);
else
writeString(" AND ", out);
}
composeKeysCondition(key_columns, requested_rows, method, partition_key_prefix, out);
writeString(";", out);
return out.str();
}
writeQuoted(table, out);
writeString(" WHERE ", out);
if (!where.empty())
else
{
writeString(where, out);
writeString(" AND ", out);
writeString(query, out);
auto condition_position = query.find("{condition}");
if (condition_position == std::string::npos)
{
writeString(" WHERE ", out);
composeKeysCondition(key_columns, requested_rows, method, partition_key_prefix, out);
writeString(";", out);
return out.str();
}
WriteBufferFromOwnString condition_value_buffer;
composeKeysCondition(key_columns, requested_rows, method, partition_key_prefix, condition_value_buffer);
const auto & condition_value = condition_value_buffer.str();
auto query_copy = query;
query_copy.replace(condition_position, condition_value.size(), condition_value);
return query_copy;
}
}
void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, size_t row, WriteBuffer & out,
size_t beg, size_t end) const
{
auto first = true;
for (size_t i = beg; i < end; ++i)
{
if (!first)
writeString(" AND ", out);
first = false;
const auto & key_description = (*dict_struct.key)[i];
/// key_i=value_i
writeQuoted(key_description.name, out);
writeString("=", out);
key_description.type_serialization->serializeTextQuoted(*key_columns[i], row, out, format_settings);
}
}
void ExternalQueryBuilder::composeInWithTuples(const Columns & key_columns, const std::vector<size_t> & requested_rows,
WriteBuffer & out, size_t beg, size_t end) const
{
composeKeyTupleDefinition(out, beg, end);
writeString(" IN (", out);
bool first = true;
for (const auto row : requested_rows)
{
if (!first)
writeString(", ", out);
first = false;
composeKeyTuple(key_columns, row, out, beg, end);
}
writeString(")", out);
}
void ExternalQueryBuilder::composeKeyTupleDefinition(WriteBuffer & out, size_t beg, size_t end) const
{
if (!dict_struct.key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Composite key required for method");
writeChar('(', out);
auto first = true;
for (size_t i = beg; i < end; ++i)
{
if (!first)
writeString(", ", out);
first = false;
writeQuoted((*dict_struct.key)[i].name, out);
}
writeChar(')', out);
}
void ExternalQueryBuilder::composeKeyTuple(const Columns & key_columns, size_t row, WriteBuffer & out, size_t beg, size_t end) const
{
writeString("(", out);
auto first = true;
for (size_t i = beg; i < end; ++i)
{
if (!first)
writeString(", ", out);
first = false;
auto serialization = (*dict_struct.key)[i].type_serialization;
serialization->serializeTextQuoted(*key_columns[i], row, out, format_settings);
}
writeString(")", out);
}
void ExternalQueryBuilder::composeUpdateCondition(const std::string & update_field, const std::string & time_point, WriteBuffer & out)
{
writeString(update_field, out);
writeString(" >= '", out);
writeString(time_point, out);
writeChar('\'', out);
}
void ExternalQueryBuilder::composeIdsCondition(const std::vector<UInt64> & ids, WriteBuffer & out) const
{
writeQuoted(dict_struct.id->name, out);
writeString(" IN (", out);
@ -233,67 +486,12 @@ std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64>
writeString(DB::toString(id), out);
}
writeString(");", out);
return out.str();
writeString(")", out);
}
std::string ExternalQueryBuilder::composeLoadKeysQuery(
const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix)
void ExternalQueryBuilder::composeKeysCondition(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix, WriteBuffer & out) const
{
if (!dict_struct.key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Composite key required for method");
if (key_columns.size() != dict_struct.key->size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "The size of key_columns does not equal to the size of dictionary key");
WriteBufferFromOwnString out;
writeString("SELECT ", out);
auto first = true;
for (const auto & key_or_attribute : boost::join(*dict_struct.key, dict_struct.attributes))
{
if (!first)
writeString(", ", out);
first = false;
if (!key_or_attribute.expression.empty())
{
writeParenthesisedString(key_or_attribute.expression, out);
writeString(" AS ", out);
}
writeQuoted(key_or_attribute.name, out);
}
writeString(" FROM ", out);
if (!db.empty())
{
writeQuoted(db, out);
writeChar('.', out);
}
if (!schema.empty())
{
writeQuoted(schema, out);
writeChar('.', out);
}
writeQuoted(table, out);
writeString(" WHERE ", out);
if (!where.empty())
{
if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
writeString("(", out);
writeString(where, out);
if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
writeString(") AND (", out);
else
writeString(" AND ", out);
}
bool first = true;
if (method == AND_OR_CHAIN)
{
@ -334,92 +532,6 @@ std::string ExternalQueryBuilder::composeLoadKeysQuery(
{
writeString(")", out);
}
writeString(";", out);
return out.str();
}
void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out,
size_t beg, size_t end) const
{
auto first = true;
for (size_t i = beg; i < end; ++i)
{
if (!first)
writeString(" AND ", out);
first = false;
const auto & key_description = (*dict_struct.key)[i];
/// key_i=value_i
writeQuoted(key_description.name, out);
writeString("=", out);
key_description.type_serialization->serializeTextQuoted(*key_columns[i], row, out, format_settings);
}
}
void ExternalQueryBuilder::composeInWithTuples(const Columns & key_columns, const std::vector<size_t> & requested_rows,
WriteBuffer & out, size_t beg, size_t end)
{
composeKeyTupleDefinition(out, beg, end);
writeString(" IN (", out);
bool first = true;
for (const auto row : requested_rows)
{
if (!first)
writeString(", ", out);
first = false;
composeKeyTuple(key_columns, row, out, beg, end);
}
writeString(")", out);
}
void ExternalQueryBuilder::composeKeyTupleDefinition(WriteBuffer & out, size_t beg, size_t end) const
{
if (!dict_struct.key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Composite key required for method");
writeChar('(', out);
auto first = true;
for (size_t i = beg; i < end; ++i)
{
if (!first)
writeString(", ", out);
first = false;
writeQuoted((*dict_struct.key)[i].name, out);
}
writeChar(')', out);
}
void ExternalQueryBuilder::composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const
{
writeString("(", out);
auto first = true;
for (size_t i = beg; i < end; ++i)
{
if (!first)
writeString(", ", out);
first = false;
auto serialization = (*dict_struct.key)[i].type_serialization;
serialization->serializeTextQuoted(*key_columns[i], row, out, format_settings);
}
writeString(")", out);
}
}

View File

@ -21,6 +21,7 @@ struct ExternalQueryBuilder
const std::string db;
const std::string schema;
const std::string table;
const std::string query;
const std::string where;
IdentifierQuotingStyle quoting_style;
@ -31,6 +32,7 @@ struct ExternalQueryBuilder
const std::string & db_,
const std::string & schema_,
const std::string & table_,
const std::string & query_,
const std::string & where_,
IdentifierQuotingStyle quoting_style_);
@ -41,7 +43,7 @@ struct ExternalQueryBuilder
std::string composeUpdateQuery(const std::string & update_field, const std::string & time_point) const;
/** Generate a query to load data by set of UInt64 keys. */
std::string composeLoadIdsQuery(const std::vector<UInt64> & ids);
std::string composeLoadIdsQuery(const std::vector<UInt64> & ids) const;
/** Generate a query to load data by set of composite keys.
* There are three methods of specification of composite keys in WHERE:
@ -56,7 +58,7 @@ struct ExternalQueryBuilder
CASSANDRA_SEPARATE_PARTITION_KEY,
};
std::string composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix = 0);
std::string composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix = 0) const;
private:
@ -67,16 +69,25 @@ private:
/// In the following methods `beg` and `end` specifies which columns to write in expression
/// Expression in form (x = c1 AND y = c2 ...)
void composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const;
void composeKeyCondition(const Columns & key_columns, size_t row, WriteBuffer & out, size_t beg, size_t end) const;
/// Expression in form (x, y, ...) IN ((c1, c2, ...), ...)
void composeInWithTuples(const Columns & key_columns, const std::vector<size_t> & requested_rows, WriteBuffer & out, size_t beg, size_t end);
void composeInWithTuples(const Columns & key_columns, const std::vector<size_t> & requested_rows, WriteBuffer & out, size_t beg, size_t end) const;
/// Expression in form (x, y, ...)
void composeKeyTupleDefinition(WriteBuffer & out, size_t beg, size_t end) const;
/// Expression in form (c1, c2, ...)
void composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const;
void composeKeyTuple(const Columns & key_columns, size_t row, WriteBuffer & out, size_t beg, size_t end) const;
/// Compose update condition
static void composeUpdateCondition(const std::string & update_field, const std::string & time_point, WriteBuffer & out);
/// Compose ids condition
void composeIdsCondition(const std::vector<UInt64> & ids, WriteBuffer & out) const;
/// Compose keys condition
void composeKeysCondition(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix, WriteBuffer & out) const;
/// Write string with specified quoting style.
void writeQuoted(const std::string & s, WriteBuffer & out) const;

View File

@ -22,6 +22,7 @@ static const size_t default_num_tries_on_connection_loss = 3;
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
extern const int UNSUPPORTED_METHOD;
}
void registerDictionarySourceMysql(DictionarySourceFactory & factory)
@ -41,11 +42,19 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
auto settings_config_prefix = config_prefix + ".mysql";
auto table = config.getString(settings_config_prefix + ".table", "");
auto where = config.getString(settings_config_prefix + ".where", "");
auto query = config.getString(settings_config_prefix + ".query", "");
if (query.empty() && table.empty())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "MySQL dictionary source configuration must contain table or query field");
MySQLDictionarySource::Configuration configuration
{
.db = config.getString(settings_config_prefix + ".db", ""),
.table = config.getString(settings_config_prefix + ".table"),
.where = config.getString(settings_config_prefix + ".where", ""),
.table = table,
.query = query,
.where = where,
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
@ -94,8 +103,8 @@ MySQLDictionarySource::MySQLDictionarySource(
, configuration(configuration_)
, pool(std::move(pool_))
, sample_block(sample_block_)
, query_builder(dict_struct, configuration.db, "", configuration.table, configuration.where, IdentifierQuotingStyle::Backticks)
, load_all_query(query_builder.composeLoadAllQuery())
, query_builder(dict_struct, configuration.db, "", configuration.table, configuration.query, configuration.where, IdentifierQuotingStyle::Backticks)
, load_all_query(configuration.query.empty() ? query_builder.composeLoadAllQuery() : configuration.query)
, settings(settings_)
{
}
@ -108,7 +117,7 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other
, configuration(other.configuration)
, pool(other.pool)
, sample_block(other.sample_block)
, query_builder{dict_struct, configuration.db, "", configuration.table, configuration.where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, configuration.db, "", configuration.table, configuration.query, configuration.where, IdentifierQuotingStyle::Backticks}
, load_all_query{other.load_all_query}
, last_modification{other.last_modification}
, invalidate_query_response{other.invalidate_query_response}
@ -128,7 +137,7 @@ std::string MySQLDictionarySource::getUpdateFieldAndDate()
else
{
update_time = std::chrono::system_clock::now();
return query_builder.composeLoadAllQuery();
return load_all_query;
}
}

View File

@ -35,6 +35,7 @@ public:
{
const std::string db;
const std::string table;
const std::string query;
const std::string where;
const std::string invalidate_query;
const std::string update_field;

View File

@ -27,7 +27,7 @@ static const UInt64 max_block_size = 8192;
namespace
{
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct, const String & schema, const String & table, const String & where)
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct, const String & schema, const String & table, const String & query, const String & where)
{
auto schema_value = schema;
auto table_value = table;
@ -41,7 +41,7 @@ namespace
}
}
/// Do not need db because it is already in a connection string.
return {dict_struct, "", schema_value, table_value, where, IdentifierQuotingStyle::DoubleQuotes};
return {dict_struct, "", schema_value, table_value, query, where, IdentifierQuotingStyle::DoubleQuotes};
}
}
@ -56,7 +56,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(
, pool(std::move(pool_))
, sample_block(sample_block_)
, log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, query_builder(makeExternalQueryBuilder(dict_struct, configuration.schema, configuration.table, configuration.where))
, query_builder(makeExternalQueryBuilder(dict_struct, configuration.schema, configuration.table, configuration.query, configuration.where))
, load_all_query(query_builder.composeLoadAllQuery())
{
}
@ -69,7 +69,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionar
, pool(other.pool)
, sample_block(other.sample_block)
, log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, query_builder(makeExternalQueryBuilder(dict_struct, configuration.schema, configuration.table, configuration.where))
, query_builder(makeExternalQueryBuilder(dict_struct, configuration.schema, configuration.table, configuration.query, configuration.where))
, load_all_query(query_builder.composeLoadAllQuery())
, update_time(other.update_time)
, invalidate_query_response(other.invalidate_query_response)
@ -198,6 +198,7 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
.db = config.getString(fmt::format("{}.db", settings_config_prefix), ""),
.schema = config.getString(fmt::format("{}.schema", settings_config_prefix), ""),
.table = config.getString(fmt::format("{}.table", settings_config_prefix), ""),
.query = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
.where = config.getString(fmt::format("{}.where", settings_config_prefix), ""),
.invalidate_query = config.getString(fmt::format("{}.invalidate_query", settings_config_prefix), ""),
.update_field = config.getString(fmt::format("{}.update_field", settings_config_prefix), ""),

View File

@ -26,6 +26,7 @@ public:
const String db;
const String schema;
const String table;
const String query;
const String where;
const String invalidate_query;
const String update_field;

View File

@ -34,6 +34,7 @@ namespace
const std::string & db_,
const std::string & schema_,
const std::string & table_,
const std::string & query_,
const std::string & where_,
IXDBCBridgeHelper & bridge_)
{
@ -59,7 +60,7 @@ namespace
bridge_.getName());
}
return {dict_struct_, db_, schema, table, where_, bridge_.getIdentifierQuotingStyle()};
return {dict_struct_, db_, schema, table, query_, where_, bridge_.getIdentifierQuotingStyle()};
}
}
@ -78,7 +79,7 @@ XDBCDictionarySource::XDBCDictionarySource(
, dict_struct(dict_struct_)
, configuration(configuration_)
, sample_block(sample_block_)
, query_builder(makeExternalQueryBuilder(dict_struct, configuration.db, configuration.schema, configuration.table, configuration.where, *bridge_))
, query_builder(makeExternalQueryBuilder(dict_struct, configuration.db, configuration.schema, configuration.table, configuration.query, configuration.where, *bridge_))
, load_all_query(query_builder.composeLoadAllQuery())
, bridge_helper(bridge_)
, bridge_url(bridge_helper->getMainURI())
@ -119,7 +120,7 @@ std::string XDBCDictionarySource::getUpdateFieldAndDate()
else
{
update_time = std::chrono::system_clock::now();
return query_builder.composeLoadAllQuery();
return load_all_query;
}
}
@ -246,7 +247,8 @@ void registerDictionarySourceXDBC(DictionarySourceFactory & factory)
{
.db = config.getString(settings_config_prefix + ".db", ""),
.schema = config.getString(settings_config_prefix + ".schema", ""),
.table = config.getString(settings_config_prefix + ".table"),
.table = config.getString(settings_config_prefix + ".table", ""),
.query = config.getString(settings_config_prefix + ".query", ""),
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),

View File

@ -32,6 +32,7 @@ public:
const std::string db;
const std::string schema;
const std::string table;
const std::string query;
const std::string where;
const std::string invalidate_query;
const std::string update_field;