Add external dictionary update feature

feature is implemented for DICT TYPES THAT SUPPORT
and for sources SOURCE TYPES THAT ARE SUPPORTED
to use the feature add <update_field>...</...> to dictionary sourcei config.
If the field is skipped or used with unsupported dictionary types,
all data is updated every time if the field is provided,
value of last update time is passed to the source
with the expectation that only records that were updated after
provided time will be passed to the dictionary
This commit is contained in:
Arsen Hakobyan 2018-01-15 16:44:39 +04:00
parent 07931cad69
commit 7658665737
20 changed files with 654 additions and 51 deletions

View File

@ -36,6 +36,9 @@ public:
bool isModified() const override { return true; }
bool supportsSelectiveLoad() const override { return true; }
///Not yet supported
bool hasUpdateField() const override { return false; }
DictionarySourcePtr clone() const override { return std::make_unique<ClickHouseDictionarySource>(*this); }
std::string toString() const override;

View File

@ -17,9 +17,9 @@ namespace ErrorCodes
ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(
const std::string & name, const DictionaryStructure & dict_struct, DictionarySourcePtr source_ptr,
const DictionaryLifetime dict_lifetime, bool require_nonempty)
const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block)
: name{name}, dict_struct(dict_struct), source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
require_nonempty(require_nonempty)
require_nonempty(require_nonempty), saved_block{std::move(saved_block)}
{
createAttributes();
@ -38,7 +38,7 @@ ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(
}
ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(const ComplexKeyHashedDictionary & other)
: ComplexKeyHashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty}
: ComplexKeyHashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
@ -227,32 +227,133 @@ void ComplexKeyHashedDictionary::createAttributes()
}
}
void ComplexKeyHashedDictionary::loadData()
void ComplexKeyHashedDictionary::updateData()
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
/// created upfront to avoid excess allocations
const auto keys_size = dict_struct.key->size();
StringRefs keys(keys_size);
const auto attributes_size = attributes.size();
while (const auto block = stream->read())
if (!saved_block || saved_block->rows() == 0)
{
const auto rows = block.rows();
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
{
/// We are using this method to keep saved data if input stream consists of multiple blocks
if (!saved_block)
saved_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (const auto attribute_idx : ext::range(0, keys_size + attributes_size))
{
const IColumn &update_column = *block.getByPosition(attribute_idx).column.get();
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->mutate();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
stream->readSuffix();
}
else
{
auto stream = source_ptr->loadAll();
const auto saved_key_column_ptrs = ext::map<Columns>(ext::range(0, keys_size), [&](const size_t key_idx)
{
return saved_block->safeGetByPosition(key_idx).column;
});
stream->readPrefix();
while (const auto block = stream->read())
{
const auto update_key_column_ptrs = ext::map<Columns>(ext::range(0, keys_size), [&](const size_t key_idx)
{
return block.safeGetByPosition(key_idx).column;
});
const auto &saved_columns = saved_block->mutateColumns();
std::vector<int> update_indices, saved_indices;
for (size_t i = 0; i < saved_block->rows(); ++i)
{
for (size_t j = 0; j < block.rows(); ++j)
{
Arena s_temp_key_pool;
Arena u_temp_key_pool;
const auto s_key = placeKeysInPool(i, saved_key_column_ptrs, keys, s_temp_key_pool);
const auto u_key = placeKeysInPool(j, update_key_column_ptrs, keys, s_temp_key_pool);
if (s_key.toString() == u_key.toString())
{
update_indices.push_back(j); ///Indices of columns that are for update
saved_indices.push_back(i); ///Indices of columns that need to be updated
}
s_temp_key_pool.rollback(s_key.size);
u_temp_key_pool.rollback(u_key.size);
}
}
BlockPtr temp_block = std::make_shared<DB::Block>(saved_block->cloneEmpty());
for (const auto attribute_idx : ext::range(0, keys_size + attributes_size))
{
if (update_indices.empty())
saved_columns[attribute_idx]->insertRangeFrom(*block.safeGetByPosition(attribute_idx).column, 0,
block.safeGetByPosition(attribute_idx).column->size());
else
{
const auto &temp_columns = temp_block->mutateColumns();
for (size_t i = 0; i < saved_block->rows(); ++i)
{
std::vector<int>::iterator it;
it = std::find(saved_indices.begin(), saved_indices.end(), i);
if (it != saved_indices.end())
{
int pos = std::distance(saved_indices.begin(), it);
temp_columns[attribute_idx]->insertFrom(*block.safeGetByPosition(attribute_idx).column, pos);
}
else
{
temp_columns[attribute_idx]->insertFrom(*saved_block->safeGetByPosition(attribute_idx).column, i);
}
}
for (size_t i = 0; i < block.rows(); ++i)
{
bool exists = std::any_of(update_indices.begin(), update_indices.end(), [&](size_t x)
{
return x == i;
});
if (!exists)
temp_columns[attribute_idx]->insertFrom(*block.safeGetByPosition(attribute_idx).column, i);
}
}
}
if (temp_block->rows() != 0)
{
saved_block.reset();
saved_block = std::make_shared<DB::Block>(*temp_block);
temp_block.reset();
}
}
stream->readSuffix();
}
if (saved_block)
{
const auto rows = saved_block->rows();
element_count += rows;
const auto key_column_ptrs = ext::map<Columns>(ext::range(0, keys_size),
[&] (const size_t attribute_idx)
[&](const size_t attribute_idx)
{
return block.safeGetByPosition(attribute_idx).column;
return saved_block->safeGetByPosition(attribute_idx).column;
});
const auto attribute_column_ptrs = ext::map<Columns>(ext::range(0, attributes_size),
[&] (const size_t attribute_idx)
[&](const size_t attribute_idx)
{
return block.safeGetByPosition(keys_size + attribute_idx).column;
return saved_block->safeGetByPosition(keys_size + attribute_idx).column;
});
for (const auto row_idx : ext::range(0, rows))
@ -264,8 +365,8 @@ void ComplexKeyHashedDictionary::loadData()
for (const auto attribute_idx : ext::range(0, attributes_size))
{
const auto & attribute_column = *attribute_column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
const auto &attribute_column = *attribute_column_ptrs[attribute_idx];
auto &attribute = attributes[attribute_idx];
const auto inserted = setAttributeValue(attribute, key, attribute_column[row_idx]);
if (!inserted)
should_rollback = true;
@ -275,15 +376,66 @@ void ComplexKeyHashedDictionary::loadData()
if (should_rollback)
keys_pool.rollback(key.size);
}
}
}
stream->readSuffix();
void ComplexKeyHashedDictionary::loadData()
{
if (!source_ptr->hasUpdateField()) {
auto stream = source_ptr->loadAll();
stream->readPrefix();
/// created upfront to avoid excess allocations
const auto keys_size = dict_struct.key->size();
StringRefs keys(keys_size);
const auto attributes_size = attributes.size();
while (const auto block = stream->read()) {
const auto rows = block.rows();
element_count += rows;
const auto key_column_ptrs = ext::map<Columns>(ext::range(0, keys_size),
[&](const size_t attribute_idx) {
return block.safeGetByPosition(attribute_idx).column;
});
const auto attribute_column_ptrs = ext::map<Columns>(ext::range(0, attributes_size),
[&](const size_t attribute_idx) {
return block.safeGetByPosition(
keys_size + attribute_idx).column;
});
for (const auto row_idx : ext::range(0, rows)) {
/// calculate key once per row
const auto key = placeKeysInPool(row_idx, key_column_ptrs, keys, keys_pool);
auto should_rollback = false;
for (const auto attribute_idx : ext::range(0, attributes_size)) {
const auto &attribute_column = *attribute_column_ptrs[attribute_idx];
auto &attribute = attributes[attribute_idx];
const auto inserted = setAttributeValue(attribute, key, attribute_column[row_idx]);
if (!inserted)
should_rollback = true;
}
/// @note on multiple equal keys the mapped value for the first one is stored
if (should_rollback)
keys_pool.rollback(key.size);
}
}
stream->readSuffix();
}
else
updateData();
if (require_nonempty && 0 == element_count)
throw Exception{
name + ": dictionary source is empty and 'require_nonempty' property is set.",
ErrorCodes::DICTIONARY_IS_EMPTY};
name + ": dictionary source is empty and 'require_nonempty' property is set.",
ErrorCodes::DICTIONARY_IS_EMPTY};
}
template <typename T>

