This commit is contained in:
Evgeniy Gatov 2015-05-26 17:24:01 +03:00
commit d651c88e1b
3 changed files with 102 additions and 63 deletions

View File

@ -43,10 +43,14 @@ private:
mutable std::mutex dictionaries_mutex;
using dictionary_ptr_t = std::shared_ptr<MultiVersion<IDictionary>>;
using dictionary_origin_pair_t = std::pair<dictionary_ptr_t, std::string>;
std::unordered_map<std::string, dictionary_origin_pair_t> dictionaries;
/// exception pointers for notifying user about failures on dictionary creation
std::unordered_map<std::string, std::exception_ptr> stored_exceptions;
struct dictionary_info final
{
dictionary_ptr_t dict;
std::string origin;
std::exception_ptr exception;
};
std::unordered_map<std::string, dictionary_info> dictionaries;
std::unordered_map<std::string, std::chrono::system_clock::time_point> update_times;
std::mt19937_64 rnd_engine{getSeed()};
@ -95,24 +99,7 @@ public:
reloading_thread.join();
}
MultiVersion<IDictionary>::Version getDictionary(const std::string & name) const
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
const auto it = dictionaries.find(name);
if (it == std::end(dictionaries))
{
const auto exception_it = stored_exceptions.find(name);
if (exception_it != std::end(stored_exceptions))
std::rethrow_exception(exception_it->second);
else
throw Exception{
"No such dictionary: " + name,
ErrorCodes::BAD_ARGUMENTS
};
}
return it->second.first->get();
}
MultiVersion<IDictionary>::Version getDictionary(const std::string & name) const;
};
}

View File

@ -48,7 +48,10 @@ void ExternalDictionaries::reloadImpl(const bool throw_on_error)
try
{
auto current = dictionary.second.first->get();
if (!dictionary.second.dict)
continue;
auto current = dictionary.second.dict->get();
const auto & lifetime = current->getLifetime();
/// do not update dictionaries with zero as lifetime
@ -75,16 +78,16 @@ void ExternalDictionaries::reloadImpl(const bool throw_on_error)
{
/// create new version of dictionary
auto new_version = current->clone();
dictionary.second.first->set(new_version.release());
dictionary.second.dict->set(new_version.release());
}
}
/// erase stored exception on success
stored_exceptions.erase(name);
dictionary.second.exception = std::exception_ptr{};
}
catch (...)
{
stored_exceptions.emplace(name, std::current_exception());
dictionary.second.exception = std::current_exception();
try
{
@ -162,8 +165,8 @@ void ExternalDictionaries::reloadFromFile(const std::string & config_path, const
auto it = dictionaries.find(name);
if (it != std::end(dictionaries))
if (it->second.second != config_path)
throw std::runtime_error{"Overriding dictionary from file " + it->second.second};
if (it->second.origin != config_path)
throw std::runtime_error{"Overriding dictionary from file " + it->second.origin};
auto dict_ptr = DictionaryFactory::instance().create(name, *config, key, context);
if (!dict_ptr->isCached())
@ -184,22 +187,39 @@ void ExternalDictionaries::reloadFromFile(const std::string & config_path, const
if (it == std::end(dictionaries))
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
dictionaries.emplace(name, dictionary_origin_pair_t{
dictionaries.emplace(name, dictionary_info{
std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release()),
config_path
});
}
else
it->second.first->set(dict_ptr.release());
{
if (it->second.dict)
it->second.dict->set(dict_ptr.release());
else
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
it->second.dict = std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release());
}
/// erase stored exception on success
stored_exceptions.erase(name);
/// erase stored exception on success
it->second.exception = std::exception_ptr{};
}
}
catch (...)
{
const auto exception_ptr = std::current_exception();
if (!name.empty())
stored_exceptions.emplace(name, exception_ptr);
{
const auto exception_ptr = std::current_exception();
const auto it = dictionaries.find(name);
if (it == std::end(dictionaries))
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
dictionaries.emplace(name, dictionary_info{nullptr, config_path, exception_ptr});
}
else
it->second.exception = exception_ptr;
}
try
{
@ -223,11 +243,28 @@ void ExternalDictionaries::reloadFromFile(const std::string & config_path, const
/// propagate exception
if (throw_on_error)
std::rethrow_exception(exception_ptr);
throw;
}
}
}
}
}
MultiVersion<IDictionary>::Version ExternalDictionaries::getDictionary(const std::string & name) const
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
const auto it = dictionaries.find(name);
if (it == std::end(dictionaries))
throw Exception{
"No such dictionary: " + name,
ErrorCodes::BAD_ARGUMENTS
};
if (!it->second.dict && it->second.exception)
std::rethrow_exception(it->second.exception);
return it->second.dict->get();
}
}

View File

