Merge pull request #1771 from joomag/dictionary-update

Add external dictionary update feature
This commit is contained in:
alexey-milovidov 2018-03-07 21:48:42 +03:00 committed by GitHub
commit bd455283cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 637 additions and 87 deletions

View File

@ -37,7 +37,8 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Block & sample_block, Context & context)
: dict_struct{dict_struct_},
: update_time{std::chrono::system_clock::from_time_t(0)},
dict_struct{dict_struct_},
host{config.getString(config_prefix + ".host")},
port(config.getInt(config_prefix + ".port")),
user{config.getString(config_prefix + ".user", "")},
@ -45,6 +46,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
db{config.getString(config_prefix + ".db", "")},
table{config.getString(config_prefix + ".table")},
where{config.getString(config_prefix + ".where", "")},
update_field{config.getString(config_prefix + ".update_field")},
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks},
sample_block{sample_block}, context(context),
is_local{isLocalAddress({ host, port }, config.getInt("tcp_port", 0))},
@ -54,10 +56,12 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
: dict_struct{other.dict_struct},
: update_time{other.update_time},
dict_struct{other.dict_struct},
host{other.host}, port{other.port}, user{other.user}, password{other.password},
db{other.db}, table{other.table},
where{other.where},
update_field{other.update_field},
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks},
sample_block{other.sample_block}, context(other.context),
is_local{other.is_local},
@ -65,6 +69,23 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
load_all_query{other.load_all_query}
{}
std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
std::string str_time = std::to_string(LocalDateTime(hr_time));
return query_builder.composeUpdateQuery(update_field, str_time);
}
else
{
update_time = std::chrono::system_clock::now();
std::string str_time("0000-00-00 00:00:00"); ///for initial load
return query_builder.composeUpdateQuery(update_field, str_time);
}
}
BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
{
@ -76,6 +97,13 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, sample_block, context);
}
BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll()
{
std::string load_update_query = getUpdateFieldAndDate();
if (is_local)
return executeQuery(load_update_query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, load_update_query, context);
}
BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
@ -92,6 +120,10 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(
key_columns, requested_rows, ExternalQueryBuilder::IN_WITH_TUPLES));
}
bool ClickHouseDictionarySource::hasUpdateField() const
{
return !update_field.empty();
}
std::string ClickHouseDictionarySource::toString() const
{

View File

@ -28,6 +28,8 @@ public:
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(
@ -36,13 +38,18 @@ public:
bool isModified() const override { return true; }
bool supportsSelectiveLoad() const override { return true; }
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override { return std::make_unique<ClickHouseDictionarySource>(*this); }
std::string toString() const override;
private:
std::string getUpdateFieldAndDate();
BlockInputStreamPtr createStreamForSelectiveLoad(const std::string & query);
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string host;
const UInt16 port;
@ -51,6 +58,7 @@ private:
const std::string db;
const std::string table;
const std::string where;
const std::string update_field;
ExternalQueryBuilder query_builder;
Block sample_block;
Context & context;

View File

@ -17,9 +17,9 @@ namespace ErrorCodes
ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(
const std::string & name, const DictionaryStructure & dict_struct, DictionarySourcePtr source_ptr,
const DictionaryLifetime dict_lifetime, bool require_nonempty)
const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block)
: name{name}, dict_struct(dict_struct), source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
require_nonempty(require_nonempty)
require_nonempty(require_nonempty), saved_block{std::move(saved_block)}
{
createAttributes();
@ -38,7 +38,7 @@ ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(
}
ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(const ComplexKeyHashedDictionary & other)
: ComplexKeyHashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty}
: ComplexKeyHashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
@ -227,63 +227,149 @@ void ComplexKeyHashedDictionary::createAttributes()
}
}
void ComplexKeyHashedDictionary::loadData()
void ComplexKeyHashedDictionary::blockToAttributes(const Block & block)
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
/// created upfront to avoid excess allocations
const auto keys_size = dict_struct.key->size();
StringRefs keys(keys_size);
const auto attributes_size = attributes.size();
const auto rows = block.rows();
element_count += rows;
const auto key_column_ptrs = ext::map<Columns>(ext::range(0, keys_size),
[&](const size_t attribute_idx) {
return block.safeGetByPosition(attribute_idx).column;
});
const auto attribute_column_ptrs = ext::map<Columns>(ext::range(0, attributes_size),
[&](const size_t attribute_idx) {
return block.safeGetByPosition(
keys_size + attribute_idx).column;
});
for (const auto row_idx : ext::range(0, rows)) {
/// calculate key once per row
const auto key = placeKeysInPool(row_idx, key_column_ptrs, keys, keys_pool);
auto should_rollback = false;
for (const auto attribute_idx : ext::range(0, attributes_size)) {
const auto &attribute_column = *attribute_column_ptrs[attribute_idx];
auto &attribute = attributes[attribute_idx];
const auto inserted = setAttributeValue(attribute, key, attribute_column[row_idx]);
if (!inserted)
should_rollback = true;
}
/// @note on multiple equal keys the mapped value for the first one is stored
if (should_rollback)
keys_pool.rollback(key.size);
}
}
void ComplexKeyHashedDictionary::updateData()
{
/// created upfront to avoid excess allocations
const auto keys_size = dict_struct.key->size();
StringRefs keys(keys_size);
const auto attributes_size = attributes.size();
while (const auto block = stream->read())
if (!saved_block || saved_block->rows() == 0)
{
const auto rows = block.rows();
element_count += rows;
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
const auto key_column_ptrs = ext::map<Columns>(ext::range(0, keys_size),
[&] (const size_t attribute_idx)
{
return block.safeGetByPosition(attribute_idx).column;
});
const auto attribute_column_ptrs = ext::map<Columns>(ext::range(0, attributes_size),
[&] (const size_t attribute_idx)
{
return block.safeGetByPosition(keys_size + attribute_idx).column;
});
for (const auto row_idx : ext::range(0, rows))
while (const auto block = stream->read())
{
/// calculate key once per row
const auto key = placeKeysInPool(row_idx, key_column_ptrs, keys, keys_pool);
auto should_rollback = false;
for (const auto attribute_idx : ext::range(0, attributes_size))
/// We are using this method to keep saved data if input stream consists of multiple blocks
if (!saved_block)
saved_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (const auto attribute_idx : ext::range(0, keys_size + attributes_size))
{
const auto & attribute_column = *attribute_column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
const auto inserted = setAttributeValue(attribute, key, attribute_column[row_idx]);
if (!inserted)
should_rollback = true;
const IColumn & update_column = *block.getByPosition(attribute_idx).column.get();
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->mutate();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
stream->readSuffix();
}
else
{
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
while (const auto block = stream->read())
{
const auto saved_key_column_ptrs = ext::map<Columns>(ext::range(0, keys_size), [&](const size_t key_idx)
{
return saved_block->safeGetByPosition(key_idx).column;
});
const auto update_key_column_ptrs = ext::map<Columns>(ext::range(0, keys_size), [&](const size_t key_idx)
{
return block.safeGetByPosition(key_idx).column;
});
Arena temp_key_pool;
ContainerType <std::vector<size_t>> update_key_hash;
for (size_t i = 0; i < block.rows(); ++i)
{
const auto u_key = placeKeysInPool(i, update_key_column_ptrs, keys, temp_key_pool);
update_key_hash[u_key].push_back(i);
}
/// @note on multiple equal keys the mapped value for the first one is stored
if (should_rollback)
keys_pool.rollback(key.size);
}
const size_t rows = saved_block->rows();
IColumn::Filter filter(rows);
for (size_t i = 0; i < saved_block->rows(); ++i)
{
const auto s_key = placeKeysInPool(i, saved_key_column_ptrs, keys, temp_key_pool);
auto it = update_key_hash.find(s_key);
if (it != std::end(update_key_hash))
filter[i] = 0;
else
filter[i] = 1;
}
auto block_columns = block.mutateColumns();
for (const auto attribute_idx : ext::range(0, keys_size + attributes_size))
{
auto & column = saved_block->safeGetByPosition(attribute_idx).column;
const auto & filtered_column = column->filter(filter, -1);
block_columns[attribute_idx]->insertRangeFrom(*filtered_column.get(), 0, filtered_column->size());
}
saved_block->setColumns(std::move(block_columns));
}
stream->readSuffix();
}
stream->readSuffix();
if (saved_block)
blockToAttributes(*saved_block.get());
}
void ComplexKeyHashedDictionary::loadData()
{
if (!source_ptr->hasUpdateField()) {
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
blockToAttributes(block);
stream->readSuffix();
}
else
updateData();
if (require_nonempty && 0 == element_count)
throw Exception{
name + ": dictionary source is empty and 'require_nonempty' property is set.",
ErrorCodes::DICTIONARY_IS_EMPTY};
name + ": dictionary source is empty and 'require_nonempty' property is set.",
ErrorCodes::DICTIONARY_IS_EMPTY};
}
template <typename T>