View File

@ -16,13 +16,14 @@
namespace DB
{
using BlockPtr = std::shared_ptr<Block>;
class ComplexKeyHashedDictionary final : public IDictionaryBase
{
public:
ComplexKeyHashedDictionary(
const std::string & name, const DictionaryStructure & dict_struct, DictionarySourcePtr source_ptr,
const DictionaryLifetime dict_lifetime, bool require_nonempty);
const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block = nullptr);
ComplexKeyHashedDictionary(const ComplexKeyHashedDictionary & other);
@ -155,6 +156,8 @@ private:
void createAttributes();
void updateData();
void loadData();
template <typename T>
@ -220,6 +223,8 @@ private:
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
BlockPtr saved_block;
};

View File

@ -41,8 +41,11 @@ ExecutableDictionarySource::ExecutableDictionarySource(const DictionaryStructure
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block, const Context & context)
: log(&Logger::get("ExecutableDictionarySource")),
update_time{std::chrono::system_clock::now()},
dict_struct{dict_struct_},
command{config.getString(config_prefix + ".command")},
update_field{config.getString(config_prefix + ".update_field", "")},
date{"0000-00-00 00:00:00"},
format{config.getString(config_prefix + ".format")},
sample_block{sample_block},
context(context)
@ -51,6 +54,7 @@ ExecutableDictionarySource::ExecutableDictionarySource(const DictionaryStructure
ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other)
: log(&Logger::get("ExecutableDictionarySource")),
update_time{other.update_time},
dict_struct{other.dict_struct},
command{other.command},
format{other.format},
@ -59,12 +63,56 @@ ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionar
{
}
void ExecutableDictionarySource::setDate()
{
if (!hasUpdateField())
return;
else if ((hasUpdateField() && date == "0000-00-00 00:00:00")) {
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(update_time - tmp_time);
time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - duration.count() - 1;
char buffer [80];
struct tm * timeinfo;
timeinfo = localtime (&hr_time);
strftime(buffer, 80, "%Y-%m-%d %H:%M:%S", timeinfo);
std::string str_time(buffer);
date = str_time;
}
else {
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(update_time - tmp_time);
time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - duration.count() - 1;
char buffer [80];
struct tm * timeinfo;
timeinfo = localtime (&hr_time);
strftime(buffer, 80, "%Y-%m-%d %H:%M:%S", timeinfo);
std::string str_time(buffer);
date = str_time;
command_update = command + update_field + date;
}
}
BlockInputStreamPtr ExecutableDictionarySource::loadAll()
{
LOG_TRACE(log, "loadAll " + toString());
auto process = ShellCommand::execute(command);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
setDate();
if (!command_update.empty())
{
auto process = ShellCommand::execute(command_update);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
}
else
{
auto process = ShellCommand::execute(command);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
}
}
@ -173,6 +221,14 @@ bool ExecutableDictionarySource::supportsSelectiveLoad() const
return true;
}
bool ExecutableDictionarySource::hasUpdateField() const
{
if(update_field.empty())
return false;
else
return true;
}
DictionarySourcePtr ExecutableDictionarySource::clone() const
{
return std::make_unique<ExecutableDictionarySource>(*this);

View File

@ -34,15 +34,23 @@ public:
bool supportsSelectiveLoad() const override;
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override;
std::string toString() const override;
void setDate();
private:
Poco::Logger * log;
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string command;
std::string command_update;
const std::string update_field;
std::string date;
const std::string format;
Block sample_block;
const Context & context;

View File

@ -138,6 +138,16 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
}
std::string ExternalQueryBuilder::composeUpdateQuery(const std::string &update_field, std::string &time_point) const
{
std::string out = composeLoadAllQuery();
std::string update_query = " WHERE " + update_field + " > '" + time_point + "'";
out.insert(out.size()-1, update_query);
return out;
}
std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64> & ids)
{
if (!dict_struct.id)

View File

@ -42,6 +42,9 @@ struct ExternalQueryBuilder
/** Generate a query to load all data. */
std::string composeLoadAllQuery() const;
/** Generate a query to load data after certain time point*/
std::string composeUpdateQuery(const std::string &update_field, std::string &time_point) const;
/** Generate a query to load data by set of UInt64 keys. */
std::string composeLoadIdsQuery(const std::vector<UInt64> & ids);

View File

@ -35,6 +35,9 @@ public:
bool isModified() const override { return getLastModification() > last_modification; }
bool supportsSelectiveLoad() const override { return false; }
///Not supported for FileDictionarySource
bool hasUpdateField() const override { return false; }
DictionarySourcePtr clone() const override { return std::make_unique<FileDictionarySource>(*this); }
std::string toString() const override;

View File

@ -20,16 +20,18 @@ static const auto max_array_size = 500000;
FlatDictionary::FlatDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty)
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block)
: name{name}, dict_struct(dict_struct),
source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
require_nonempty(require_nonempty),
loaded_ids(initial_array_size, false)
loaded_ids(initial_array_size, false), saved_block{std::move(saved_block)}
{
createAttributes();
try
{
update_time = std::chrono::system_clock::now();
loadData();
calculateBytesAllocated();
}
@ -42,7 +44,7 @@ FlatDictionary::FlatDictionary(const std::string & name, const DictionaryStructu
}
FlatDictionary::FlatDictionary(const FlatDictionary & other)
: FlatDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty}
: FlatDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
@ -286,29 +288,137 @@ void FlatDictionary::createAttributes()
}
}
void FlatDictionary::loadData()
void FlatDictionary::updateData()
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
if(!saved_block || saved_block->rows() == 0)
{
const auto & id_column = *block.safeGetByPosition(0).column;
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
{
/// We are using this method to keep saved data if input stream consists of multiple blocks
if (!saved_block)
saved_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
{
const IColumn & update_column = *block.getByPosition(attribute_idx).column.get();
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->mutate();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
stream->readSuffix();
}
else
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
const auto &saved_id_column = *saved_block->safeGetByPosition(0).column;
while (const auto block = stream->read())
{
const auto &update_id_column = *block.safeGetByPosition(0).column;
const auto &saved_columns = saved_block->mutateColumns();
std::vector<int> update_indices, saved_indices;
for (size_t i = 0; i < saved_id_column.size(); ++i)
{
for (size_t j = 0; j < update_id_column.size(); ++j)
{
int need_update = saved_id_column.compareAt(i, j, update_id_column, -1);
if(need_update == 0)
{
update_indices.push_back(j); ///Indices of columns that are for update
saved_indices.push_back(i); ///Indices of columns that need to be updated
}
}
}
BlockPtr temp_block = std::make_shared<DB::Block>(saved_block->cloneEmpty());
for (const auto attribute_idx : ext::range(0, attributes.size()+1)) {
if (update_indices.empty())
saved_columns[attribute_idx]->insertRangeFrom(*block.safeGetByPosition(attribute_idx).column, 0,
block.safeGetByPosition(attribute_idx).column->size());
else
{
const auto &temp_columns = temp_block->mutateColumns();
for (size_t i = 0; i < saved_block->rows(); ++i)
{
std::vector<int>::iterator it;
it = std::find(saved_indices.begin(), saved_indices.end(), i);
if (it != saved_indices.end()) {
int pos = std::distance(saved_indices.begin(), it);
temp_columns[attribute_idx]->insertFrom(*block.safeGetByPosition(attribute_idx).column, pos);
}
else {
temp_columns[attribute_idx]->insertFrom(*saved_block->safeGetByPosition(attribute_idx).column, i);
}
}
for (size_t i = 0; i < update_id_column.size(); ++i)
{
bool exists = std::any_of(update_indices.begin(), update_indices.end(), [&](size_t x)
{
return x == i;
});
if (!exists)
temp_columns[attribute_idx]->insertFrom(*block.safeGetByPosition(attribute_idx).column, i);
}
}
}
if (temp_block->rows() != 0)
{
saved_block.reset();
saved_block = std::make_shared<DB::Block>(*temp_block);
temp_block.reset();
}
}
stream->readSuffix();
}
if (saved_block)
{
const auto &id_column = *saved_block->safeGetByPosition(0).column;
element_count += id_column.size();
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *block.safeGetByPosition(attribute_idx + 1).column;
auto & attribute = attributes[attribute_idx];
const auto &attribute_column = *saved_block->safeGetByPosition(attribute_idx + 1).column;
auto &attribute = attributes[attribute_idx];
for (const auto row_idx : ext::range(0, id_column.size()))
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
}
stream->readSuffix();
void FlatDictionary::loadData()
{
if(!source_ptr->hasUpdateField()) {
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read()) {
const auto &id_column = *block.safeGetByPosition(0).column;
element_count += id_column.size();
for (const auto attribute_idx : ext::range(0, attributes.size())) {
const auto &attribute_column = *block.safeGetByPosition(attribute_idx + 1).column;
auto &attribute = attributes[attribute_idx];
for (const auto row_idx : ext::range(0, id_column.size()))
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
stream->readSuffix();
}
else
updateData();
if (require_nonempty && 0 == element_count)
throw Exception{

View File

@ -15,11 +15,13 @@
namespace DB
{
using BlockPtr = std::shared_ptr<Block>;
class FlatDictionary final : public IDictionary
{
public:
FlatDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty);
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block = nullptr);
FlatDictionary(const FlatDictionary & other);
@ -153,6 +155,7 @@ private:
};
void createAttributes();
void updateData();
void loadData();
template <typename T>
@ -217,8 +220,11 @@ private:
mutable std::atomic<size_t> query_count{0};
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::chrono::time_point<std::chrono::system_clock> update_time;
std::exception_ptr creation_exception;
BlockPtr saved_block;
};
}

View File

@ -20,8 +20,11 @@ HTTPDictionarySource::HTTPDictionarySource(const DictionaryStructure & dict_stru
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block, const Context & context)
: log(&Logger::get("HTTPDictionarySource")),
update_time{std::chrono::system_clock::now()},
dict_struct{dict_struct_},
url{config.getString(config_prefix + ".url", "")},
update_field{config.getString(config_prefix + ".update_field", "")},
date{"0000-00-00%2000:00:00"},
format{config.getString(config_prefix + ".format")},
sample_block{sample_block},
context(context),
@ -31,8 +34,11 @@ HTTPDictionarySource::HTTPDictionarySource(const DictionaryStructure & dict_stru
HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
: log(&Logger::get("HTTPDictionarySource")),
update_time{other.update_time},
dict_struct{other.dict_struct},
url{other.url},
update_field{other.update_field},
date{other.date},
format{other.format},
sample_block{other.sample_block},
context(other.context),
@ -40,13 +46,53 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
{
}
void HTTPDictionarySource::setDate()
{
if (!hasUpdateField())
return;
else if ((hasUpdateField() && date == "0000-00-00%2000:00:00"))
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(update_time - tmp_time);
time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - duration.count() - 1;
char buffer [80];
struct tm * timeinfo;
timeinfo = localtime (&hr_time);
strftime(buffer, 80, "%Y-%m-%d%%20%H:%M:%S", timeinfo);
std::string str_time(buffer);
date = str_time;
}
else
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(update_time - tmp_time);
time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - duration.count() - 1;
char buffer [80];
struct tm * timeinfo;
timeinfo = localtime (&hr_time);
strftime(buffer, 80, "%Y-%m-%d%%20%H:%M:%S", timeinfo);
std::string str_time(buffer);
date = str_time;
url_update = url + update_field + date;
}
}
BlockInputStreamPtr HTTPDictionarySource::loadAll()
{
LOG_TRACE(log, "loadAll " + toString());
Poco::URI uri(url);
setDate();
Poco::URI uri;
if (!url_update.empty())
uri = url_update;
else
uri = url;
auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts);
auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
}
@ -97,6 +143,14 @@ bool HTTPDictionarySource::supportsSelectiveLoad() const
return true;
}
bool HTTPDictionarySource::hasUpdateField() const
{
if (update_field.empty())
return false;
else
return true;
}
DictionarySourcePtr HTTPDictionarySource::clone() const
{
return std::make_unique<HTTPDictionarySource>(*this);

View File

@ -34,17 +34,25 @@ public:
bool supportsSelectiveLoad() const override;
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override;
std::string toString() const override;
void setDate();
private:
Poco::Logger * log;
LocalDateTime getLastModification() const;
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string url;
std::string url_update;
const std::string update_field;
std::string date;
const std::string format;
Block sample_block;
const Context & context;

View File

@ -15,9 +15,9 @@ namespace ErrorCodes
HashedDictionary::HashedDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty)
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block)
: name{name}, dict_struct(dict_struct), source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
require_nonempty(require_nonempty)
require_nonempty(require_nonempty), saved_block{std::move(saved_block)}
{
createAttributes();
@ -35,7 +35,7 @@ HashedDictionary::HashedDictionary(const std::string & name, const DictionaryStr
}
HashedDictionary::HashedDictionary(const HashedDictionary & other)
: HashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty}
: HashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.require_nonempty, other.saved_block}
{
}
@ -280,28 +280,131 @@ void HashedDictionary::createAttributes()
}
}
void HashedDictionary::loadData()
void HashedDictionary::updateData()
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
if (!saved_block || saved_block->rows() == 0) {
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
while (const auto block = stream->read()) {
/// We are using this to keep saved data if input stream consists of multiple blocks
if (!saved_block)
saved_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (const auto attribute_idx : ext::range(0, attributes.size() + 1)) {
const IColumn & update_column = *block.getByPosition(attribute_idx).column.get();
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->mutate();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
}
else
{
const auto & id_column = *block.safeGetByPosition(0).column;
auto stream = source_ptr->loadAll();
stream->readPrefix();
const auto &saved_id_column = *saved_block->safeGetByPosition(0).column;
while (const auto block = stream->read())
{
const auto &update_id_column = *block.safeGetByPosition(0).column;
const auto &saved_columns = saved_block->mutateColumns();
std::vector<int> update_indices, saved_indices;
for (size_t i = 0; i < saved_id_column.size(); ++i)
{
for (size_t j = 0; j < update_id_column.size(); ++j)
{
int need_update = saved_id_column.compareAt(i, j, update_id_column, -1);
if (need_update == 0)
{
update_indices.push_back(j); ///Indices of columns that are for update
saved_indices.push_back(i); ///Indices of columns that need to be updated
}
}
}
BlockPtr temp_block = std::make_shared<DB::Block>(saved_block->cloneEmpty());
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
{
if (update_indices.empty())
saved_columns[attribute_idx]->insertRangeFrom(*block.safeGetByPosition(attribute_idx).column, 0,
block.safeGetByPosition(attribute_idx).column->size());
else
{
const auto &temp_columns = temp_block->mutateColumns();
for (size_t i = 0; i < saved_block->rows(); ++i)
{
std::vector<int>::iterator it;
it = std::find(saved_indices.begin(), saved_indices.end(), i);
if (it != saved_indices.end())
{
int pos = std::distance(saved_indices.begin(), it);
temp_columns[attribute_idx]->insertFrom(*block.safeGetByPosition(attribute_idx).column, pos);
}
else
temp_columns[attribute_idx]->insertFrom(*saved_block->safeGetByPosition(attribute_idx).column, i);
for (size_t i = 0; i < update_id_column.size(); ++i)
{
bool exists = std::any_of(update_indices.begin(), update_indices.end(), [&](size_t x)
{
return x == i;
});
if (!exists)
temp_columns[attribute_idx]->insertFrom(*block.safeGetByPosition(attribute_idx).column, i);
}
}
}
}
if (temp_block->rows() != 0)
{
saved_block.reset();
saved_block = std::make_shared<DB::Block>(*temp_block);
temp_block.reset();
}
}
}
if (saved_block)
{
const auto &id_column = *saved_block->safeGetByPosition(0).column;
element_count += id_column.size();
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *block.safeGetByPosition(attribute_idx + 1).column;
auto & attribute = attributes[attribute_idx];
const auto &attribute_column = *saved_block->safeGetByPosition(attribute_idx + 1).column;
auto &attribute = attributes[attribute_idx];
for (const auto row_idx : ext::range(0, id_column.size()))
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
}
stream->readSuffix();
void HashedDictionary::loadData()
{
if(!source_ptr->hasUpdateField()) {
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read()) {
const auto &id_column = *block.safeGetByPosition(0).column;
element_count += id_column.size();
for (const auto attribute_idx : ext::range(0, attributes.size())) {
const auto &attribute_column = *block.safeGetByPosition(attribute_idx + 1).column;
auto &attribute = attributes[attribute_idx];
for (const auto row_idx : ext::range(0, id_column.size()))
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
stream->readSuffix();
}
else
updateData();
if (require_nonempty && 0 == element_count)
throw Exception{

View File

@ -14,11 +14,13 @@
namespace DB
{
using BlockPtr = std::shared_ptr<Block>;
class HashedDictionary final : public IDictionary
{
public:
HashedDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty);
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block = nullptr);
HashedDictionary(const HashedDictionary & other);
@ -152,6 +154,8 @@ private:
void createAttributes();
void updateData();
void loadData();
template <typename T>
@ -217,6 +221,8 @@ private:
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
BlockPtr saved_block;
};
}

View File

@ -39,6 +39,9 @@ public:
/// indicates whether the source has been modified since last load* operation
virtual bool isModified() const = 0;
/// Returns true if update field is defined
virtual bool hasUpdateField() const = 0;
virtual DictionarySourcePtr clone() const = 0;
/// returns an informal string describing the source

View File

@ -47,6 +47,9 @@ public:
bool supportsSelectiveLoad() const override;
///Not yet supported
bool hasUpdateField() const override { return false; }
DictionarySourcePtr clone() const override;
std::string toString() const override;

View File

@ -52,6 +52,9 @@ public:
/// @todo: for MongoDB, modification date can somehow be determined from the `_id` object field
bool isModified() const override { return true; }
///Not yet supported
bool hasUpdateField() const override { return false; }
DictionarySourcePtr clone() const override { return std::make_unique<MongoDBDictionarySource>(*this); }
std::string toString() const override;

View File

@ -24,10 +24,13 @@ MySQLDictionarySource::MySQLDictionarySource(const DictionaryStructure & dict_st
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const Block & sample_block)
: log(&Logger::get("MySQLDictionarySource")),
update_time{std::chrono::system_clock::now()},
dict_struct{dict_struct_},
db{config.getString(config_prefix + ".db", "")},
table{config.getString(config_prefix + ".table")},
where{config.getString(config_prefix + ".where", "")},
update_field{config.getString(config_prefix + ".update_field")},
date{"0000-00-00 00:00:00"},
dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)},
sample_block{sample_block},
pool{config, config_prefix},
@ -40,10 +43,13 @@ MySQLDictionarySource::MySQLDictionarySource(const DictionaryStructure & dict_st
/// copy-constructor is provided in order to support cloneability
MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other)
: log(&Logger::get("MySQLDictionarySource")),
update_time{other.update_time},
dict_struct{other.dict_struct},
db{other.db},
table{other.table},
where{other.where},
update_field{other.update_field},
date{other.date},
dont_check_update_time{other.dont_check_update_time},
sample_block{other.sample_block},
pool{other.pool},
@ -53,12 +59,53 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other
{
}
void MySQLDictionarySource::setDate()
{
if (!hasUpdateField())
return;
else if ((hasUpdateField() && date == "0000-00-00 00:00:00"))
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(update_time - tmp_time);
time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - duration.count() - 1;
char buffer [80];
struct tm * timeinfo;
timeinfo = localtime (&hr_time);
strftime(buffer, 80, "%Y-%m-%d%%20%H:%M:%S", timeinfo);
std::string str_time(buffer);
date = str_time;
}
else
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(update_time - tmp_time);
time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - duration.count() - 1;
char buffer [80];
struct tm * timeinfo;
timeinfo = localtime (&hr_time);
strftime(buffer, 80, "%Y-%m-%d %H:%M:%S", timeinfo);
std::string str_time(buffer);
date = str_time;
std::string tmp = load_all_query;
tmp.pop_back();
load_all_query_update = tmp + " WHERE " + update_field + " > '" + date + "';";
}
}
BlockInputStreamPtr MySQLDictionarySource::loadAll()
{
last_modification = getLastModification();
LOG_TRACE(log, load_all_query);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query, sample_block, max_block_size);
setDate();
if (!load_all_query_update.empty())
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query_update, sample_block, max_block_size);
else
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query, sample_block, max_block_size);
}
BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
@ -100,6 +147,14 @@ bool MySQLDictionarySource::supportsSelectiveLoad() const
return true;
}
bool MySQLDictionarySource::hasUpdateField() const
{
if(update_field.empty())
return false;
else
return true;
}
DictionarySourcePtr MySQLDictionarySource::clone() const
{
return std::make_unique<MySQLDictionarySource>(*this);

View File

@ -44,10 +44,14 @@ public:
bool supportsSelectiveLoad() const override;
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override;
std::string toString() const override;
void setDate();
private:
static std::string quoteForLike(const std::string s);
@ -57,15 +61,20 @@ private:
std::string doInvalidateQuery(const std::string & request) const;
Poco::Logger * log;
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string db;
const std::string table;
const std::string where;
const std::string update_field;
std::string date;
const bool dont_check_update_time;
Block sample_block;
mutable mysqlxx::PoolWithFailover pool;
ExternalQueryBuilder query_builder;
const std::string load_all_query;
std::string load_all_query_update;
LocalDateTime last_modification;
std::string invalidate_query;
mutable std::string invalidate_query_response;

View File

@ -47,6 +47,9 @@ public:
bool supportsSelectiveLoad() const override;
///Not yet supported
bool hasUpdateField() const override { return false; }
DictionarySourcePtr clone() const override;
std::string toString() const override;