dbms: DictionarySource::loadIds pass ids by reference to const.

Allow specifying "where" for MySQL and ClickHouse dictionary sources [#METR-16432]
This commit is contained in:
Andrey Mironov 2015-05-22 16:25:45 +03:00
parent 3fc0fda58f
commit f465feeb28
4 changed files with 57 additions and 15 deletions

View File

@ -29,19 +29,21 @@ public:
password{config.getString(config_prefix + ".password", "")}, password{config.getString(config_prefix + ".password", "")},
db{config.getString(config_prefix + ".db", "")}, db{config.getString(config_prefix + ".db", "")},
table{config.getString(config_prefix + ".table")}, table{config.getString(config_prefix + ".table")},
where{config.getString(config_prefix + ".where", "")},
sample_block{sample_block}, context(context), sample_block{sample_block}, context(context),
is_local{isLocalAddress({ host, port })}, is_local{isLocalAddress({ host, port })},
pool{is_local ? nullptr : std::make_unique<ConnectionPool>( pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
max_connections, host, port, db, user, password, context.getDataTypeFactory(), max_connections, host, port, db, user, password, context.getDataTypeFactory(),
"ClickHouseDictionarySource") "ClickHouseDictionarySource")
}, },
load_all_query{composeLoadAllQuery(sample_block, db, table)} load_all_query{composeLoadAllQuery()}
{} {}
/// copy-constructor is provided in order to support cloneability /// copy-constructor is provided in order to support cloneability
ClickHouseDictionarySource(const ClickHouseDictionarySource & other) ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
: host{other.host}, port{other.port}, user{other.user}, password{other.password}, : host{other.host}, port{other.port}, user{other.user}, password{other.password},
db{other.db}, table{other.table}, db{other.db}, table{other.table},
where{other.where},
sample_block{other.sample_block}, context(other.context), sample_block{other.sample_block}, context(other.context),
is_local{other.is_local}, is_local{other.is_local},
pool{is_local ? nullptr : std::make_unique<ConnectionPool>( pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
@ -60,7 +62,7 @@ public:
return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr}; return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr};
} }
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> & ids) override
{ {
const auto query = composeLoadIdsQuery(ids); const auto query = composeLoadIdsQuery(ids);
@ -74,10 +76,13 @@ public:
DictionarySourcePtr clone() const override { return std::make_unique<ClickHouseDictionarySource>(*this); } DictionarySourcePtr clone() const override { return std::make_unique<ClickHouseDictionarySource>(*this); }
std::string toString() const override { return "ClickHouse: " + db + '.' + table; } std::string toString() const override
{
return "ClickHouse: " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
}
private: private:
static std::string composeLoadAllQuery(const Block & block, const std::string & db, const std::string & table) std::string composeLoadAllQuery() const
{ {
std::string query; std::string query;
@ -86,12 +91,12 @@ private:
writeString("SELECT ", out); writeString("SELECT ", out);
auto first = true; auto first = true;
for (const auto idx : ext::range(0, block.columns())) for (const auto idx : ext::range(0, sample_block.columns()))
{ {
if (!first) if (!first)
writeString(", ", out); writeString(", ", out);
writeString(block.getByPosition(idx).name, out); writeString(sample_block.getByPosition(idx).name, out);
first = false; first = false;
} }
@ -102,6 +107,13 @@ private:
writeChar('.', out); writeChar('.', out);
} }
writeProbablyBackQuotedString(table, out); writeProbablyBackQuotedString(table, out);
if (!where.empty())
{
writeString(" WHERE ", out);
writeString(where, out);
}
writeChar(';', out); writeChar(';', out);
} }
@ -134,7 +146,15 @@ private:
writeChar('.', out); writeChar('.', out);
} }
writeProbablyBackQuotedString(table, out); writeProbablyBackQuotedString(table, out);
writeString(" WHERE ", out); writeString(" WHERE ", out);
if (!where.empty())
{
writeString(where, out);
writeString(" AND ", out);
}
writeProbablyBackQuotedString(id_column_name, out); writeProbablyBackQuotedString(id_column_name, out);
writeString(" IN (", out); writeString(" IN (", out);
@ -160,6 +180,7 @@ private:
const std::string password; const std::string password;
const std::string db; const std::string db;
const std::string table; const std::string table;
const std::string where;
Block sample_block; Block sample_block;
Context & context; Context & context;
const bool is_local; const bool is_local;

View File