@ -21,8 +21,8 @@ StorageSystemDictionaries::StorageSystemDictionaries(const std::string & name)
: name{name},
columns{
{ "name", new DataTypeString },
{ "type", new DataTypeString },
{ "origin", new DataTypeString },
{ "type", new DataTypeString },
{ "attribute.names", new DataTypeArray{new DataTypeString} },
{ "attribute.types", new DataTypeArray{new DataTypeString} },
{ "has_hierarchy", new DataTypeUInt8 },
@ -56,8 +56,8 @@ BlockInputStreams StorageSystemDictionaries::read(
processed_stage = QueryProcessingStage::FetchColumns;
ColumnWithNameAndType col_name{new ColumnString, new DataTypeString, "name"};
ColumnWithNameAndType col_type{new ColumnString, new DataTypeString, "type"};
ColumnWithNameAndType col_origin{new ColumnString, new DataTypeString, "origin"};
ColumnWithNameAndType col_type{new ColumnString, new DataTypeString, "type"};
ColumnWithNameAndType col_attribute_names{
new ColumnArray{new ColumnString},
new DataTypeArray{new DataTypeString},
@ -83,34 +83,51 @@ BlockInputStreams StorageSystemDictionaries::read(
for (const auto & dict_info : external_dictionaries.dictionaries)
{
const auto & name = dict_info.first;
const auto dict_ptr = dict_info.second.first->get();
col_name.column->insert(dict_info.first);
col_origin.column->insert(dict_info.second.origin);
col_name.column->insert(name);
col_type.column->insert(dict_ptr->getTypeName());
col_origin.column->insert(dict_info.second.second);
if (dict_info.second.dict)
{
const auto dict_ptr = dict_info.second.dict->get();
const auto & dict_struct = dict_ptr->getStructure();
col_attribute_names.column->insert(ext::map<Array>(dict_struct.attributes, [] (auto & attr) -> decltype(auto) {
return attr.name;
}));
col_attribute_types.column->insert(ext::map<Array>(dict_struct.attributes, [] (auto & attr) -> decltype(auto) {
return attr.type->getName();
}));
col_has_hierarchy.column->insert(UInt64{dict_ptr->hasHierarchy()});
col_bytes_allocated.column->insert(dict_ptr->getBytesAllocated());
col_query_count.column->insert(dict_ptr->getQueryCount());
col_hit_rate.column->insert(dict_ptr->getHitRate());
col_element_count.column->insert(dict_ptr->getElementCount());
col_load_factor.column->insert(dict_ptr->getLoadFactor());
col_creation_time.column->insert(std::chrono::system_clock::to_time_t(dict_ptr->getCreationTime()));
col_type.column->insert(dict_ptr->getTypeName());
const auto exception_it = external_dictionaries.stored_exceptions.find(name);
if (exception_it != std::end(external_dictionaries.stored_exceptions))
const auto & dict_struct = dict_ptr->getStructure();
col_attribute_names.column->insert(ext::map<Array>(dict_struct.attributes, [] (auto & attr) -> decltype(auto) {
return attr.name;
}));
col_attribute_types.column->insert(ext::map<Array>(dict_struct.attributes, [] (auto & attr) -> decltype(auto) {
return attr.type->getName();
}));
col_has_hierarchy.column->insert(UInt64{dict_ptr->hasHierarchy()});
col_bytes_allocated.column->insert(dict_ptr->getBytesAllocated());
col_query_count.column->insert(dict_ptr->getQueryCount());
col_hit_rate.column->insert(dict_ptr->getHitRate());
col_element_count.column->insert(dict_ptr->getElementCount());
col_load_factor.column->insert(dict_ptr->getLoadFactor());
col_creation_time.column->insert(std::chrono::system_clock::to_time_t(dict_ptr->getCreationTime()));
col_source.column->insert(dict_ptr->getSource()->toString());
}
else
{
col_type.column->insertDefault();
col_attribute_names.column->insertDefault();
col_attribute_types.column->insertDefault();
col_has_hierarchy.column->insertDefault();
col_bytes_allocated.column->insertDefault();
col_query_count.column->insertDefault();
col_hit_rate.column->insertDefault();
col_element_count.column->insertDefault();
col_load_factor.column->insertDefault();
col_creation_time.column->insertDefault();
col_source.column->insertDefault();
}
if (dict_info.second.exception)
{
try
{
std::rethrow_exception(exception_it->second);
std::rethrow_exception(dict_info.second.exception);
}
catch (const Exception & e)
{
@ -130,15 +147,13 @@ BlockInputStreams StorageSystemDictionaries::read(
}
}
else
col_last_exception.column->insert(std::string{});
col_source.column->insert(dict_ptr->getSource()->toString());
col_last_exception.column->insertDefault();
}
Block block{
col_name,
col_type,
col_origin,
col_type,
col_attribute_names,
col_attribute_types,
col_has_hierarchy,