some refactoring

This commit is contained in:
Nikita Vasilev 2020-01-08 22:41:05 +03:00
parent ce29a3cc00
commit fc94ffe84e
2 changed files with 76 additions and 100 deletions

View File

@ -364,7 +364,6 @@ void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::chrono::system_clock::time_point now) const
{
Poco::Logger::get("IDS:").information(std::to_string(ids.size()));
PaddedPODArray<Index> indices(ids.size());
for (size_t i = 0; i < ids.size(); ++i)
{
@ -465,8 +464,6 @@ void CachePartition::getValueFromStorage(
blocks_to_indices.back().push_back(i);
}
Poco::Logger::get("requests:").information(std::to_string(requests.size()));
AIOContext aio_context(READ_BUFFER_SIZE_BLOCKS);
std::vector<bool> processed(requests.size(), false);
@ -611,30 +608,27 @@ void CachePartition::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UIn
/// Drops an expired entry from the in-memory key index.
/// `it` must be an iterator into `key_to_index_and_metadata`; its key is logged
/// at information level before the entry is erased.
template <typename Iterator>
void CachePartition::markExpired(const Iterator & it) const
{
    auto & expiry_logger = Poco::Logger::get("markExpired");
    expiry_logger.information("expired: " + std::to_string(it->first));

    key_to_index_and_metadata.erase(it);
}
/// Constructs the cache storage: remembers the storage path and the per-partition size limit,
/// obtains the "CacheStorage" logger, and creates `partitions_count_` CachePartition objects.
/// Each partition receives utUInt64 as its first argument (presumably the key type — confirm
/// against the CachePartition constructor), the list of attribute underlying types, the path,
/// its partition id and the size limit.
/// NOTE(review): this span is a rendered diff hunk that shows both the previous form (taking the
/// owning SSDCacheDictionary and deriving the type list from its structure) and the new form
/// (taking the attribute type list directly) — confirm which lines belong to the final file.
CacheStorage::CacheStorage(SSDCacheDictionary & dictionary_, const std::string & path_, const size_t partitions_count_, const size_t partition_max_size_)
: dictionary(dictionary_)
CacheStorage::CacheStorage(
const Attributes & attributes_structure_, const std::string & path_,
const size_t partitions_count_, const size_t partition_max_size_)
: attributes_structure(attributes_structure_)
, path(path_)
, partition_max_size(partition_max_size_)
, log(&Poco::Logger::get("CacheStorage"))
{
/// Previous form: collect the underlying type of every attribute of the dictionary structure.
std::vector<AttributeUnderlyingType> structure;
for (const auto & item : dictionary.getStructure().attributes)
{
structure.push_back(item.underlying_type);
}
/// One CachePartition per partition id in [0, partitions_count_).
for (size_t partition_id = 0; partition_id < partitions_count_; ++partition_id)
partitions.emplace_back(std::make_unique<CachePartition>(AttributeUnderlyingType::utUInt64, structure, path_, partition_id, partition_max_size));
partitions.emplace_back(std::make_unique<CachePartition>(AttributeUnderlyingType::utUInt64,
attributes_structure, path_, partition_id, partition_max_size));
}
template <typename PresentIdHandler, typename AbsentIdHandler>
void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found)
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found,
const DictionaryLifetime lifetime, const std::vector<AttributeValueVariant> & null_values)
{
Poco::Logger::get("cachestorage").information("update");
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
@ -664,17 +658,15 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
while (const auto block = stream->read())
{
const auto new_keys = createAttributesFromBlock(block, 0, { AttributeUnderlyingType::utUInt64 }).front();
const auto new_attributes = createAttributesFromBlock(
block, 1, ext::map<std::vector>(dictionary.getAttributes(), [](const auto & attribute) { return attribute.type; }));
const auto new_attributes = createAttributesFromBlock(block, 1, attributes_structure);
const auto & ids = std::get<CachePartition::Attribute::Container<UInt64>>(new_keys.values);
std::vector<CachePartition::Metadata> metadata(ids.size());
const auto & dict_lifetime = dictionary.getLifetime();
for (const auto i : ext::range(0, ids.size()))
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
metadata[i].setExpiresAt(now + std::chrono::seconds(distribution(rnd_engine)));
/// mark corresponding id as found
on_updated(ids[i], i, new_attributes);
@ -699,8 +691,8 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
last_update_exception = std::current_exception();
backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, update_error_count));
tryLogException(last_update_exception, log, "Could not update cache dictionary '" + dictionary.getName() +
"', next update is scheduled at " + ext::to_string(backoff_end_time));
tryLogException(last_update_exception, log,
"Could not update ssd cache dictionary, next update is scheduled at " + ext::to_string(backoff_end_time));
}
}
@ -713,14 +705,14 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
CachePartition::Attributes new_attributes;
{
/// TODO: create attributes from structure
for (const auto & attribute : dictionary.getAttributes())
for (const auto & attribute_type : attributes_structure)
{
switch (attribute.type)
switch (attribute_type)
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
new_attributes.emplace_back(); \
new_attributes.back().type = attribute.type; \
new_attributes.back().type = attribute_type; \
new_attributes.back().values = std::vector<TYPE>(); \
break;
@ -748,7 +740,6 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
}
std::vector<CachePartition::Metadata> metadata;
const auto & dict_lifetime = dictionary.getLifetime();
for (const auto & id_found_pair : remaining_ids)
{
@ -774,24 +765,23 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
// Set key
std::get<std::vector<UInt64>>(new_keys.values).push_back(id);
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
metadata.emplace_back();
metadata.back().setExpiresAt(now + std::chrono::seconds(distribution(rnd_engine)));
metadata.back().setDefault();
/// Set null_value for each attribute
const auto & attributes = dictionary.getAttributes();
for (size_t i = 0; i < attributes.size(); ++i)
for (size_t i = 0; i < attributes_structure.size(); ++i)
{
const auto & attribute = attributes[i];
const auto & attribute = attributes_structure[i];
// append null
switch (attribute.type)
switch (attribute)
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
{ \
auto & to_values = std::get<std::vector<TYPE>>(new_attributes[i].values); \
auto & null_value = std::get<TYPE>(attribute.null_value); \
auto & null_value = std::get<TYPE>(null_values[i]); \
to_values.push_back(null_value); \
} \
break;
@ -891,7 +881,8 @@ SSDCacheDictionary::SSDCacheDictionary(
, dict_lifetime(dict_lifetime_)
, path(path_)
, partition_max_size(partition_max_size_)
, storage(*this, path, 1, partition_max_size)
, storage(ext::map<std::vector>(dict_struct.attributes, [](const auto & attribute) { return attribute.underlying_type; }),
path, 1, partition_max_size)
, log(&Poco::Logger::get("SSDCacheDictionary"))
{
if (!this->source_ptr->supportsSelectiveLoad())
@ -906,7 +897,7 @@ SSDCacheDictionary::SSDCacheDictionary(
{ \
const auto index = getAttributeIndex(attribute_name); \
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \
const auto null_value = std::get<TYPE>(attributes[index].null_value); \
const auto null_value = std::get<TYPE>(null_values[index]); \
getItemsNumberImpl<TYPE, TYPE>( \
index, \
ids, \
@ -1017,15 +1008,17 @@ void SSDCacheDictionary::getItemsNumberImpl(
{
for (const size_t row : not_found_ids[id])
out[row] = get_default(row);
});
},
getLifetime(),
null_values);
}
/// Fetches the String attribute `attribute_name` for every id in `ids` into `out`,
/// using the attribute's configured null value as the default for every row.
/// The attribute is resolved to an index by name and checked against utString
/// via checkAttributeType before the lookup is delegated to getItemsString.
/// NOTE(review): this span is a rendered diff hunk; the type check and the null-value lookup
/// each appear twice (old source: `attributes[index]`, new source: `dict_struct` / `null_values`).
void SSDCacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
{
const auto index = getAttributeIndex(attribute_name);
checkAttributeType(name, attribute_name, attributes[index].type, AttributeUnderlyingType::utString);
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::utString);
const auto null_value = StringRef{std::get<String>(attributes[index].null_value)};
const auto null_value = StringRef{std::get<String>(null_values[index])};
/// The default getter ignores the row number: every row gets the same null value.
getItemsString(index, ids, out, [&](const size_t) { return null_value; });
}
@ -1034,7 +1027,7 @@ void SSDCacheDictionary::getString(
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out) const
{
const auto index = getAttributeIndex(attribute_name);
checkAttributeType(name, attribute_name, attributes[index].type, AttributeUnderlyingType::utString);
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::utString);
getItemsString(index, ids, out, [&](const size_t row) { return def->getDataAt(row); });
}
@ -1043,7 +1036,7 @@ void SSDCacheDictionary::getString(
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const
{
const auto index = getAttributeIndex(attribute_name);
checkAttributeType(name, attribute_name, attributes[index].type, AttributeUnderlyingType::utString);
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::utString);
getItemsString(index, ids, out, [&](const size_t) { return StringRef{def}; });
}
@ -1081,7 +1074,9 @@ void SSDCacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UIn
{
for (const size_t row : not_found_ids[id])
out[row] = false;
});
},
getLifetime(),
null_values);
}
size_t SSDCacheDictionary::getAttributeIndex(const std::string & attr_name) const
@ -1092,48 +1087,31 @@ size_t SSDCacheDictionary::getAttributeIndex(const std::string & attr_name) cons
return it->second;
}
/// Returns a mutable reference to the attribute descriptor registered under `attr_name`.
/// The name is resolved to a position by getAttributeIndex, which then indexes `attributes`.
SSDCacheDictionary::Attribute & SSDCacheDictionary::getAttribute(const std::string & attr_name)
{
    const size_t position = getAttributeIndex(attr_name);
    return attributes[position];
}
/// Returns a read-only reference to the attribute descriptor registered under `attr_name`.
/// The name is resolved to a position by getAttributeIndex, which then indexes `attributes`.
const SSDCacheDictionary::Attribute & SSDCacheDictionary::getAttribute(const std::string & attr_name) const
{
    const size_t position = getAttributeIndex(attr_name);
    return attributes[position];
}
/// Returns read-only access to the dictionary's full list of attribute descriptors.
const SSDCacheDictionary::Attributes & SSDCacheDictionary::getAttributes() const
{
return attributes;
}
/// Converts the configured null value of an attribute from a Field into its typed form:
/// the Field is read as NearestFieldType<T> and cast to T. Also adds sizeof(T) to `bytes_allocated`.
/// NOTE(review): this span is a rendered diff hunk showing two forms side by side — the previous one
/// returns a whole Attribute (type + null value), the new one returns only the null value wrapped
/// in AttributeValueVariant. Confirm which lines belong to the final file.
template <typename T>
SSDCacheDictionary::Attribute SSDCacheDictionary::createAttributeWithTypeImpl(const AttributeUnderlyingType type, const Field & null_value)
AttributeValueVariant SSDCacheDictionary::createAttributeNullValueWithTypeImpl(const Field & null_value)
{
Attribute attr{type, {}};
attr.null_value = static_cast<T>(null_value.get<NearestFieldType<T>>());
AttributeValueVariant var_null_value = static_cast<T>(null_value.get<NearestFieldType<T>>());
bytes_allocated += sizeof(T);
return attr;
return var_null_value;
}
/// String specialization: the null value is taken from the Field directly as a String
/// (no NearestFieldType conversion), and sizeof(StringRef) is added to `bytes_allocated`.
/// NOTE(review): this span is a rendered diff hunk showing both the previous form (returns an
/// Attribute) and the new form (returns an AttributeValueVariant) — confirm which lines survive.
template <>
SSDCacheDictionary::Attribute SSDCacheDictionary::createAttributeWithTypeImpl<String>(const AttributeUnderlyingType type, const Field & null_value)
AttributeValueVariant SSDCacheDictionary::createAttributeNullValueWithTypeImpl<String>(const Field & null_value)
{
Attribute attr{type, {}};
attr.null_value = null_value.get<String>();
AttributeValueVariant var_null_value = null_value.get<String>();
bytes_allocated += sizeof(StringRef);
/// Arena-backed string storage is disabled here (left commented out).
//if (!string_arena)
// string_arena = std::make_unique<ArenaWithFreeLists>();
return attr;
return var_null_value;
}
SSDCacheDictionary::Attribute SSDCacheDictionary::createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
AttributeValueVariant SSDCacheDictionary::createAttributeNullValueWithType(const AttributeUnderlyingType type, const Field & null_value)
{
switch (type)
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
return createAttributeWithTypeImpl<TYPE>(type, null_value);
return createAttributeNullValueWithTypeImpl<TYPE>(null_value);
DISPATCH(UInt8)
DISPATCH(UInt16)
@ -1157,13 +1135,13 @@ case AttributeUnderlyingType::ut##TYPE: \
void SSDCacheDictionary::createAttributes()
{
attributes.reserve(dict_struct.attributes.size());
null_values.reserve(dict_struct.attributes.size());
for (size_t i = 0; i < dict_struct.attributes.size(); ++i)
{
const auto & attribute = dict_struct.attributes[i];
attribute_index_by_name.emplace(attribute.name, i);
attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value));
null_values.push_back(createAttributeNullValueWithType(attribute.underlying_type, attribute.null_value));
if (attribute.hierarchical)
throw Exception{name + ": hierarchical attributes not supported for dictionary of type " + getTypeName(),

View File

@ -44,6 +44,23 @@ public:
ssize_t readString(const String & str, WriteBuffer & buffer);
};*/
/// Type-safe holder for a single attribute value of any supported underlying type:
/// unsigned/signed integers, decimals, floating-point numbers, or String.
/// In this file it is used to carry per-attribute null values.
/// NOTE(review): the alternatives presumably mirror the AttributeUnderlyingType cases handled
/// by the DISPATCH macros — keep both lists in sync when adding a type.
using AttributeValueVariant = std::variant<
UInt8,
UInt16,
UInt32,
UInt64,
UInt128,
Int8,
Int16,
Int32,
Int64,
Decimal32,
Decimal64,
Decimal128,
Float32,
Float64,
String>;
class CachePartition
{
public:
@ -186,9 +203,10 @@ using CachePartitionPtr = std::unique_ptr<CachePartition>;
class CacheStorage
{
public:
using Attributes = std::vector<AttributeUnderlyingType>;
using Key = IDictionary::Key;
CacheStorage(SSDCacheDictionary & dictionary_, const std::string & path_,
CacheStorage(const Attributes & attributes_structure_, const std::string & path_,
const size_t partitions_count_, const size_t partition_max_size_);
template <typename T>
@ -213,7 +231,8 @@ public:
template <typename PresentIdHandler, typename AbsentIdHandler>
void update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found);
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found,
const DictionaryLifetime lifetime, const std::vector<AttributeValueVariant> & null_values);
//BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const;
@ -225,10 +244,8 @@ private:
CachePartition::Attributes createAttributesFromBlock(
const Block & block, const size_t begin_column, const std::vector<AttributeUnderlyingType> & structure);
SSDCacheDictionary & dictionary;
const Attributes attributes_structure;
// Block structure: Key, (Default + TTL), Attr1, Attr2, ...
// const Block header;
const std::string path;
const size_t partition_max_size;
std::vector<CachePartitionPtr> partitions;
@ -244,6 +261,8 @@ private:
mutable std::chrono::system_clock::time_point backoff_end_time;
// stats
mutable size_t bytes_allocated = 0;
mutable std::atomic<size_t> element_count{0};
mutable std::atomic<size_t> hit_count{0};
mutable std::atomic<size_t> query_count{0};
@ -380,39 +399,18 @@ public:
return nullptr;
}
private:
size_t getAttributeIndex(const std::string & attr_name) const;
/// Descriptor of one dictionary attribute: its underlying type together with its null value.
/// NOTE(review): this span is a rendered diff hunk; `null_value` appears twice — once as an
/// inline std::variant (previous form) and once as AttributeValueVariant (new form). Confirm
/// which declaration belongs to the final file.
struct Attribute
{
AttributeUnderlyingType type;
std::variant<
UInt8,
UInt16,
UInt32,
UInt64,
UInt128,
Int8,
Int16,
Int32,
Int64,
Decimal32,
Decimal64,
Decimal128,
Float32,
Float64,
String> null_value;
AttributeValueVariant null_value;
};
using Attributes = std::vector<Attribute>;
/// TODO: move this elsewhere
const Attributes & getAttributes() const;
private:
size_t getAttributeIndex(const std::string & attr_name) const;
Attribute & getAttribute(const std::string & attr_name);
const Attribute & getAttribute(const std::string & attr_name) const;
template <typename T>
Attribute createAttributeWithTypeImpl(const AttributeUnderlyingType type, const Field & null_value);
Attribute createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value);
AttributeValueVariant createAttributeNullValueWithTypeImpl(const Field & null_value);
AttributeValueVariant createAttributeNullValueWithType(const AttributeUnderlyingType type, const Field & null_value);
void createAttributes();
template <typename AttributeType, typename OutputType, typename DefaultGetter>
@ -429,11 +427,11 @@ private:
const std::string path;
const size_t partition_max_size;
mutable CacheStorage storage;
Logger * const log;
std::map<std::string, size_t> attribute_index_by_name;
Attributes attributes; // TODO: move to storage
std::vector<AttributeValueVariant> null_values;
mutable CacheStorage storage;
Logger * const log;
mutable size_t bytes_allocated = 0;
mutable std::atomic<size_t> element_count{0};