@ -40,7 +40,7 @@ public:
return new OwningBufferBlockInputStream{stream, std::move(in_ptr)}; return new OwningBufferBlockInputStream{stream, std::move(in_ptr)};
} }
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> & ids) override
{ {
throw Exception{ throw Exception{
"Method unsupported", "Method unsupported",

View File

@ -25,7 +25,7 @@ public:
virtual bool supportsSelectiveLoad() const = 0; virtual bool supportsSelectiveLoad() const = 0;
/// returns an input stream with the data for a collection of identifiers /// returns an input stream with the data for a collection of identifiers
virtual BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) = 0; virtual BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> & ids) = 0;
/// indicates whether the source has been modified since last load* operation /// indicates whether the source has been modified since last load* operation
virtual bool isModified() const = 0; virtual bool isModified() const = 0;

View File

@ -20,9 +20,10 @@ public:
Block & sample_block) Block & sample_block)
: db{config.getString(config_prefix + ".db", "")}, : db{config.getString(config_prefix + ".db", "")},
table{config.getString(config_prefix + ".table")}, table{config.getString(config_prefix + ".table")},
where{config.getString(config_prefix + ".where", "")},
sample_block{sample_block}, sample_block{sample_block},
pool{config, config_prefix}, pool{config, config_prefix},
load_all_query{composeLoadAllQuery(sample_block, db, table)}, load_all_query{composeLoadAllQuery()},
last_modification{getLastModification()} last_modification{getLastModification()}
{} {}
@ -30,6 +31,7 @@ public:
MySQLDictionarySource(const MySQLDictionarySource & other) MySQLDictionarySource(const MySQLDictionarySource & other)
: db{other.db}, : db{other.db},
table{other.table}, table{other.table},
where{other.where},
sample_block{other.sample_block}, sample_block{other.sample_block},
pool{other.pool}, pool{other.pool},
load_all_query{other.load_all_query}, last_modification{other.last_modification} load_all_query{other.load_all_query}, last_modification{other.last_modification}
@ -41,7 +43,7 @@ public:
return new MySQLBlockInputStream{pool.Get(), load_all_query, sample_block, max_block_size}; return new MySQLBlockInputStream{pool.Get(), load_all_query, sample_block, max_block_size};
} }
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> & ids) override
{ {
last_modification = getLastModification(); last_modification = getLastModification();
const auto query = composeLoadIdsQuery(ids); const auto query = composeLoadIdsQuery(ids);
@ -54,7 +56,10 @@ public:
DictionarySourcePtr clone() const override { return std::make_unique<MySQLDictionarySource>(*this); } DictionarySourcePtr clone() const override { return std::make_unique<MySQLDictionarySource>(*this); }
std::string toString() const override { return "MySQL: " + db + '.' + table; } std::string toString() const override
{
return "MySQL: " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
}
private: private:
mysqlxx::DateTime getLastModification() const mysqlxx::DateTime getLastModification() const
@ -88,7 +93,7 @@ private:
return update_time; return update_time;
} }
static std::string composeLoadAllQuery(const Block & block, const std::string & db, const std::string & table) std::string composeLoadAllQuery() const
{ {
std::string query; std::string query;
@ -97,12 +102,12 @@ private:
writeString("SELECT ", out); writeString("SELECT ", out);
auto first = true; auto first = true;
for (const auto idx : ext::range(0, block.columns())) for (const auto idx : ext::range(0, sample_block.columns()))
{ {
if (!first) if (!first)
writeString(", ", out); writeString(", ", out);
writeString(block.getByPosition(idx).name, out); writeString(sample_block.getByPosition(idx).name, out);
first = false; first = false;
} }
@ -113,13 +118,20 @@ private:
writeChar('.', out); writeChar('.', out);
} }
writeProbablyBackQuotedString(table, out); writeProbablyBackQuotedString(table, out);
if (!where.empty())
{
writeString(" WHERE ", out);
writeString(where, out);
}
writeChar(';', out); writeChar(';', out);
} }
return query; return query;
} }
std::string composeLoadIdsQuery(const std::vector<std::uint64_t> ids) std::string composeLoadIdsQuery(const std::vector<std::uint64_t> & ids)
{ {
std::string query; std::string query;
@ -145,7 +157,15 @@ private:
writeChar('.', out); writeChar('.', out);
} }
writeProbablyBackQuotedString(table, out); writeProbablyBackQuotedString(table, out);
writeString(" WHERE ", out); writeString(" WHERE ", out);
if (!where.empty())
{
writeString(where, out);
writeString(" AND ", out);
}
writeProbablyBackQuotedString(id_column_name, out); writeProbablyBackQuotedString(id_column_name, out);
writeString(" IN (", out); writeString(" IN (", out);
@ -167,6 +187,7 @@ private:
const std::string db; const std::string db;
const std::string table; const std::string table;
const std::string where;
Block sample_block; Block sample_block;
mutable mysqlxx::PoolWithFailover pool; mutable mysqlxx::PoolWithFailover pool;
const std::string load_all_query; const std::string load_all_query;