CLICKHOUSE-3476

This commit is contained in:
VadimPE 2018-09-11 12:29:02 +03:00
parent eb5f571935
commit bb7a353e9b
2 changed files with 34 additions and 1 deletions

View File

@ -2,7 +2,10 @@
#include <Dictionaries/ExternalQueryBuilder.h>
#include <Dictionaries/writeParenthesisedString.h>
#include <Client/ConnectionPool.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <Dictionaries/readInvalidateQuery.h>
#include <Interpreters/executeQuery.h>
#include <Common/isLocalAddress.h>
#include <memory>
@ -50,6 +53,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
table{config.getString(config_prefix + ".table")},
where{config.getString(config_prefix + ".where", "")},
update_field{config.getString(config_prefix + ".update_field", "")},
invalidate_query{config.getString(config_prefix + ".invalidate_query", "")},
query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks},
sample_block{sample_block}, context(context),
is_local{isLocalAddress({ host, port }, config.getInt("tcp_port", 0))},
@ -67,6 +71,8 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
db{other.db}, table{other.table},
where{other.where},
update_field{other.update_field},
invalidate_query{other.invalidate_query},
invalidate_query_response{other.invalidate_query_response},
query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks},
sample_block{other.sample_block}, context(other.context),
is_local{other.is_local},
@ -125,6 +131,18 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(
key_columns, requested_rows, ExternalQueryBuilder::IN_WITH_TUPLES));
}
bool ClickHouseDictionarySource::isModified() const
{
if (!invalidate_query.empty())
{
auto response = doInvalidateQuery(invalidate_query);
if (invalidate_query_response == response)
return false;
invalidate_query_response = response;
}
return true;
}
bool ClickHouseDictionarySource::hasUpdateField() const
{
return !update_field.empty();
@ -143,4 +161,15 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context);
}
std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const
{
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
auto invalidate_stream = RemoteBlockInputStream(pool, request, invalidate_sample_block, context);
return readInvalidateQuery(invalidate_stream);
}
}

View File

@ -35,7 +35,7 @@ public:
BlockInputStreamPtr loadKeys(
const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override { return true; }
bool isModified() const override;
bool supportsSelectiveLoad() const override { return true; }
bool hasUpdateField() const override;
@ -49,6 +49,8 @@ private:
BlockInputStreamPtr createStreamForSelectiveLoad(const std::string & query);
std::string doInvalidateQuery(const std::string & request) const;
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string host;
@ -60,6 +62,8 @@ private:
const std::string table;
const std::string where;
const std::string update_field;
std::string invalidate_query;
mutable std::string invalidate_query_response;
ExternalQueryBuilder query_builder;
Block sample_block;
Context & context;