has and ttl

This commit is contained in:
Nikita Vasilev 2020-01-08 15:40:29 +03:00
parent 297b8aa7ab
commit 05622f2bee
3 changed files with 132 additions and 52 deletions

View File

@ -82,20 +82,20 @@ namespace
const std::string IND_FILE_EXT = ".idx";
}
CachePartition::KeyMetadata::time_point_t CachePartition::KeyMetadata::expiresAt() const
CachePartition::Metadata::time_point_t CachePartition::Metadata::expiresAt() const
{
return ext::safe_bit_cast<time_point_t>(data & KEY_METADATA_EXPIRES_AT_MASK);
}
void CachePartition::KeyMetadata::setExpiresAt(const time_point_t & t)
void CachePartition::Metadata::setExpiresAt(const time_point_t & t)
{
data = ext::safe_bit_cast<time_point_urep_t>(t);
}
bool CachePartition::KeyMetadata::isDefault() const
bool CachePartition::Metadata::isDefault() const
{
return (data & KEY_METADATA_IS_DEFAULT_MASK) == KEY_METADATA_IS_DEFAULT_MASK;
}
void CachePartition::KeyMetadata::setDefault()
void CachePartition::Metadata::setDefault()
{
data |= KEY_METADATA_IS_DEFAULT_MASK;
}
@ -198,7 +198,7 @@ CachePartition::~CachePartition()
::close(fd);
}
void CachePartition::appendBlock(const Attribute & new_keys, const Attributes & new_attributes)
void CachePartition::appendBlock(const Attribute & new_keys, const Attributes & new_attributes, const std::vector<Metadata> & metadata)
{
if (new_attributes.size() != attributes_buffer.size())
throw Exception{"Wrong columns number in block.", ErrorCodes::BAD_ARGUMENTS};
@ -206,17 +206,16 @@ void CachePartition::appendBlock(const Attribute & new_keys, const Attributes &
const auto & ids = std::get<Attribute::Container<UInt64>>(new_keys.values);
auto & ids_buffer = std::get<Attribute::Container<UInt64>>(keys_buffer.values);
//appendValuesToAttribute(keys_buffer, new_keys);
if (!write_buffer)
write_buffer.emplace(memory.data(), SSD_BLOCK_SIZE);
for (size_t index = 0; index < ids.size();)
{
auto & key_index = key_to_metadata[ids[index]].index;
key_index.setInMemory(true);
key_index.setBlockId(current_memory_block_id);
key_index.setAddressInBlock(write_buffer->offset());
auto & index_and_metadata = key_to_index_and_metadata[ids[index]];
index_and_metadata.index.setInMemory(true);
index_and_metadata.index.setBlockId(current_memory_block_id);
index_and_metadata.index.setAddressInBlock(write_buffer->offset());
index_and_metadata.metadata = metadata[index];
bool flushed = false;
@ -332,7 +331,7 @@ void CachePartition::flush()
write_request.aio.aio_fildes = fd;
write_request.aio.aio_buf = reinterpret_cast<volatile void *>(memory.data());
write_request.aio.aio_nbytes = DEFAULT_AIO_FILE_BLOCK_SIZE;
write_request.aio.aio_offset = DEFAULT_AIO_FILE_BLOCK_SIZE;
write_request.aio.aio_offset = DEFAULT_AIO_FILE_BLOCK_SIZE * current_file_block_id;
#else
write_request.aio_lio_opcode = IOCB_CMD_PWRITE;
write_request.aio_fildes = fd;
@ -380,9 +379,9 @@ void CachePartition::flush()
/// commit changes in index
for (size_t row = 0; row < ids.size(); ++row)
{
key_to_metadata[ids[row]].index.setInMemory(false);
key_to_metadata[ids[row]].index.setBlockId(current_file_block_id);
Poco::Logger::get("INDEX:").information("NEW MAP: " + std::to_string(ids[row]) + " -> " + std::to_string(key_to_metadata[ids[row]].index.index));
key_to_index_and_metadata[ids[row]].index.setInMemory(false);
key_to_index_and_metadata[ids[row]].index.setBlockId(current_file_block_id);
Poco::Logger::get("INDEX:").information("NEW MAP: " + std::to_string(ids[row]) + " -> " + std::to_string(key_to_index_and_metadata[ids[row]].index.index));
}
++current_file_block_id;
@ -395,23 +394,28 @@ void CachePartition::flush()
template <typename Out, typename Key>
void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found) const
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::chrono::system_clock::time_point now) const
{
Poco::Logger::get("IDS:").information(std::to_string(ids.size()));
PaddedPODArray<Index> indices(ids.size());
for (size_t i = 0; i < ids.size(); ++i)
{
auto it = key_to_metadata.find(ids[i]);
if (it == std::end(key_to_metadata)) // TODO: check expired
auto it = key_to_index_and_metadata.find(ids[i]);
if (it == std::end(key_to_index_and_metadata))
{
indices[i].setNotExists();
not_found[ids[i]].push_back(i);
Poco::Logger::get("part:").information("NOT FOUND " + std::to_string(ids[i]) + " " + std::to_string(indices[i].index));
}
else if (it->second.metadata.expiresAt() <= now)
{
indices[i].setNotExists();
not_found[ids[i]].push_back(i);
markExpired(it);
}
else
{
indices[i] = it->second.index;
Poco::Logger::get("part:").information("HIT " + std::to_string(ids[i]) + " " + std::to_string(indices[i].index));
}
}
@ -482,7 +486,7 @@ void CachePartition::getValueFromStorage(
reinterpret_cast<UInt64>(read_buffer.data()) + SSD_BLOCK_SIZE * (i % MAX_BLOCKS_TO_KEEP_IN_MEMORY));
request.aio.aio_nbytes = SSD_BLOCK_SIZE;
request.aio.aio_offset = index_to_out[i].first;
request.aio_data = i;
request.aio_data = requests.size();
#else
request.aio_lio_opcode = IOCB_CMD_PREAD;
request.aio_fildes = fd;
@ -619,22 +623,38 @@ void CachePartition::readValueFromBuffer(const size_t attribute_index, Out & dst
}
}
void CachePartition::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out) const
template <typename Key>
void CachePartition::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
std::unordered_map<Key, std::vector<size_t>> & not_found, std::chrono::system_clock::time_point now) const
{
for (size_t i = 0; i < ids.size(); ++i)
{
auto it = key_to_metadata.find(ids[i]);
if (it == std::end(key_to_metadata))
auto it = key_to_index_and_metadata.find(ids[i]);
if (it == std::end(key_to_index_and_metadata))
{
out[i] = 0;
not_found[ids[i]].push_back(i);
}
else if (it->second.metadata.expiresAt() <= now)
{
not_found[ids[i]].push_back(i);
markExpired(it);
}
else
{
out[i] = it->second.isDefault();
Poco::Logger::get("not expired").information("expires at " + std::to_string(std::chrono::system_clock::to_time_t(it->second.metadata.expiresAt())) + " now: " + std::to_string(std::chrono::system_clock::to_time_t(now)));
out[i] = !it->second.metadata.isDefault();
}
}
}
template <typename Iterator>
void CachePartition::markExpired(const Iterator & it) const
{
Poco::Logger::get("markExpired").information("expired: " + std::to_string(it->first));
key_to_index_and_metadata.erase(it);
}
CacheStorage::CacheStorage(SSDCacheDictionary & dictionary_, const std::string & path_, const size_t partitions_count_, const size_t partition_max_size_)
: dictionary(dictionary_)
, path(path_)
@ -689,15 +709,20 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
const auto & ids = std::get<CachePartition::Attribute::Container<UInt64>>(new_keys.values);
std::vector<CachePartition::Metadata> metadata(ids.size());
const auto & dict_lifetime = dictionary.getLifetime();
for (const auto i : ext::range(0, ids.size()))
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
metadata[i].setExpiresAt(now + std::chrono::seconds(distribution(rnd_engine)));
/// mark corresponding id as found
on_updated(ids[i], i, new_attributes);
remaining_ids[ids[i]] = 1;
}
/// TODO: Add TTL to block
partitions[0]->appendBlock(new_keys, new_attributes);
partitions[0]->appendBlock(new_keys, new_attributes, metadata);
}
stream->readSuffix();
@ -761,6 +786,10 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
}
}
}
std::vector<CachePartition::Metadata> metadata;
const auto & dict_lifetime = dictionary.getLifetime();
for (const auto & id_found_pair : remaining_ids)
{
if (id_found_pair.second)
@ -785,6 +814,11 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
// Set key
std::get<std::vector<UInt64>>(new_keys.values).push_back(id);
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
metadata.emplace_back();
metadata.back().setExpiresAt(now + std::chrono::seconds(distribution(rnd_engine)));
metadata.back().setDefault();
/// Set null_value for each attribute
const auto & attributes = dictionary.getAttributes();
for (size_t i = 0; i < attributes.size(); ++i)
@ -827,8 +861,9 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
/// inform caller that the cell has not been found
on_id_not_found(id);
}
if (not_found_num)
partitions[0]->appendBlock(new_keys, new_attributes);
partitions[0]->appendBlock(new_keys, new_attributes, metadata);
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
@ -1001,8 +1036,10 @@ template <typename AttributeType, typename OutputType, typename DefaultGetter>
void SSDCacheDictionary::getItemsNumberImpl(
const size_t attribute_index, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const
{
const auto now = std::chrono::system_clock::now();
std::unordered_map<Key, std::vector<size_t>> not_found_ids;
storage.getValue<OutputType>(attribute_index, ids, out, not_found_ids);
storage.getValue<OutputType>(attribute_index, ids, out, not_found_ids, now);
if (not_found_ids.empty())
return;
@ -1061,6 +1098,32 @@ void SSDCacheDictionary::getItemsString(const size_t attribute_index, const Padd
UNUSED(get_default);
}
void SSDCacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
{
const auto now = std::chrono::system_clock::now();
std::unordered_map<Key, std::vector<size_t>> not_found_ids;
storage.has(ids, out, not_found_ids, now);
if (not_found_ids.empty())
return;
std::vector<Key> required_ids(not_found_ids.size());
std::transform(std::begin(not_found_ids), std::end(not_found_ids), std::begin(required_ids), [](const auto & pair) { return pair.first; });
storage.update(
source_ptr,
required_ids,
[&](const auto id, const auto, const auto &) {
for (const size_t out_row : not_found_ids[id])
out[out_row] = true;
},
[&](const size_t id)
{
for (const size_t row : not_found_ids[id])
out[row] = false;
});
}
size_t SSDCacheDictionary::getAttributeIndex(const std::string & attr_name) const
{
auto it = attribute_index_by_name.find(attr_name);

View File

@ -47,6 +47,22 @@ public:
class CachePartition
{
public:
struct Metadata final
{
using time_point_t = std::chrono::system_clock::time_point;
using time_point_rep_t = time_point_t::rep;
using time_point_urep_t = std::make_unsigned_t<time_point_rep_t>;
time_point_t expiresAt() const;
void setExpiresAt(const time_point_t & t);
bool isDefault() const;
void setDefault();
/// Stores both expiration time and `is_default` flag in the most significant bit
time_point_urep_t data = 0;
};
using Offset = size_t;
using Offsets = std::vector<Offset>;
@ -61,14 +77,14 @@ public:
template <typename Out, typename Key>
void getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found) const;
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::chrono::system_clock::time_point now) const;
// TODO:: getString
/// 0 -- not found
/// 1 -- good
/// 2 -- expired
void has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out) const;
template <typename Key>
void has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
std::unordered_map<Key, std::vector<size_t>> & not_found, std::chrono::system_clock::time_point now) const;
struct Attribute
{
@ -96,7 +112,7 @@ public:
using Attributes = std::vector<Attribute>;
// Key, (Metadata), attributes
void appendBlock(const Attribute & new_keys, const Attributes & new_attributes);
void appendBlock(const Attribute & new_keys, const Attributes & new_attributes, const std::vector<Metadata> & metadata);
private:
struct Index final
@ -134,6 +150,9 @@ private:
template <typename Out>
void readValueFromBuffer(const size_t attribute_index, Out & dst, ReadBuffer & buf) const;
template <typename Iterator>
void markExpired(const Iterator & it) const;
size_t file_id;
size_t max_size;
//size_t buffer_size;
@ -142,24 +161,13 @@ private:
//mutable std::shared_mutex rw_lock;
int fd = -1;
struct KeyMetadata final
struct IndexAndMetadata final
{
using time_point_t = std::chrono::system_clock::time_point;
using time_point_rep_t = time_point_t::rep;
using time_point_urep_t = std::make_unsigned_t<time_point_rep_t>;
time_point_t expiresAt() const;
void setExpiresAt(const time_point_t & t);
bool isDefault() const;
void setDefault();
Index index{};
/// Stores both expiration time and `is_default` flag in the most significant bit
time_point_urep_t data = 0;
Metadata metadata{};
};
std::unordered_map<UInt64, KeyMetadata> key_to_metadata;
mutable std::unordered_map<UInt64, IndexAndMetadata> key_to_index_and_metadata;
Attribute keys_buffer;
Attributes attributes_buffer;
@ -189,13 +197,21 @@ public:
template <typename Out>
void getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found) const
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::chrono::system_clock::time_point now) const
{
partitions[0]->getValue<Out>(attribute_index, ids, out, not_found);
partitions[0]->getValue<Out>(attribute_index, ids, out, not_found, now);
}
// getString();
template <typename Key>
void has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
std::unordered_map<Key, std::vector<size_t>> & not_found, std::chrono::system_clock::time_point now) const
{
partitions[0]->has(ids, out, not_found, now);
}
template <typename PresentIdHandler, typename AbsentIdHandler>
void update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found);
@ -356,7 +372,7 @@ public:
void getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const;
void has(const PaddedPODArray<Key> & /* ids */, PaddedPODArray<UInt8> & /* out */) const override {} // TODO
void has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override // TODO
{

View File

@ -137,6 +137,7 @@ private:
if (!executeDispatchSimple<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchSimple<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchSimple<CacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchSimple<SSDCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr))