get rid of metadata in ram

This commit is contained in:
Nikita Vasilev 2020-04-22 21:00:08 +03:00
parent b01ea01e87
commit 04347a94d4
2 changed files with 21 additions and 70 deletions

View File

@ -181,7 +181,7 @@ CachePartition::CachePartition(
, write_buffer_size(write_buffer_size_)
, max_stored_keys(max_stored_keys_)
, path(dir_path + "/" + std::to_string(file_id))
, key_to_index_and_metadata(max_stored_keys)
, key_to_index(max_stored_keys)
, attributes_structure(attributes_structure_)
{
keys_buffer.type = AttributeUnderlyingType::utUInt64;
@ -218,17 +218,6 @@ CachePartition::~CachePartition()
size_t CachePartition::appendDefaults(
const Attribute & new_keys, const PaddedPODArray<Metadata> & metadata, const size_t begin)
{
/*std::unique_lock lock(rw_lock);
const auto & ids = std::get<Attribute::Container<UInt64>>(new_keys.values);
for (size_t index = begin; index < ids.size(); ++index)
{
IndexAndMetadata index_and_metadata;
index_and_metadata.metadata = metadata[index];
index_and_metadata.metadata.setDefault();
key_to_index_and_metadata.set(ids[index], index_and_metadata);
}
*/
return appendBlock(new_keys, Attributes{}, metadata, begin);
}
@ -257,8 +246,6 @@ size_t CachePartition::appendBlock(
{
init_write_buffer();
//codec = CompressionCodecFactory::instance().get("NONE", std::nullopt);
//compressed_buffer.emplace(*write_buffer, codec);
// hashing_buffer.emplace(*compressed_buffer);
}
bool flushed = false;
@ -274,11 +261,10 @@ size_t CachePartition::appendBlock(
for (size_t index = begin; index < ids.size();)
{
IndexAndMetadata index_and_metadata;
index_and_metadata.index.setInMemory(true);
index_and_metadata.index.setBlockId(current_memory_block_id);
index_and_metadata.index.setAddressInBlock(write_buffer->offset());
index_and_metadata.metadata = metadata[index];
Index cache_index;
cache_index.setInMemory(true);
cache_index.setBlockId(current_memory_block_id);
cache_index.setAddressInBlock(write_buffer->offset());
flushed = false;
if (2 * sizeof(UInt64) > write_buffer->available()) // place for key and metadata
@ -331,7 +317,6 @@ size_t CachePartition::appendBlock(
case AttributeUnderlyingType::utString:
{
//LOG_DEBUG(&Poco::Logger::get("kek"), "string write");
const auto & value = std::get<Attribute::Container<String>>(attribute.values)[index];
if (sizeof(UInt64) + value.size() > write_buffer->available())
{
@ -349,7 +334,7 @@ size_t CachePartition::appendBlock(
if (!flushed)
{
key_to_index_and_metadata.set(ids[index], index_and_metadata);
key_to_index.set(ids[index], cache_index);
ids_buffer.push_back(ids[index]);
++index;
++keys_in_block;
@ -432,17 +417,14 @@ void CachePartition::flush()
/// commit changes in index
for (size_t row = 0; row < ids.size(); ++row)
{
IndexAndMetadata index_and_metadata;
if (key_to_index_and_metadata.get(ids[row], index_and_metadata)) {
auto & index = index_and_metadata.index;
Index index;
if (key_to_index.get(ids[row], index)) {
if (index.inMemory()) // Row can be inserted in the buffer twice, so we need to move to ssd only the last index.
{
index.setInMemory(false);
index.setBlockId(current_file_block_id + index.getBlockId());
}
key_to_index_and_metadata.set(ids[row], index_and_metadata);
} else {
// Key was evicted from cache.
key_to_index.set(ids[row], index);
}
}
@ -475,11 +457,6 @@ void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray
}
};
/*auto set_default = [&](const size_t index)
{
out[index] = get_default(index);
};*/
getImpl(ids, set_value, found);
}
@ -509,12 +486,6 @@ void CachePartition::getString(const size_t attribute_index, const PaddedPODArra
}
};
/*auto set_default = [&](const size_t index)
{
buf.ignore(sizeof(UInt64)); // key
default_ids.push_back(index);
};*/
getImpl(ids, set_value, found);
}
@ -543,25 +514,13 @@ void CachePartition::getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set,
PaddedPODArray<Index> indices(ids.size());
for (size_t i = 0; i < ids.size(); ++i)
{
IndexAndMetadata index_and_metadata;
Index index;
if (found[i])
{
indices[i].setNotExists();
}
else if (key_to_index_and_metadata.get(ids[i], index_and_metadata)/* && index_and_metadata.metadata.expiresAt() > now*/)
{
/*if (unlikely(index_and_metadata.metadata.isDefault()))
{
indices[i].setNotExists();
//set_default(i);
}
else*/
indices[i] = index_and_metadata.index;
}
else if (key_to_index.get(ids[i], index))
indices[i] = index;
else
{
indices[i].setNotExists();
}
}
getValueFromMemory(indices, set);
@ -825,11 +784,11 @@ void CachePartition::clearOldestBlocks()
const size_t finish_block = start_block + block_size * write_buffer_size;
for (const auto& key : keys)
{
IndexAndMetadata index_and_metadata;
if (key_to_index_and_metadata.get(key, index_and_metadata)) {
size_t block_id = index_and_metadata.index.getBlockId();
Index index;
if (key_to_index.get(key, index)) {
size_t block_id = index.getBlockId();
if (start_block <= block_id && block_id < finish_block) {
key_to_index_and_metadata.erase(key);
key_to_index.erase(key);
}
}
}
@ -888,7 +847,7 @@ double CachePartition::getLoadFactor() const
size_t CachePartition::getElementCount() const
{
std::shared_lock lock(rw_lock);
return key_to_index_and_metadata.size();
return key_to_index.size();
}
PaddedPODArray<CachePartition::Key> CachePartition::getCachedIds(const std::chrono::system_clock::time_point /* now */) const
@ -896,16 +855,14 @@ PaddedPODArray<CachePartition::Key> CachePartition::getCachedIds(const std::chro
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
PaddedPODArray<Key> array;
for (const auto & [key, index_and_metadata] : key_to_index_and_metadata)
if (!index_and_metadata.second.metadata.isDefault() /* && index_and_metadata.second.metadata.expiresAt() > now */)
array.push_back(key);
for (const auto & [key, index] : key_to_index)
array.push_back(key); // TODO: exclude default
return array;
}
void CachePartition::remove()
{
std::unique_lock lock(rw_lock);
//Poco::File(path + BIN_FILE_EXT).remove();
std::filesystem::remove(std::filesystem::path(path + BIN_FILE_EXT));
}

View File

@ -100,6 +100,7 @@ using AttributeValueVariant = std::variant<
Float64,
String>;
class CachePartition
{
public:
@ -239,16 +240,9 @@ private:
int fd = -1;
struct IndexAndMetadata final
{
Index index{};
Metadata metadata{};
};
mutable CLRUCache<UInt64, IndexAndMetadata> key_to_index_and_metadata;
mutable CLRUCache<UInt64, Index> key_to_index;
Attribute keys_buffer;
//std::vector<Metadata> metadata_buffer;
const std::vector<AttributeUnderlyingType> attributes_structure;
std::optional<Memory<>> memory;