View File

@ -16,13 +16,14 @@
namespace DB
{
using BlockPtr = std::shared_ptr<Block>;
class ComplexKeyHashedDictionary final : public IDictionaryBase
{
public:
ComplexKeyHashedDictionary(
const std::string & name, const DictionaryStructure & dict_struct, DictionarySourcePtr source_ptr,
const DictionaryLifetime dict_lifetime, bool require_nonempty);
const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block = nullptr);
ComplexKeyHashedDictionary(const ComplexKeyHashedDictionary & other);
@ -155,6 +156,10 @@ private:
void createAttributes();
void blockToAttributes(const Block & block);
void updateData();
void loadData();
template <typename T>
@ -220,6 +225,8 @@ private:
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
BlockPtr saved_block;
};

View File

@ -41,8 +41,10 @@ ExecutableDictionarySource::ExecutableDictionarySource(const DictionaryStructure
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block, const Context & context)
: log(&Logger::get("ExecutableDictionarySource")),
update_time{std::chrono::system_clock::from_time_t(0)},
dict_struct{dict_struct_},
command{config.getString(config_prefix + ".command")},
update_field{config.getString(config_prefix + ".update_field", "")},
format{config.getString(config_prefix + ".format")},
sample_block{sample_block},
context(context)
@ -51,14 +53,41 @@ ExecutableDictionarySource::ExecutableDictionarySource(const DictionaryStructure
ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other)
: log(&Logger::get("ExecutableDictionarySource")),
update_time{other.update_time},
dict_struct{other.dict_struct},
command{other.command},
update_field{other.update_field},
format{other.format},
sample_block{other.sample_block},
context(other.context)
{
}
std::string ExecutableDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
char buffer [80];
struct tm * timeinfo;
timeinfo = localtime (&hr_time);
strftime(buffer, 80, "\"%Y-%m-%d %H:%M:%S\"", timeinfo);
std::string str_time(buffer);
return command + " " + update_field + " "+ str_time;
///Example case: command -T "2018-02-12 12:44:04"
///should return all entries after mentioned date
///if executable is eligible to return entries according to date.
///Where "-T" is passed as update_field.
}
else
{
std::string str_time("\"0000-00-00 00:00:00\""); ///for initial load
return command + " " + update_field + " "+ str_time;
}
}
BlockInputStreamPtr ExecutableDictionarySource::loadAll()
{
LOG_TRACE(log, "loadAll " + toString());
@ -67,6 +96,14 @@ BlockInputStreamPtr ExecutableDictionarySource::loadAll()
return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
}
BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
{
std::string command_update = getUpdateFieldAndDate();
LOG_TRACE(log, "loadUpdatedAll " + command_update);
auto process = ShellCommand::execute(command_update);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
}
namespace
{
@ -174,6 +211,14 @@ bool ExecutableDictionarySource::supportsSelectiveLoad() const
return true;
}
bool ExecutableDictionarySource::hasUpdateField() const
{
if(update_field.empty())
return false;
else
return true;
}
DictionarySourcePtr ExecutableDictionarySource::clone() const
{
return std::make_unique<ExecutableDictionarySource>(*this);

View File

@ -25,6 +25,8 @@ public:
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(
@ -34,15 +36,22 @@ public:
bool supportsSelectiveLoad() const override;
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override;
std::string toString() const override;
private:
std::string getUpdateFieldAndDate();
Poco::Logger * log;
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string command;
const std::string update_field;
const std::string format;
Block sample_block;
const Context & context;

View File

@ -138,6 +138,20 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
}
std::string ExternalQueryBuilder::composeUpdateQuery(const std::string &update_field, const std::string &time_point) const
{
std::string out = composeLoadAllQuery();
std::string update_query;
if (!where.empty())
update_query = " AND " + update_field + " >= '" + time_point + "'";
else
update_query = " WHERE " + update_field + " >= '" + time_point + "'";
return out.insert(out.size()-1, update_query); ///This is done to insert "update_query" before "out"'s semicolon
}
std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64> & ids)
{
if (!dict_struct.id)

View File

@ -42,6 +42,9 @@ struct ExternalQueryBuilder
/** Generate a query to load all data. */
std::string composeLoadAllQuery() const;
/** Generate a query to load data after certain time point*/
std::string composeUpdateQuery(const std::string &update_field, const std::string &time_point) const;
/** Generate a query to load data by set of UInt64 keys. */
std::string composeLoadIdsQuery(const std::vector<UInt64> & ids);

View File

@ -21,6 +21,11 @@ public:
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override
{
throw Exception{"Method loadUpdatedAll is unsupported for FileDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
}
BlockInputStreamPtr loadIds(const std::vector<UInt64> & /*ids*/) override
{
throw Exception{"Method loadIds is unsupported for FileDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
@ -35,6 +40,9 @@ public:
bool isModified() const override { return getLastModification() > last_modification; }
bool supportsSelectiveLoad() const override { return false; }
///Not supported for FileDictionarySource
bool hasUpdateField() const override { return false; }
DictionarySourcePtr clone() const override { return std::make_unique<FileDictionarySource>(*this); }
std::string toString() const override;

View File

@ -20,11 +20,11 @@ static const auto max_array_size = 500000;
FlatDictionary::FlatDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty)
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block)
: name{name}, dict_struct(dict_struct),
source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
require_nonempty(require_nonempty),
loaded_ids(initial_array_size, false)
loaded_ids(initial_array_size, false), saved_block{std::move(saved_block)}
{
createAttributes();
@ -42,7 +42,7 @@ FlatDictionary::FlatDictionary(const std::string & name, const DictionaryStructu
}
FlatDictionary::FlatDictionary(const FlatDictionary & other)
: FlatDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty}
: FlatDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
@ -286,29 +286,105 @@ void FlatDictionary::createAttributes()
}
}
void FlatDictionary::blockToAttributes(const Block &block)
{
const auto & id_column = *block.safeGetByPosition(0).column;
element_count += id_column.size();
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto &attribute_column = *block.safeGetByPosition(attribute_idx + 1).column;
auto &attribute = attributes[attribute_idx];
for (const auto row_idx : ext::range(0, id_column.size()))
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
void FlatDictionary::updateData()
{
if (!saved_block || saved_block->rows() == 0)
{
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
while (const auto block = stream->read())
{
/// We are using this to keep saved data if input stream consists of multiple blocks
if (!saved_block)
saved_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
{
const IColumn & update_column = *block.getByPosition(attribute_idx).column.get();
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->mutate();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
stream->readSuffix();
}
else
{
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
while (const auto block = stream->read())
{
const auto &saved_id_column = *saved_block->safeGetByPosition(0).column;
const auto &update_id_column = *block.safeGetByPosition(0).column;
std::unordered_map<Key, std::vector<size_t>> update_ids;
for (size_t row = 0; row < update_id_column.size(); ++row)
{
const auto id = update_id_column.get64(row);
update_ids[id].push_back(row);
}
const size_t saved_rows = saved_id_column.size();
IColumn::Filter filter(saved_rows);
std::unordered_map<Key, std::vector<size_t>>::iterator it;
for (size_t row = 0; row < saved_id_column.size(); ++row)
{
auto id = saved_id_column.get64(row);
it = update_ids.find(id);
if (it != update_ids.end())
filter[row] = 0;
else
filter[row] = 1;
}
auto block_columns = block.mutateColumns();
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
{
auto & column = saved_block->safeGetByPosition(attribute_idx).column;
const auto & filtered_column = column->filter(filter, -1);
block_columns[attribute_idx]->insertRangeFrom(*filtered_column.get(), 0, filtered_column->size());
}
saved_block->setColumns(std::move(block_columns));
}
stream->readSuffix();
}
if (saved_block)
blockToAttributes(*saved_block.get());
}
void FlatDictionary::loadData()
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
if (!source_ptr->hasUpdateField()) {
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
{
const auto & id_column = *block.safeGetByPosition(0).column;
while (const auto block = stream->read())
blockToAttributes(block);
element_count += id_column.size();
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *block.safeGetByPosition(attribute_idx + 1).column;
auto & attribute = attributes[attribute_idx];
for (const auto row_idx : ext::range(0, id_column.size()))
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
stream->readSuffix();
}
stream->readSuffix();
else
updateData();
if (require_nonempty && 0 == element_count)
throw Exception{

View File

@ -15,11 +15,13 @@
namespace DB
{
using BlockPtr = std::shared_ptr<Block>;
class FlatDictionary final : public IDictionary
{
public:
FlatDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty);
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block = nullptr);
FlatDictionary(const FlatDictionary & other);
@ -153,6 +155,8 @@ private:
};
void createAttributes();
void blockToAttributes(const Block & block);
void updateData();
void loadData();
template <typename T>
@ -219,6 +223,8 @@ private:
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
BlockPtr saved_block;
};
}

View File

@ -20,19 +20,32 @@ HTTPDictionarySource::HTTPDictionarySource(const DictionaryStructure & dict_stru
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block, const Context & context)
: log(&Logger::get("HTTPDictionarySource")),
update_time{std::chrono::system_clock::from_time_t(0)},
dict_struct{dict_struct_},
url{config.getString(config_prefix + ".url", "")},
update_field{config.getString(config_prefix + ".update_field", "")},
format{config.getString(config_prefix + ".format")},
sample_block{sample_block},
context(context),
timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))
{
if (update_field.empty())
return;
std::string::size_type option = url.find("?");
if (option == std::string::npos) {
update_field = "?&" + update_field;
} else {
update_field = '&' + update_field;
}
}
HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
: log(&Logger::get("HTTPDictionarySource")),
update_time{other.update_time},
dict_struct{other.dict_struct},
url{other.url},
update_field{other.update_field},
format{other.format},
sample_block{other.sample_block},
context(other.context),
@ -40,6 +53,27 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
{
}
std::string HTTPDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
char buffer [80];
struct tm * timeinfo;
timeinfo = localtime (&hr_time);
strftime(buffer, 80, "=%Y-%m-%d%%20%H:%M:%S", timeinfo);
std::string str_time(buffer);
return url + update_field + str_time;
}
else
{
update_time = std::chrono::system_clock::now();
return url + update_field + "=0000-00-00%2000:00:00"; ///for initial load
}
}
BlockInputStreamPtr HTTPDictionarySource::loadAll()
{
LOG_TRACE(log, "loadAll " + toString());
@ -50,6 +84,17 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll()
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
}
BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll()
{
std::string url_update = getUpdateFieldAndDate();
LOG_TRACE(log, "loadUpdatedAll " + url_update);
Poco::URI uri(url_update);
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts);
auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
}
BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds " << toString() << " size = " << ids.size());
@ -97,6 +142,11 @@ bool HTTPDictionarySource::supportsSelectiveLoad() const
return true;
}
bool HTTPDictionarySource::hasUpdateField() const
{
return !update_field.empty();
}
DictionarySourcePtr HTTPDictionarySource::clone() const
{
return std::make_unique<HTTPDictionarySource>(*this);

View File

@ -25,6 +25,8 @@ public:
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(
@ -34,17 +36,23 @@ public:
bool supportsSelectiveLoad() const override;
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override;
std::string toString() const override;
private:
std::string getUpdateFieldAndDate();
Poco::Logger * log;
LocalDateTime getLastModification() const;
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string url;
std::string update_field;
const std::string format;
Block sample_block;
const Context & context;

View File

@ -15,9 +15,9 @@ namespace ErrorCodes
HashedDictionary::HashedDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty)
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block)
: name{name}, dict_struct(dict_struct), source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
require_nonempty(require_nonempty)
require_nonempty(require_nonempty), saved_block{std::move(saved_block)}
{
createAttributes();
@ -35,7 +35,7 @@ HashedDictionary::HashedDictionary(const std::string & name, const DictionaryStr
}
HashedDictionary::HashedDictionary(const HashedDictionary & other)
: HashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty}
: HashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
@ -280,28 +280,105 @@ void HashedDictionary::createAttributes()
}
}
void HashedDictionary::loadData()
void HashedDictionary::blockToAttributes(const Block &block)
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
const auto & id_column = *block.safeGetByPosition(0).column;
element_count += id_column.size();
while (const auto block = stream->read())
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & id_column = *block.safeGetByPosition(0).column;
const auto &attribute_column = *block.safeGetByPosition(attribute_idx + 1).column;
auto &attribute = attributes[attribute_idx];
element_count += id_column.size();
for (const auto row_idx : ext::range(0, id_column.size()))
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
for (const auto attribute_idx : ext::range(0, attributes.size()))
void HashedDictionary::updateData()
{
if (!saved_block || saved_block->rows() == 0)
{
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
while (const auto block = stream->read())
{
const auto & attribute_column = *block.safeGetByPosition(attribute_idx + 1).column;
auto & attribute = attributes[attribute_idx];
for (const auto row_idx : ext::range(0, id_column.size()))
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
/// We are using this to keep saved data if input stream consists of multiple blocks
if (!saved_block)
saved_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
{
const IColumn & update_column = *block.getByPosition(attribute_idx).column.get();
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->mutate();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
stream->readSuffix();
}
else
{
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
while (const auto block = stream->read())
{
const auto &saved_id_column = *saved_block->safeGetByPosition(0).column;
const auto &update_id_column = *block.safeGetByPosition(0).column;
std::unordered_map<Key, std::vector<size_t>> update_ids;
for (size_t row = 0; row < update_id_column.size(); ++row)
{
const auto id = update_id_column.get64(row);
update_ids[id].push_back(row);
}
const size_t saved_rows = saved_id_column.size();
IColumn::Filter filter(saved_rows);
std::unordered_map<Key, std::vector<size_t>>::iterator it;
for (size_t row = 0; row < saved_id_column.size(); ++row)
{
auto id = saved_id_column.get64(row);
it = update_ids.find(id);
if (it != update_ids.end())
filter[row] = 0;
else
filter[row] = 1;
}
auto block_columns = block.mutateColumns();
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
{
auto & column = saved_block->safeGetByPosition(attribute_idx).column;
const auto & filtered_column = column->filter(filter, -1);
block_columns[attribute_idx]->insertRangeFrom(*filtered_column.get(), 0, filtered_column->size());
}
saved_block->setColumns(std::move(block_columns));
}
stream->readSuffix();
}
stream->readSuffix();
if (saved_block)
blockToAttributes(*saved_block.get());
}
void HashedDictionary::loadData()
{
if (!source_ptr->hasUpdateField()) {
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
blockToAttributes(block);
stream->readSuffix();
}
else
updateData();
if (require_nonempty && 0 == element_count)
throw Exception{

View File

@ -14,11 +14,13 @@
namespace DB
{
using BlockPtr = std::shared_ptr<Block>;
class HashedDictionary final : public IDictionary
{
public:
HashedDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty);
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block = nullptr);
HashedDictionary(const HashedDictionary & other);
@ -152,6 +154,10 @@ private:
void createAttributes();
void blockToAttributes(const Block & block);
void updateData();
void loadData();
template <typename T>
@ -217,6 +223,8 @@ private:
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
BlockPtr saved_block;
};
}

View File

@ -19,6 +19,9 @@ public:
/// Returns an input stream with all the data available from this source.
virtual BlockInputStreamPtr loadAll() = 0;
/// Returns an input stream with updated data available from this source.
virtual BlockInputStreamPtr loadUpdatedAll() = 0;
/** Indicates whether this source supports "random access" loading of data
* loadId and loadIds can only be used if this function returns true.
*/
@ -39,6 +42,9 @@ public:
/// indicates whether the source has been modified since last load* operation
virtual bool isModified() const = 0;
/// Returns true if update field is defined
virtual bool hasUpdateField() const = 0;
virtual DictionarySourcePtr clone() const = 0;
/// returns an informal string describing the source

View File

@ -1,25 +1,26 @@
#pragma once
#include <Common/SharedLibrary.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/ExternalResultDescription.h>
#include <Dictionaries/IDictionarySource.h>
#include <Common/SharedLibrary.h>
#include <common/LocalDateTime.h>
namespace Poco
{
class Logger;
class Logger;
namespace Util
{
class AbstractConfiguration;
}
namespace Util
{
class AbstractConfiguration;
}
}
namespace DB
{
class CStringsHolder;
/// Allows loading dictionaries from dynamic libraries (.so)
@ -40,6 +41,11 @@ public:
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override
{
throw Exception{"Method loadUpdatedAll is unsupported for LibraryDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
}
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<std::size_t> & requested_rows) override;
@ -48,6 +54,9 @@ public:
bool supportsSelectiveLoad() const override;
///Not yet supported
bool hasUpdateField() const override { return false; }
DictionarySourcePtr clone() const override;
std::string toString() const override;
@ -67,4 +76,5 @@ private:
std::shared_ptr<CStringsHolder> settings;
void * lib_data = nullptr;
};
}

View File

@ -42,6 +42,11 @@ public:
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override
{
throw Exception{"Method loadUpdatedAll is unsupported for MongoDBDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
}
bool supportsSelectiveLoad() const override { return true; }
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
@ -52,6 +57,9 @@ public:
/// @todo: for MongoDB, modification date can somehow be determined from the `_id` object field
bool isModified() const override { return true; }
///Not yet supported
bool hasUpdateField() const override { return false; }
DictionarySourcePtr clone() const override { return std::make_unique<MongoDBDictionarySource>(*this); }
std::string toString() const override;

View File

@ -7,6 +7,7 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <common/LocalDateTime.h>
#include <Dictionaries/MySQLDictionarySource.h>
#include <Dictionaries/MySQLBlockInputStream.h>
@ -24,10 +25,12 @@ MySQLDictionarySource::MySQLDictionarySource(const DictionaryStructure & dict_st
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const Block & sample_block)
: log(&Logger::get("MySQLDictionarySource")),
update_time{std::chrono::system_clock::from_time_t(0)},
dict_struct{dict_struct_},
db{config.getString(config_prefix + ".db", "")},
table{config.getString(config_prefix + ".table")},
where{config.getString(config_prefix + ".where", "")},
update_field{config.getString(config_prefix + ".update_field")},
dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)},
sample_block{sample_block},
pool{config, config_prefix},
@ -40,10 +43,12 @@ MySQLDictionarySource::MySQLDictionarySource(const DictionaryStructure & dict_st
/// copy-constructor is provided in order to support cloneability
MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other)
: log(&Logger::get("MySQLDictionarySource")),
update_time{other.update_time},
dict_struct{other.dict_struct},
db{other.db},
table{other.table},
where{other.where},
update_field{other.update_field},
dont_check_update_time{other.dont_check_update_time},
sample_block{other.sample_block},
pool{other.pool},
@ -53,6 +58,24 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other
{
}
std::string MySQLDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
std::string str_time = std::to_string(LocalDateTime(hr_time));
return query_builder.composeUpdateQuery(update_field, str_time);
}
else
{
update_time = std::chrono::system_clock::now();
std::string str_time("0000-00-00 00:00:00"); ///for initial load
return query_builder.composeUpdateQuery(update_field, str_time);
}
}
BlockInputStreamPtr MySQLDictionarySource::loadAll()
{
last_modification = getLastModification();
@ -61,6 +84,15 @@ BlockInputStreamPtr MySQLDictionarySource::loadAll()
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query, sample_block, max_block_size);
}
BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
{
last_modification = getLastModification();
std::string load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_update_query, sample_block, max_block_size);
}
BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
/// We do not log in here and do not update the modification time, as the request can be large, and often called.
@ -100,6 +132,11 @@ bool MySQLDictionarySource::supportsSelectiveLoad() const
return true;
}
bool MySQLDictionarySource::hasUpdateField() const
{
return !update_field.empty();
}
DictionarySourcePtr MySQLDictionarySource::clone() const
{
return std::make_unique<MySQLDictionarySource>(*this);

View File

@ -35,6 +35,8 @@ public:
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(
@ -44,11 +46,15 @@ public:
bool supportsSelectiveLoad() const override;
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override;
std::string toString() const override;
private:
std::string getUpdateFieldAndDate();
static std::string quoteForLike(const std::string s);
LocalDateTime getLastModification() const;
@ -57,10 +63,13 @@ private:
std::string doInvalidateQuery(const std::string & request) const;
Poco::Logger * log;
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string db;
const std::string table;
const std::string where;
const std::string update_field;
const bool dont_check_update_time;
Block sample_block;
mutable mysqlxx::PoolWithFailover pool;

View File

@ -20,10 +20,12 @@ ODBCDictionarySource::ODBCDictionarySource(const DictionaryStructure & dict_stru
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const Block & sample_block, const Context & context)
: log(&Logger::get("ODBCDictionarySource")),
update_time{std::chrono::system_clock::from_time_t(0)},
dict_struct{dict_struct_},
db{config.getString(config_prefix + ".db", "")},
table{config.getString(config_prefix + ".table")},
where{config.getString(config_prefix + ".where", "")},
update_field{config.getString(config_prefix + ".update_field")},
sample_block{sample_block},
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::None}, /// NOTE Better to obtain quoting style via ODBC interface.
load_all_query{query_builder.composeLoadAllQuery()},
@ -46,10 +48,12 @@ ODBCDictionarySource::ODBCDictionarySource(const DictionaryStructure & dict_stru
/// copy-constructor is provided in order to support cloneability
ODBCDictionarySource::ODBCDictionarySource(const ODBCDictionarySource & other)
: log(&Logger::get("ODBCDictionarySource")),
update_time{other.update_time},
dict_struct{other.dict_struct},
db{other.db},
table{other.table},
where{other.where},
update_field{other.update_field},
sample_block{other.sample_block},
pool{other.pool},
query_builder{dict_struct, db, table, where, ExternalQueryBuilder::None},
@ -58,12 +62,38 @@ ODBCDictionarySource::ODBCDictionarySource(const ODBCDictionarySource & other)
{
}
std::string ODBCDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
std::string str_time = std::to_string(LocalDateTime(hr_time));
return query_builder.composeUpdateQuery(update_field, str_time);
}
else
{
update_time = std::chrono::system_clock::now();
std::string str_time("0000-00-00 00:00:00"); ///for initial load
return query_builder.composeUpdateQuery(update_field, str_time);
}
}
BlockInputStreamPtr ODBCDictionarySource::loadAll()
{
LOG_TRACE(log, load_all_query);
return std::make_shared<ODBCBlockInputStream>(pool->get(), load_all_query, sample_block, max_block_size);
}
BlockInputStreamPtr ODBCDictionarySource::loadUpdatedAll()
{
std::string load_query_update = getUpdateFieldAndDate();
LOG_TRACE(log, load_query_update);
return std::make_shared<ODBCBlockInputStream>(pool->get(), load_query_update, sample_block, max_block_size);
}
BlockInputStreamPtr ODBCDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
const auto query = query_builder.composeLoadIdsQuery(ids);
@ -82,6 +112,11 @@ bool ODBCDictionarySource::supportsSelectiveLoad() const
return true;
}
bool ODBCDictionarySource::hasUpdateField() const
{
return !update_field.empty();
}
DictionarySourcePtr ODBCDictionarySource::clone() const
{
return std::make_unique<ODBCDictionarySource>(*this);

View File

@ -38,6 +38,8 @@ public:
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(
@ -47,20 +49,26 @@ public:
bool supportsSelectiveLoad() const override;
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override;
std::string toString() const override;
private:
std::string getUpdateFieldAndDate();
// execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const;
Poco::Logger * log;
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string db;
const std::string table;
const std::string where;
const std::string update_field;
Block sample_block;
std::shared_ptr<Poco::Data::SessionPool> pool = nullptr;
ExternalQueryBuilder query_builder;