This commit is contained in:
Nikita Vasilev 2020-05-09 14:36:23 +03:00
parent 8e3442bae4
commit 270268f4a2
2 changed files with 54 additions and 16 deletions

View File

@ -268,6 +268,10 @@ size_t SSDComplexKeyCachePartition::append(
{
init_write_buffer();
}
if (!keys_buffer_pool)
{
keys_buffer_pool.emplace();
}
bool flushed = false;
auto finish_block = [&]()
@ -360,7 +364,7 @@ size_t SSDComplexKeyCachePartition::append(
if (!flushed)
{
key_to_index.set(keys[index], cache_index);
keys_buffer.push_back(keys[index]);
keys_buffer.push_back(keys_buffer_pool->copyKeyFrom(keys[index]));
++index;
++keys_in_block;
}
@ -442,6 +446,7 @@ void SSDComplexKeyCachePartition::flush()
for (size_t row = 0; row < keys_buffer.size(); ++row)
{
Index index;
Poco::Logger::get("get:").information("sz = " + std::to_string(keys_buffer[row].size()));
if (key_to_index.get(keys_buffer[row], index))
{
if (index.inMemory()) // Row can be inserted in the buffer twice, so we need to move to ssd only the last index.
@ -451,6 +456,7 @@ void SSDComplexKeyCachePartition::flush()
}
key_to_index.set(keys_buffer[row], index);
}
Poco::Logger::get("get:").information("finish");
}
current_file_block_id += write_buffer_size;
@ -458,6 +464,8 @@ void SSDComplexKeyCachePartition::flush()
/// clear buffer
keys_buffer.clear();
keys_buffer_pool.reset();
keys_buffer_pool.emplace();
}
template <typename Out, typename GetDefault>
@ -754,7 +762,6 @@ void SSDComplexKeyCachePartition::clearOldestBlocks()
TemporalComplexKeysPool tmp_keys_pool;
KeyRefs keys;
keys.reserve(write_buffer_size);
// TODO: write the number of values
for (size_t i = 0; i < write_buffer_size; ++i)
@ -777,6 +784,8 @@ void SSDComplexKeyCachePartition::clearOldestBlocks()
{
keys.emplace_back();
tmp_keys_pool.readKey(keys.back(), read_buffer);
Poco::Logger::get("ClearOldestBlocks").information("ktest: sz=" + std::to_string(keys.back().size())
+ " data=" + std::to_string(reinterpret_cast<size_t>(keys.back().fullData())));
Metadata metadata;
readBinary(metadata.data, read_buffer);
@ -827,13 +836,18 @@ void SSDComplexKeyCachePartition::clearOldestBlocks()
Poco::Logger::get("ClearOldestBlocks").information("> erasing keys <");
for (const auto& key : keys)
{
Poco::Logger::get("ClearOldestBlocks").information("ktest: null=" + std::to_string(key.isNull()));
Poco::Logger::get("ClearOldestBlocks").information("ktest: data=" + std::to_string(reinterpret_cast<size_t>(key.fullData())));
Poco::Logger::get("ClearOldestBlocks").information("ktest: sz=" + std::to_string(key.size()) + " fz=" + std::to_string(key.fullSize()));
Index index;
if (key_to_index.get(key, index))
{
Poco::Logger::get("ClearOldestBlocks").information("erase");
size_t block_id = index.getBlockId();
if (start_block <= block_id && block_id < finish_block)
key_to_index.erase(key);
}
Poco::Logger::get("ClearOldestBlocks").information("finish");
}
}
@ -1038,6 +1052,7 @@ void SSDComplexKeyCacheStorage::update(
const DataTypes & key_types,
const KeyRefs & required_keys,
const std::vector<size_t> & required_rows,
TemporalComplexKeysPool & tmp_keys_pool,
PresentIdHandler && on_updated,
AbsentIdHandler && on_key_not_found,
const DictionaryLifetime lifetime)
@ -1056,9 +1071,9 @@ void SSDComplexKeyCacheStorage::update(
if (inserted < metadata.size())
{
partitions.emplace_front(std::make_unique<SSDComplexKeyCachePartition>(
AttributeUnderlyingType::utUInt64, attributes_structure, path,
(partitions.empty() ? 0 : partitions.front()->getId() + 1),
partition_size, block_size, read_buffer_size, write_buffer_size, max_stored_keys));
AttributeUnderlyingType::utUInt64, attributes_structure, path,
(partitions.empty() ? 0 : partitions.front()->getId() + 1),
partition_size, block_size, read_buffer_size, write_buffer_size, max_stored_keys));
}
}
@ -1077,7 +1092,6 @@ void SSDComplexKeyCacheStorage::update(
{
const auto keys_size = key_columns.size();
StringRefs keys(keys_size);
TemporalComplexKeysPool tmp_keys_pool;
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
@ -1111,7 +1125,7 @@ void SSDComplexKeyCacheStorage::update(
for (const auto i : ext::range(0, rows_num))
{
auto key = tmp_keys_pool.allocKey(i, new_key_columns, keys);
SCOPE_EXIT(tmp_keys_pool.rollback(key));
//SCOPE_EXIT(tmp_keys_pool.rollback(key));
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
metadata[i].setExpiresAt(now + std::chrono::seconds(distribution(rnd_engine)));
@ -1475,12 +1489,14 @@ void SSDComplexKeyCacheDictionary::getItemsNumberImpl(
for (const auto & key_ref : required_keys)
required_rows.push_back(not_found_keys[key_ref].front());
TemporalComplexKeysPool tmp_keys_pool;
storage.update(
source_ptr,
key_columns,
key_types,
required_keys,
required_rows,
tmp_keys_pool,
[&](const auto key, const auto row, const auto & new_attributes)
{
for (const size_t out_row : not_found_keys[key])
@ -1580,12 +1596,14 @@ void SSDComplexKeyCacheDictionary::getItemsStringImpl(
for (const auto & key_ref : required_keys)
required_rows.push_back(not_found_keys[key_ref].front());
TemporalComplexKeysPool tmp_keys_pool;
storage.update(
source_ptr,
key_columns,
key_types,
required_keys,
required_rows,
tmp_keys_pool,
[&](const auto key, const auto row, const auto & new_attributes)
{
update_result[key] = std::get<SSDComplexKeyCachePartition::Attribute::Container<String>>(new_attributes[attribute_index].values)[row];
@ -1593,7 +1611,6 @@ void SSDComplexKeyCacheDictionary::getItemsStringImpl(
[&](const auto) {},
getLifetime());
TemporalComplexKeysPool tmp_keys_pool;
StringRefs tmp_refs(key_columns.size());
size_t default_index = 0;
for (size_t row = 0; row < n; ++row)
@ -1643,12 +1660,14 @@ void SSDComplexKeyCacheDictionary::has(
for (const auto & key_ref : required_keys)
required_rows.push_back(not_found_keys[key_ref].front());
TemporalComplexKeysPool tmp_keys_pool;
storage.update(
source_ptr,
key_columns,
key_types,
required_keys,
required_rows,
tmp_keys_pool,
[&](const auto key, const auto, const auto &)
{
for (const size_t out_row : not_found_keys[key])

View File

@ -9,7 +9,6 @@
#include <Columns/ColumnString.h>
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/CurrentMetrics.h>
#include <common/logger_useful.h>
#include <Common/SmallObjectPool.h>
@ -36,13 +35,20 @@ public:
KeyRef() : ptr(nullptr) {}
/// Returns the payload length of the key, i.e. the UInt16 stored in the first
/// sizeof(UInt16) bytes of the buffer `ptr` points to (the payload itself follows
/// that header, see data()).
///
/// The header is copied out with memcpy instead of being dereferenced through a
/// reinterpret_cast'ed pointer: the buffer is a plain char array, so nothing
/// guarantees that `ptr` is suitably aligned for UInt16.
///
/// Must not be called on a null key (check isNull() first): `ptr` is read unconditionally.
inline UInt16 size() const {
    UInt16 sz;
    memcpy(&sz, ptr, sizeof(sz));
    return sz;
}
/// Total number of bytes occupied by the key: the UInt16 length header plus
/// the payload whose length is reported by size().
inline size_t fullSize() const {
    const size_t payload_bytes = size();
    return sizeof(UInt16) + payload_bytes;
}
/// True when the reference does not point to any buffer
/// (the state the default constructor leaves it in).
inline bool isNull() const {
    return !ptr;
}
/// Pointer to the key payload, i.e. the bytes located right after the
/// UInt16 length header at the beginning of the buffer.
inline char * data() const {
    char * payload = ptr;
    payload += sizeof(UInt16);
    return payload;
}
@ -114,6 +120,7 @@ class ComplexKeysPoolImpl
public:
KeyRef allocKey(const size_t row, const Columns & key_columns, StringRefs & keys)
{
std::lock_guard lock(m);
if constexpr (std::is_same_v<A, SmallObjectPool>)
{
// not working now
@ -151,7 +158,7 @@ public:
{
if (!key_columns[j]->valuesHaveFixedSize()) // String
{
auto start = key_start;
//auto start = key_start;
auto key_size = keys[j].size + 1;
memcpy(key_start, &key_size, sizeof(size_t));
key_start += sizeof(size_t);
@ -159,13 +166,13 @@ public:
key_start += keys[j].size;
*key_start = '\0';
++key_start;
keys[j].data = start;
keys[j].size += sizeof(size_t) + 1;
//keys[j].data = start;
//keys[j].size += sizeof(size_t) + 1;
}
else
{
memcpy(key_start, keys[j].data, keys[j].size);
keys[j].data = key_start;
//keys[j].data = key_start;
key_start += keys[j].size;
}
}
@ -176,13 +183,17 @@ public:
/// Creates a copy of `key` inside this pool.
///
/// Allocates key.fullSize() bytes from the pool's arena, copies the whole key
/// (length header and payload) into them and returns a KeyRef to the new buffer.
/// The pool mutex is held for the duration of the call.
KeyRef copyKeyFrom(const KeyRef & key)
{
    std::lock_guard lock(m);

    const size_t bytes_to_copy = key.fullSize();
    char * const copy = arena.alloc(bytes_to_copy);
    memcpy(copy, key.fullData(), bytes_to_copy);

    return KeyRef(copy);
}
void freeKey(const KeyRef & key)
{
std::lock_guard lock(m);
if constexpr (std::is_same_v<A, ArenaWithFreeLists>)
arena.free(key.fullData(), key.fullSize());
else if constexpr (std::is_same_v<A, SmallObjectPool>)
@ -193,6 +204,7 @@ public:
void rollback(const KeyRef & key)
{
std::lock_guard lock(m);
if constexpr (std::is_same_v<A, Arena>)
arena.rollback(key.fullSize());
else
@ -206,8 +218,10 @@ public:
void readKey(KeyRef & key, ReadBuffer & buf)
{
std::lock_guard lock(m);
UInt16 sz;
readBinary(sz, buf);
Poco::Logger::get("test read key").information("sz " + std::to_string(sz));
char * data = nullptr;
if constexpr (std::is_same_v<A, SmallObjectPool>)
data = arena.alloc();
@ -216,6 +230,7 @@ public:
memcpy(data, &sz, sizeof(sz));
buf.read(data + sizeof(sz), sz);
key = KeyRef(data);
Poco::Logger::get("test read key").information("ksz = " + std::to_string(key.size()));
}
void ignoreKey(ReadBuffer & buf) const
@ -226,6 +241,7 @@ public:
}
private:
std::mutex m;
A arena;
};
@ -270,7 +286,7 @@ public:
else
{
queue.erase(it->second.iter);
it->second.iter = queue.insert(std::end(queue), key);
it->second.iter = queue.insert(std::end(queue), it->first);
it->second.val = val;
}
}
@ -294,7 +310,7 @@ public:
if (it == std::end(cache))
return false;
keys_pool.freeKey(key);
keys_pool.freeKey(it->first);
queue.erase(it->second.iter);
cache.erase(it);
return true;
@ -492,6 +508,8 @@ private:
ComplexKeysPool keys_pool;
mutable ComplexKeyLRUCache<KeyRef, Index, ComplexKeysPool> key_to_index;
std::optional<TemporalComplexKeysPool> keys_buffer_pool;
KeyRefs keys_buffer;
const std::vector<AttributeUnderlyingType> attributes_structure;
@ -547,6 +565,7 @@ public:
/// Updates the cache storage from the dictionary source.
///
/// @param source_ptr        Dictionary source handle.
/// @param key_columns       Key columns of the request.
/// @param key_types         Data types of the key columns.
/// @param required_keys     Keys to update.
/// @param required_rows     Row indices for the required keys; callers fill it with one
///                          row per required key.
/// @param tmp_keys_pool     Caller-owned pool; the implementation allocates keys for the
///                          loaded rows from it (allocKey).
///                          NOTE(review): presumably passed in so that those keys outlive
///                          the call -- confirm.
/// @param on_updated        Callback; callers pass handlers taking (key, row, new attributes).
/// @param on_key_not_found  Callback; callers pass handlers taking a single key argument.
///                          NOTE(review): presumably called for keys missing in the
///                          source -- confirm.
/// @param lifetime          Lifetime bounds; the expiration time of each loaded row is
///                          chosen randomly between lifetime.min_sec and lifetime.max_sec
///                          seconds from now.
void update(DictionarySourcePtr & source_ptr,
const Columns & key_columns, const DataTypes & key_types,
const KeyRefs & required_keys, const std::vector<size_t> & required_rows,
TemporalComplexKeysPool & tmp_keys_pool,
PresentIdHandler && on_updated, AbsentIdHandler && on_key_not_found,
const DictionaryLifetime lifetime);