complex_key

This commit is contained in:
Nikita Vasilev 2020-04-30 23:50:31 +03:00
parent c5f8ebd98c
commit fac0439efb
7 changed files with 2676 additions and 75 deletions

View File

@ -105,65 +105,65 @@ namespace
}
}
CachePartition::Metadata::time_point_t CachePartition::Metadata::expiresAt() const
SSDCachePartition::Metadata::time_point_t SSDCachePartition::Metadata::expiresAt() const
{
return ext::safe_bit_cast<time_point_t>(data & KEY_METADATA_EXPIRES_AT_MASK);
}
void CachePartition::Metadata::setExpiresAt(const time_point_t & t)
void SSDCachePartition::Metadata::setExpiresAt(const time_point_t & t)
{
data = ext::safe_bit_cast<time_point_urep_t>(t);
}
bool CachePartition::Metadata::isDefault() const
bool SSDCachePartition::Metadata::isDefault() const
{
return (data & KEY_METADATA_IS_DEFAULT_MASK) == KEY_METADATA_IS_DEFAULT_MASK;
}
void CachePartition::Metadata::setDefault()
void SSDCachePartition::Metadata::setDefault()
{
data |= KEY_METADATA_IS_DEFAULT_MASK;
}
bool CachePartition::Index::inMemory() const
bool SSDCachePartition::Index::inMemory() const
{
return (index & KEY_IN_MEMORY) == KEY_IN_MEMORY;
}
bool CachePartition::Index::exists() const
bool SSDCachePartition::Index::exists() const
{
return index != NOT_EXISTS;
}
void CachePartition::Index::setNotExists()
void SSDCachePartition::Index::setNotExists()
{
index = NOT_EXISTS;
}
void CachePartition::Index::setInMemory(const bool in_memory)
void SSDCachePartition::Index::setInMemory(const bool in_memory)
{
index = (index & ~KEY_IN_MEMORY) | (static_cast<size_t>(in_memory) << KEY_IN_MEMORY_BIT);
}
size_t CachePartition::Index::getAddressInBlock() const
size_t SSDCachePartition::Index::getAddressInBlock() const
{
return index & INDEX_IN_BLOCK_MASK;
}
void CachePartition::Index::setAddressInBlock(const size_t address_in_block)
void SSDCachePartition::Index::setAddressInBlock(const size_t address_in_block)
{
index = (index & ~INDEX_IN_BLOCK_MASK) | address_in_block;
}
size_t CachePartition::Index::getBlockId() const
size_t SSDCachePartition::Index::getBlockId() const
{
return (index & BLOCK_INDEX_MASK) >> INDEX_IN_BLOCK_BITS;
}
void CachePartition::Index::setBlockId(const size_t block_id)
void SSDCachePartition::Index::setBlockId(const size_t block_id)
{
index = (index & ~BLOCK_INDEX_MASK) | (block_id << INDEX_IN_BLOCK_BITS);
}
CachePartition::CachePartition(
SSDCachePartition::SSDCachePartition(
const AttributeUnderlyingType & /* key_structure */,
const std::vector<AttributeUnderlyingType> & attributes_structure_,
const std::string & dir_path,
@ -184,7 +184,7 @@ CachePartition::CachePartition(
, attributes_structure(attributes_structure_)
{
keys_buffer.type = AttributeUnderlyingType::utUInt64;
keys_buffer.values = CachePartition::Attribute::Container<UInt64>();
keys_buffer.values = SSDCachePartition::Attribute::Container<UInt64>();
std::filesystem::create_directories(std::filesystem::path{dir_path});
@ -206,19 +206,19 @@ CachePartition::CachePartition(
}
}
CachePartition::~CachePartition()
SSDCachePartition::~SSDCachePartition()
{
std::unique_lock lock(rw_lock);
::close(fd);
}
size_t CachePartition::appendDefaults(
size_t SSDCachePartition::appendDefaults(
const Attribute & new_keys, const PaddedPODArray<Metadata> & metadata, const size_t begin)
{
return appendBlock(new_keys, Attributes{}, metadata, begin);
}
size_t CachePartition::appendBlock(
size_t SSDCachePartition::appendBlock(
const Attribute & new_keys, const Attributes & new_attributes, const PaddedPODArray<Metadata> & metadata, const size_t begin)
{
std::unique_lock lock(rw_lock);
@ -346,7 +346,7 @@ size_t CachePartition::appendBlock(
return ids.size() - begin;
}
void CachePartition::flush()
void SSDCachePartition::flush()
{
if (current_file_block_id >= max_size)
clearOldestBlocks();
@ -435,7 +435,7 @@ void CachePartition::flush()
}
template <typename Out, typename GetDefault>
void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
void SSDCachePartition::getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::vector<bool> & found, GetDefault & get_default,
std::chrono::system_clock::time_point now) const
{
@ -461,7 +461,7 @@ void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray
getImpl(ids, set_value, found);
}
void CachePartition::getString(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
void SSDCachePartition::getString(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
StringRefs & refs, ArenaWithFreeLists & arena, std::vector<bool> & found, std::vector<size_t> & default_ids,
std::chrono::system_clock::time_point now) const
{
@ -492,7 +492,7 @@ void CachePartition::getString(const size_t attribute_index, const PaddedPODArra
getImpl(ids, set_value, found);
}
void CachePartition::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
void SSDCachePartition::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
std::vector<bool> & found, std::chrono::system_clock::time_point now) const
{
auto set_value = [&](const size_t index, ReadBuffer & buf)
@ -509,7 +509,7 @@ void CachePartition::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UIn
}
template <typename SetFunc>
void CachePartition::getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set,
void SSDCachePartition::getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set,
std::vector<bool> & found) const
{
std::shared_lock lock(rw_lock);
@ -530,7 +530,7 @@ void CachePartition::getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set,
}
template <typename SetFunc>
void CachePartition::getValueFromMemory(const PaddedPODArray<Index> & indices, SetFunc & set) const
void SSDCachePartition::getValueFromMemory(const PaddedPODArray<Index> & indices, SetFunc & set) const
{
// Do not check checksum while reading from memory.
for (size_t i = 0; i < indices.size(); ++i)
@ -547,7 +547,7 @@ void CachePartition::getValueFromMemory(const PaddedPODArray<Index> & indices, S
}
template <typename SetFunc>
void CachePartition::getValueFromStorage(const PaddedPODArray<Index> & indices, SetFunc & set) const
void SSDCachePartition::getValueFromStorage(const PaddedPODArray<Index> & indices, SetFunc & set) const
{
std::vector<std::pair<Index, size_t>> index_to_out;
for (size_t i = 0; i < indices.size(); ++i)
@ -668,7 +668,7 @@ void CachePartition::getValueFromStorage(const PaddedPODArray<Index> & indices,
}
}
void CachePartition::clearOldestBlocks()
void SSDCachePartition::clearOldestBlocks()
{
Poco::Logger::get("GC").information("GC clear -----------------");
// write_buffer_size, because we need to erase the whole buffer.
@ -799,9 +799,8 @@ void CachePartition::clearOldestBlocks()
}
}
void CachePartition::ignoreFromBufferToAttributeIndex(const size_t attribute_index, ReadBuffer & buf) const
void SSDCachePartition::ignoreFromBufferToAttributeIndex(const size_t attribute_index, ReadBuffer & buf) const
{
//buf.ignore(2 * sizeof(UInt64)); // key and metadata
for (size_t i = 0; i < attribute_index; ++i)
{
switch (attributes_structure[i])
@ -838,24 +837,24 @@ void CachePartition::ignoreFromBufferToAttributeIndex(const size_t attribute_ind
}
}
size_t CachePartition::getId() const
size_t SSDCachePartition::getId() const
{
return file_id;
}
double CachePartition::getLoadFactor() const
double SSDCachePartition::getLoadFactor() const
{
std::shared_lock lock(rw_lock);
return static_cast<double>(current_file_block_id) / max_size;
}
size_t CachePartition::getElementCount() const
size_t SSDCachePartition::getElementCount() const
{
std::shared_lock lock(rw_lock);
return key_to_index.size();
}
PaddedPODArray<CachePartition::Key> CachePartition::getCachedIds(const std::chrono::system_clock::time_point /* now */) const
PaddedPODArray<SSDCachePartition::Key> SSDCachePartition::getCachedIds(const std::chrono::system_clock::time_point /* now */) const
{
std::unique_lock lock(rw_lock); // Begin and end iterators can be changed.
PaddedPODArray<Key> array;
@ -864,13 +863,13 @@ PaddedPODArray<CachePartition::Key> CachePartition::getCachedIds(const std::chro
return array;
}
void CachePartition::remove()
void SSDCachePartition::remove()
{
std::unique_lock lock(rw_lock);
std::filesystem::remove(std::filesystem::path(path + BIN_FILE_EXT));
}
CacheStorage::CacheStorage(
SSDCacheStorage::SSDCacheStorage(
const AttributeTypes & attributes_structure_,
const std::string & path_,
const size_t max_partitions_count_,
@ -887,11 +886,11 @@ CacheStorage::CacheStorage(
, read_buffer_size(read_buffer_size_)
, write_buffer_size(write_buffer_size_)
, max_stored_keys(max_stored_keys_)
, log(&Poco::Logger::get("CacheStorage"))
, log(&Poco::Logger::get("SSDCacheStorage"))
{
}
CacheStorage::~CacheStorage()
SSDCacheStorage::~SSDCacheStorage()
{
std::unique_lock lock(rw_lock);
partition_delete_queue.splice(std::end(partition_delete_queue), partitions);
@ -899,7 +898,7 @@ CacheStorage::~CacheStorage()
}
template <typename Out, typename GetDefault>
void CacheStorage::getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
void SSDCacheStorage::getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found,
GetDefault & get_default, std::chrono::system_clock::time_point now) const
{
@ -919,7 +918,7 @@ void CacheStorage::getValue(const size_t attribute_index, const PaddedPODArray<U
hit_count.fetch_add(ids.size() - not_found.size(), std::memory_order_release);
}
void CacheStorage::getString(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
void SSDCacheStorage::getString(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
StringRefs & refs, ArenaWithFreeLists & arena, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::vector<size_t> & default_ids, std::chrono::system_clock::time_point now) const
{
@ -939,7 +938,7 @@ void CacheStorage::getString(const size_t attribute_index, const PaddedPODArray<
hit_count.fetch_add(ids.size() - not_found.size(), std::memory_order_release);
}
void CacheStorage::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
void SSDCacheStorage::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
std::unordered_map<Key, std::vector<size_t>> & not_found, std::chrono::system_clock::time_point now) const
{
for (size_t i = 0; i < ids.size(); ++i)
@ -961,12 +960,12 @@ void CacheStorage::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8
}
template <typename PresentIdHandler, typename AbsentIdHandler>
void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
void SSDCacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found,
const DictionaryLifetime lifetime)
{
auto append_block = [this](const CachePartition::Attribute & new_keys,
const CachePartition::Attributes & new_attributes, const PaddedPODArray<CachePartition::Metadata> & metadata)
auto append_block = [this](const SSDCachePartition::Attribute & new_keys,
const SSDCachePartition::Attributes & new_attributes, const PaddedPODArray<SSDCachePartition::Metadata> & metadata)
{
size_t inserted = 0;
while (inserted < metadata.size())
@ -975,7 +974,7 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
inserted += partitions.front()->appendBlock(new_keys, new_attributes, metadata, inserted);
if (inserted < metadata.size())
{
partitions.emplace_front(std::make_unique<CachePartition>(
partitions.emplace_front(std::make_unique<SSDCachePartition>(
AttributeUnderlyingType::utUInt64, attributes_structure, path,
(partitions.empty() ? 0 : partitions.front()->getId() + 1),
partition_size, block_size, read_buffer_size, write_buffer_size, max_stored_keys));
@ -1017,9 +1016,9 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
const auto new_keys = std::move(createAttributesFromBlock(block, 0, { AttributeUnderlyingType::utUInt64 }).front());
const auto new_attributes = createAttributesFromBlock(block, 1, attributes_structure);
const auto & ids = std::get<CachePartition::Attribute::Container<UInt64>>(new_keys.values);
const auto & ids = std::get<SSDCachePartition::Attribute::Container<UInt64>>(new_keys.values);
PaddedPODArray<CachePartition::Metadata> metadata(ids.size());
PaddedPODArray<SSDCachePartition::Metadata> metadata(ids.size());
for (const auto i : ext::range(0, ids.size()))
{
@ -1053,7 +1052,7 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
}
}
auto append_defaults = [this](const CachePartition::Attribute & new_keys, const PaddedPODArray<CachePartition::Metadata> & metadata)
auto append_defaults = [this](const SSDCachePartition::Attribute & new_keys, const PaddedPODArray<SSDCachePartition::Metadata> & metadata)
{
size_t inserted = 0;
while (inserted < metadata.size())
@ -1062,7 +1061,7 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
inserted += partitions.front()->appendDefaults(new_keys, metadata, inserted);
if (inserted < metadata.size())
{
partitions.emplace_front(std::make_unique<CachePartition>(
partitions.emplace_front(std::make_unique<SSDCachePartition>(
AttributeUnderlyingType::utUInt64, attributes_structure, path,
(partitions.empty() ? 0 : partitions.front()->getId() + 1),
partition_size, block_size, read_buffer_size, write_buffer_size, max_stored_keys));
@ -1074,11 +1073,11 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
size_t not_found_num = 0, found_num = 0;
/// Check which ids have not been found and require setting null_value
CachePartition::Attribute new_keys;
SSDCachePartition::Attribute new_keys;
new_keys.type = AttributeUnderlyingType::utUInt64;
new_keys.values = CachePartition::Attribute::Container<UInt64>();
new_keys.values = SSDCachePartition::Attribute::Container<UInt64>();
PaddedPODArray<CachePartition::Metadata> metadata;
PaddedPODArray<SSDCachePartition::Metadata> metadata;
{
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
@ -1102,7 +1101,7 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
}
// Set key
std::get<CachePartition::Attribute::Container<UInt64>>(new_keys.values).push_back(id);
std::get<SSDCachePartition::Attribute::Container<UInt64>>(new_keys.values).push_back(id);
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
metadata.emplace_back();
@ -1122,7 +1121,7 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}
PaddedPODArray<CachePartition::Key> CacheStorage::getCachedIds() const
PaddedPODArray<SSDCachePartition::Key> SSDCacheStorage::getCachedIds() const
{
PaddedPODArray<Key> array;
@ -1138,7 +1137,7 @@ PaddedPODArray<CachePartition::Key> CacheStorage::getCachedIds() const
return array;
}
double CacheStorage::getLoadFactor() const
double SSDCacheStorage::getLoadFactor() const
{
double result = 0;
std::shared_lock lock(rw_lock);
@ -1147,7 +1146,7 @@ double CacheStorage::getLoadFactor() const
return result / partitions.size();
}
size_t CacheStorage::getElementCount() const
size_t SSDCacheStorage::getElementCount() const
{
size_t result = 0;
std::shared_lock lock(rw_lock);
@ -1156,7 +1155,7 @@ size_t CacheStorage::getElementCount() const
return result;
}
void CacheStorage::collectGarbage()
void SSDCacheStorage::collectGarbage()
{
// add partitions to queue
while (partitions.size() > max_partitions_count)
@ -1172,10 +1171,10 @@ void CacheStorage::collectGarbage()
}
}
CachePartition::Attributes CacheStorage::createAttributesFromBlock(
SSDCachePartition::Attributes SSDCacheStorage::createAttributesFromBlock(
const Block & block, const size_t begin_column, const std::vector<AttributeUnderlyingType> & structure)
{
CachePartition::Attributes attributes;
SSDCachePartition::Attributes attributes;
const auto columns = block.getColumns();
for (size_t i = 0; i < structure.size(); ++i)
@ -1186,7 +1185,7 @@ CachePartition::Attributes CacheStorage::createAttributesFromBlock(
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
{ \
CachePartition::Attribute::Container<TYPE> values(column->size()); \
SSDCachePartition::Attribute::Container<TYPE> values(column->size()); \
memcpy(&values[0], column->getRawData().data, sizeof(TYPE) * values.size()); \
attributes.emplace_back(); \
attributes.back().type = structure[i]; \
@ -1213,7 +1212,7 @@ CachePartition::Attributes CacheStorage::createAttributesFromBlock(
case AttributeUnderlyingType::utString:
{
attributes.emplace_back();
CachePartition::Attribute::Container<String> values(column->size());
SSDCachePartition::Attribute::Container<String> values(column->size());
for (size_t j = 0; j < column->size(); ++j)
{
const auto ref = column->getDataAt(j);
@ -1376,7 +1375,7 @@ void SSDCacheDictionary::getItemsNumberImpl(
[&](const auto id, const auto row, const auto & new_attributes)
{
for (const size_t out_row : not_found_ids[id])
out[out_row] = std::get<CachePartition::Attribute::Container<OutputType>>(new_attributes[attribute_index].values)[row];
out[out_row] = std::get<SSDCachePartition::Attribute::Container<OutputType>>(new_attributes[attribute_index].values)[row];
},
[&](const size_t id)
{
@ -1455,7 +1454,7 @@ void SSDCacheDictionary::getItemsStringImpl(const size_t attribute_index, const
required_ids,
[&](const auto id, const auto row, const auto & new_attributes)
{
update_result[id] = std::get<CachePartition::Attribute::Container<String>>(new_attributes[attribute_index].values)[row];
update_result[id] = std::get<SSDCachePartition::Attribute::Container<String>>(new_attributes[attribute_index].values)[row];
},
[&](const size_t) {},
getLifetime());

View File

@ -127,7 +127,7 @@ using AttributeValueVariant = std::variant<
Float64,
String>;
class CachePartition
class SSDCachePartition
{
public:
struct Index final
@ -170,7 +170,7 @@ public:
using Offsets = std::vector<Offset>;
using Key = IDictionary::Key;
CachePartition(
SSDCachePartition(
const AttributeUnderlyingType & key_structure,
const std::vector<AttributeUnderlyingType> & attributes_structure,
const std::string & dir_path,
@ -181,7 +181,7 @@ public:
const size_t write_buffer_size,
const size_t max_stored_keys);
~CachePartition();
~SSDCachePartition();
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
@ -280,16 +280,16 @@ private:
size_t current_file_block_id = 0;
};
using CachePartitionPtr = std::shared_ptr<CachePartition>;
using SSDCachePartitionPtr = std::shared_ptr<SSDCachePartition>;
class CacheStorage
class SSDCacheStorage
{
public:
using AttributeTypes = std::vector<AttributeUnderlyingType>;
using Key = CachePartition::Key;
using Key = SSDCachePartition::Key;
CacheStorage(
SSDCacheStorage(
const AttributeTypes & attributes_structure,
const std::string & path,
const size_t max_partitions_count,
@ -299,10 +299,10 @@ public:
const size_t write_buffer_size,
const size_t max_stored_keys);
~CacheStorage();
~SSDCacheStorage();
template <typename T>
using ResultArrayType = CachePartition::ResultArrayType<T>;
using ResultArrayType = SSDCachePartition::ResultArrayType<T>;
template <typename Out, typename GetDefault>
void getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
@ -336,7 +336,7 @@ public:
double getLoadFactor() const;
private:
CachePartition::Attributes createAttributesFromBlock(
SSDCachePartition::Attributes createAttributesFromBlock(
const Block & block, const size_t begin_column, const std::vector<AttributeUnderlyingType> & structure);
void collectGarbage();
@ -352,8 +352,8 @@ private:
const size_t max_stored_keys;
mutable std::shared_mutex rw_lock;
std::list<CachePartitionPtr> partitions;
std::list<CachePartitionPtr> partition_delete_queue;
std::list<SSDCachePartitionPtr> partitions;
std::list<SSDCachePartitionPtr> partition_delete_queue;
Logger * const log;
@ -432,7 +432,7 @@ public:
std::exception_ptr getLastException() const override { return storage.getLastException(); }
template <typename T>
using ResultArrayType = CacheStorage::ResultArrayType<T>;
using ResultArrayType = SSDCacheStorage::ResultArrayType<T>;
#define DECLARE(TYPE) \
void get##TYPE(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ResultArrayType<TYPE> & out) const;
@ -535,7 +535,7 @@ private:
std::map<std::string, size_t> attribute_index_by_name;
std::vector<AttributeValueVariant> null_values;
mutable CacheStorage storage;
mutable SSDCacheStorage storage;
Logger * const log;
mutable size_t bytes_allocated = 0;

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,790 @@
#pragma once
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
#include <atomic>
#include <chrono>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/CurrentMetrics.h>
#include <common/logger_useful.h>
#include <Common/SmallObjectPool.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Core/Block.h>
#include <ext/scope_guard.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteBufferAIO.h>
#include <list>
#include <pcg_random.hpp>
#include <shared_mutex>
#include <variant>
#include <vector>
namespace DB
{
class KeyRef
{
public:
explicit KeyRef(char * data) : ptr(data) {}
KeyRef() : ptr(nullptr) {}
inline UInt16 size() const {
return *reinterpret_cast<const UInt16 *>(ptr);
}
inline size_t fullSize() const {
return static_cast<size_t>(size()) + sizeof(UInt16);
}
inline char * data() const {
return ptr + sizeof(UInt16);
}
inline char * fullData() const {
return ptr;
}
inline char * fullData() {
return ptr;
}
inline const StringRef getRef() const {
return StringRef(data(), size());
}
inline bool operator==(const KeyRef & other) const {
return getRef() == other.getRef();
}
inline bool operator<(const KeyRef & other) const {
return getRef() < other.getRef();
}
private:
char * ptr;
};
using KeyRefs = std::vector<KeyRef>;
}
namespace std
{
template <>
struct hash<DB::KeyRef>
{
size_t operator() (DB::KeyRef key_ref) const
{
return hasher(key_ref.getRef());
}
std::hash<StringRef> hasher;
};
}
namespace DB
{
using AttributeValueVariant = std::variant<
UInt8,
UInt16,
UInt32,
UInt64,
UInt128,
Int8,
Int16,
Int32,
Int64,
Decimal32,
Decimal64,
Decimal128,
Float32,
Float64,
String>;
template <typename A>
class ComplexKeysPoolImpl
{
public:
KeyRef allocKey(const size_t row, const Columns & key_columns, StringRefs & keys)
{
if constexpr (std::is_same_v<A, SmallObjectPool>)
{
// not working now
const auto res = arena->alloc();
auto place = res;
for (const auto & key_column : key_columns)
{
const StringRef key = key_column->getDataAt(row);
memcpy(place, key.data, key.size);
place += key.size;
}
return KeyRef(res);
}
else
{
const auto keys_size = key_columns.size();
UInt16 sum_keys_size{};
for (size_t j = 0; j < keys_size; ++j)
{
keys[j] = key_columns[j]->getDataAt(row);
sum_keys_size += keys[j].size;
if (!key_columns[j]->valuesHaveFixedSize()) // String
sum_keys_size += sizeof(size_t) + 1;
}
auto place = arena.alloc(sum_keys_size + sizeof(sum_keys_size));
auto key_start = place;
memcpy(key_start, &sum_keys_size, sizeof(sum_keys_size));
key_start += sizeof(sum_keys_size);
for (size_t j = 0; j < keys_size; ++j)
{
if (!key_columns[j]->valuesHaveFixedSize()) // String
{
auto start = key_start;
auto key_size = keys[j].size + 1;
memcpy(key_start, &key_size, sizeof(size_t));
key_start += sizeof(size_t);
memcpy(key_start, keys[j].data, keys[j].size);
key_start += keys[j].size;
*key_start = '\0';
++key_start;
keys[j].data = start;
keys[j].size += sizeof(size_t) + 1;
}
else
{
memcpy(key_start, keys[j].data, keys[j].size);
keys[j].data = key_start;
key_start += keys[j].size;
}
}
return KeyRef(place);
}
}
KeyRef copyKeyFrom(const KeyRef & key)
{
char * data = arena.alloc(key.fullSize());
memcpy(data, key.fullData(), key.fullSize());
return KeyRef(data);
}
void freeKey(const KeyRef & key)
{
if constexpr (std::is_same_v<A, ArenaWithFreeLists>)
arena.free(key.fullData(), key.fullSize());
else if constexpr (std::is_same_v<A, SmallObjectPool>)
arena.free(key.fullData());
else
throw Exception("Free not supported.", ErrorCodes::LOGICAL_ERROR);
}
void rollback(const KeyRef & key)
{
if constexpr (std::is_same_v<A, Arena>)
arena.rollback(key.fullSize());
else
throw Exception("Rollback not supported.", ErrorCodes::LOGICAL_ERROR);
}
void writeKey(const KeyRef & key, WriteBuffer & buf)
{
buf.write(key.fullData(), key.fullSize());
}
void readKey(KeyRef & key, ReadBuffer & buf)
{
UInt16 sz;
readBinary(sz, buf);
char * data = nullptr;
if constexpr (std::is_same_v<A, SmallObjectPool>)
data = arena.alloc();
else
data = arena.alloc(sz + sizeof(sz));
memcpy(data, &sz, sizeof(sz));
buf.read(data + sizeof(sz), sz);
key = KeyRef(data);
}
void ignoreKey(ReadBuffer & buf) const
{
UInt16 sz;
readBinary(sz, buf);
buf.ignore(sz);
}
private:
A arena;
};
using TemporalComplexKeysPool = ComplexKeysPoolImpl<Arena>;
using ComplexKeysPool = ComplexKeysPoolImpl<ArenaWithFreeLists>;
//using FixedComplexKeysPool = ComplexKeysPoolImpl<SmallObjectPool>;
template <typename K, typename V, typename Pool>
class ComplexKeyLRUCache
{
using Iter = typename std::list<K>::iterator;
struct Cell
{
Iter iter;
V val;
};
public:
ComplexKeyLRUCache(size_t max_size_, Pool & keys_pool_)
: max_size(max_size_)
, keys_pool(keys_pool_)
{
}
void set(K key, V val)
{
std::lock_guard lock(mutex);
auto it = cache.find(key);
if (it == std::end(cache))
{
auto & item = cache[key];
item.iter = queue.insert(std::end(queue), key);
item.val = val;
if (queue.size() > max_size)
{
keys_pool.freeKey(queue.front());
cache.erase(queue.front());
queue.pop_front();
}
}
else
{
queue.erase(it->second.iter);
it->second.iter = queue.insert(std::end(queue), key);
it->second.val = val;
}
}
bool get(K key, V & val)
{
std::lock_guard lock(mutex);
auto it = cache.find(key);
if (it == std::end(cache))
return false;
val = it->second.val;
queue.erase(it->second.iter);
it->second.iter = queue.insert(std::end(queue), key);
return true;
}
bool erase(K key)
{
std::lock_guard lock(mutex);
auto it = cache.find(key);
if (it == std::end(cache))
return false;
keys_pool.freeKey(key);
queue.erase(it->second.iter);
cache.erase(it);
return true;
}
size_t size()
{
std::lock_guard lock(mutex);
return cache.size();
}
auto begin()
{
std::lock_guard lock(mutex);
return std::begin(cache);
}
auto end()
{
std::lock_guard lock(mutex);
return std::end(cache);
}
private:
std::unordered_map<K, Cell> cache;
std::list<K> queue;
size_t max_size;
Pool & keys_pool;
std::mutex mutex;
};
class SSDComplexKeyCachePartition
{
public:
struct Index final
{
bool inMemory() const;
void setInMemory(const bool in_memory);
bool exists() const;
void setNotExists();
size_t getAddressInBlock() const;
void setAddressInBlock(const size_t address_in_block);
size_t getBlockId() const;
void setBlockId(const size_t block_id);
bool operator< (const Index & rhs) const { return index < rhs.index; }
/// Stores `is_in_memory` flag, block id, address in uncompressed block
uint64_t index = 0;
};
struct Metadata final
{
using time_point_t = std::chrono::system_clock::time_point;
using time_point_rep_t = time_point_t::rep;
using time_point_urep_t = std::make_unsigned_t<time_point_rep_t>;
time_point_t expiresAt() const;
void setExpiresAt(const time_point_t & t);
bool isDefault() const;
void setDefault();
/// Stores both expiration time and `is_default` flag in the most significant bit
time_point_urep_t data = 0;
};
using Offset = size_t;
using Offsets = std::vector<Offset>;
SSDComplexKeyCachePartition(
const AttributeUnderlyingType & key_structure,
const std::vector<AttributeUnderlyingType> & attributes_structure,
const std::string & dir_path,
const size_t file_id,
const size_t max_size,
const size_t block_size,
const size_t read_buffer_size,
const size_t write_buffer_size,
const size_t max_stored_keys);
~SSDComplexKeyCachePartition();
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
template <typename Out, typename GetDefault>
void getValue(const size_t attribute_index,
const Columns & key_columns, const DataTypes & key_types,
ResultArrayType<Out> & out, std::vector<bool> & found, GetDefault & get_default,
std::chrono::system_clock::time_point now) const;
void getString(const size_t attribute_index,
const Columns & key_columns, const DataTypes & key_types,
StringRefs & refs, ArenaWithFreeLists & arena, std::vector<bool> & found,
std::vector<size_t> & default_ids, std::chrono::system_clock::time_point now) const;
void has(const Columns & key_columns, const DataTypes & key_types,
ResultArrayType<UInt8> & out, std::vector<bool> & found,
std::chrono::system_clock::time_point now) const;
struct Attribute
{
template <typename T>
using Container = std::vector<T>;
AttributeUnderlyingType type;
std::variant<
Container<UInt8>,
Container<UInt16>,
Container<UInt32>,
Container<UInt64>,
Container<UInt128>,
Container<Int8>,
Container<Int16>,
Container<Int32>,
Container<Int64>,
Container<Decimal32>,
Container<Decimal64>,
Container<Decimal128>,
Container<Float32>,
Container<Float64>,
Container<String>> values;
};
using Attributes = std::vector<Attribute>;
size_t appendBlock(
const Columns & key_columns,
const DataTypes & key_types,
const Attributes & new_attributes,
const PaddedPODArray<Metadata> & metadata,
const size_t begin);
size_t appendDefaults(
const KeyRefs & keys,
const PaddedPODArray<Metadata> & metadata,
const size_t begin);
void clearOldestBlocks();
void flush();
void remove();
size_t getId() const;
PaddedPODArray<KeyRef> getCachedIds(const std::chrono::system_clock::time_point now) const;
double getLoadFactor() const;
size_t getElementCount() const;
private:
size_t append(
const KeyRefs & keys,
const Attributes & new_attributes,
const PaddedPODArray<Metadata> & metadata,
const size_t begin);
template <typename SetFunc>
void getImpl(const Columns & key_columns, const DataTypes & key_types,
SetFunc & set, std::vector<bool> & found) const;
template <typename SetFunc>
void getValueFromMemory(const PaddedPODArray<Index> & indices, SetFunc & set) const;
template <typename SetFunc>
void getValueFromStorage(const PaddedPODArray<Index> & indices, SetFunc & set) const;
void ignoreFromBufferToAttributeIndex(const size_t attribute_index, ReadBuffer & buf) const;
/*KeyRef allocKey(const size_t row, const Columns & key_columns, StringRefs & keys) const;
void freeKey(const KeyRef key) const;
void writeKey(KeyRef key, WriteBuffer & buf);
template <typename ArenaForKey>
void readKey(KeyRef & key, ArenaForKey & arena, ReadBuffer & buf);
void ignoreKey(ReadBuffer & buf);*/
const size_t file_id;
const size_t max_size;
const size_t block_size;
const size_t read_buffer_size;
const size_t write_buffer_size;
const size_t max_stored_keys;
const std::string path;
mutable std::shared_mutex rw_lock;
int fd = -1;
ComplexKeysPool keys_pool;
mutable ComplexKeyLRUCache<KeyRef, Index, ComplexKeysPool> key_to_index;
KeyRefs keys_buffer;
const std::vector<AttributeUnderlyingType> attributes_structure;
std::optional<Memory<>> memory;
std::optional<WriteBuffer> write_buffer;
uint32_t keys_in_block = 0;
//CompressionCodecPtr codec;
size_t current_memory_block_id = 0;
size_t current_file_block_id = 0;
};
using SSDComplexKeyCachePartitionPtr = std::shared_ptr<SSDComplexKeyCachePartition>;
class SSDComplexKeyCacheStorage
{
public:
using AttributeTypes = std::vector<AttributeUnderlyingType>;
SSDComplexKeyCacheStorage(
const AttributeTypes & attributes_structure,
const std::string & path,
const size_t max_partitions_count,
const size_t partition_size,
const size_t block_size,
const size_t read_buffer_size,
const size_t write_buffer_size,
const size_t max_stored_keys);
~SSDComplexKeyCacheStorage();
template <typename T>
using ResultArrayType = SSDComplexKeyCachePartition::ResultArrayType<T>;
template <typename Out, typename GetDefault>
void getValue(const size_t attribute_index, const Columns & key_columns, const DataTypes & key_types,
ResultArrayType<Out> & out, std::unordered_map<KeyRef, std::vector<size_t>> & not_found,
TemporalComplexKeysPool & not_found_pool,
GetDefault & get_default, std::chrono::system_clock::time_point now) const;
void getString(const size_t attribute_index, const Columns & key_columns, const DataTypes & key_types,
StringRefs & refs, ArenaWithFreeLists & arena, std::unordered_map<KeyRef, std::vector<size_t>> & not_found,
TemporalComplexKeysPool & not_found_pool,
std::vector<size_t> & default_ids, std::chrono::system_clock::time_point now) const;
void has(const Columns & key_columns, const DataTypes & key_types, ResultArrayType<UInt8> & out,
std::unordered_map<KeyRef, std::vector<size_t>> & not_found,
TemporalComplexKeysPool & not_found_pool, std::chrono::system_clock::time_point now) const;
template <typename PresentIdHandler, typename AbsentIdHandler>
void update(DictionarySourcePtr & source_ptr,
const Columns & key_columns, const DataTypes & key_types,
const KeyRefs & required_keys, const std::vector<size_t> & required_rows,
PresentIdHandler && on_updated, AbsentIdHandler && on_key_not_found,
const DictionaryLifetime lifetime);
PaddedPODArray<KeyRef> getCachedIds() const;
std::exception_ptr getLastException() const { return last_update_exception; }
const std::string & getPath() const { return path; }
size_t getQueryCount() const { return query_count.load(std::memory_order_relaxed); }
size_t getHitCount() const { return hit_count.load(std::memory_order_acquire); }
size_t getElementCount() const;
double getLoadFactor() const;
private:
SSDComplexKeyCachePartition::Attributes createAttributesFromBlock(
const Block & block, const size_t begin_column, const std::vector<AttributeUnderlyingType> & structure);
void collectGarbage();
const AttributeTypes attributes_structure;
const std::string path;
const size_t max_partitions_count;
const size_t partition_size;
const size_t block_size;
const size_t read_buffer_size;
const size_t write_buffer_size;
const size_t max_stored_keys;
mutable std::shared_mutex rw_lock;
std::list<SSDComplexKeyCachePartitionPtr> partitions;
std::list<SSDComplexKeyCachePartitionPtr> partition_delete_queue;
Logger * const log;
mutable pcg64 rnd_engine;
mutable std::exception_ptr last_update_exception;
mutable size_t update_error_count = 0;
mutable std::chrono::system_clock::time_point backoff_end_time;
// stats
//mutable size_t bytes_allocated = 0;
mutable std::atomic<size_t> hit_count{0};
mutable std::atomic<size_t> query_count{0};
};
class SSDComplexKeyCacheDictionary final : public IDictionaryBase
{
public:
SSDComplexKeyCacheDictionary(
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
const std::string & path,
const size_t max_partitions_count_,
const size_t partition_size_,
const size_t block_size_,
const size_t read_buffer_size_,
const size_t write_buffer_size_,
const size_t max_stored_keys_);
const std::string & getDatabase() const override { return name; }
const std::string & getName() const override { return name; }
const std::string & getFullName() const override { return getName(); }
std::string getKeyDescription() const { return dict_struct.getKeyDescription(); }
std::string getTypeName() const override { return "SSDComplexKeyCache"; }
size_t getBytesAllocated() const override { return 0; } // TODO: ?
size_t getQueryCount() const override { return storage.getQueryCount(); }
double getHitRate() const override
{
return static_cast<double>(storage.getHitCount()) / storage.getQueryCount();
}
size_t getElementCount() const override { return storage.getElementCount(); }
double getLoadFactor() const override { return storage.getLoadFactor(); }
bool supportUpdates() const override { return false; }
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<SSDComplexKeyCacheDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, path,
max_partitions_count, partition_size, block_size, read_buffer_size, write_buffer_size, max_stored_keys);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
const DictionaryStructure & getStructure() const override { return dict_struct; }
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[getAttributeIndex(attribute_name)].injective;
}
/*bool hasHierarchy() const { return false; }
void toParent(const PaddedPODArray<Key> &, PaddedPODArray<Key> &) const { }*/
std::exception_ptr getLastException() const override { return storage.getLastException(); }
template <typename T>
using ResultArrayType = SSDComplexKeyCacheStorage::ResultArrayType<T>;
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void getString(const std::string & attribute_name, const Columns & key_columns,
const DataTypes & key_types, ColumnString * out) const;
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const PaddedPODArray<TYPE> & def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void getString(const std::string & attribute_name, const Columns & key_columns,
const DataTypes & key_types, const ColumnString * const def, ColumnString * const out) const;
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const TYPE def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void getString(const std::string & attribute_name, const Columns & key_columns,
const DataTypes & key_types, const String & def, ColumnString * const out) const;
void has(const Columns & key_columns, const DataTypes & key_types, PaddedPODArray<UInt8> & out) const;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
private:
size_t getAttributeIndex(const std::string & attr_name) const;
template <typename T>
AttributeValueVariant createAttributeNullValueWithTypeImpl(const Field & null_value);
AttributeValueVariant createAttributeNullValueWithType(const AttributeUnderlyingType type, const Field & null_value);
void createAttributes();
template <typename AttributeType, typename OutputType, typename DefaultGetter>
void getItemsNumberImpl(
const size_t attribute_index,
const Columns & key_columns, const DataTypes & key_types,
ResultArrayType<OutputType> & out, DefaultGetter && get_default) const;
template <typename DefaultGetter>
void getItemsStringImpl(
const size_t attribute_index,
const Columns & key_columns, const DataTypes & key_types,
ColumnString * out, DefaultGetter && get_default) const;
const std::string name;
const DictionaryStructure dict_struct;
mutable DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
const std::string path;
const size_t max_partitions_count;
const size_t partition_size;
const size_t block_size;
const size_t read_buffer_size;
const size_t write_buffer_size;
const size_t max_stored_keys;
std::map<std::string, size_t> attribute_index_by_name;
std::vector<AttributeValueVariant> null_values;
mutable SSDComplexKeyCacheStorage storage;
Logger * const log;
mutable size_t bytes_allocated = 0;
};
}

View File

@ -32,6 +32,7 @@ void registerDictionaries()
registerDictionaryHashed(factory);
registerDictionaryCache(factory);
registerDictionarySSDCache(factory);
registerDictionarySSDComplexKeyCache(factory);
registerDictionaryPolygon(factory);
}
}

View File

@ -25,6 +25,7 @@ void registerDictionaryFlat(DictionaryFactory & factory);
void registerDictionaryHashed(DictionaryFactory & factory);
void registerDictionaryCache(DictionaryFactory & factory);
void registerDictionarySSDCache(DictionaryFactory & factory);
void registerDictionarySSDComplexKeyCache(DictionaryFactory & factory);
void registerDictionaryPolygon(DictionaryFactory & factory);
void registerDictionaries();

View File

@ -30,6 +30,7 @@
#include <Dictionaries/HashedDictionary.h>
#include <Dictionaries/CacheDictionary.h>
#include <Dictionaries/SSDCacheDictionary.h>
#include <Dictionaries/SSDComplexKeyCacheDictionary.h>
#include <Dictionaries/ComplexKeyHashedDictionary.h>
#include <Dictionaries/ComplexKeyCacheDictionary.h>
#include <Dictionaries/RangeHashedDictionary.h>
@ -136,7 +137,7 @@ private:
!executeDispatchSimple<CacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchSimple<SSDCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr) &&
#endif
@ -311,6 +312,7 @@ private:
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr) &&
#endif
@ -496,6 +498,7 @@ private:
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr) &&
#endif
@ -837,6 +840,7 @@ private:
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr) &&
#endif
@ -1100,6 +1104,7 @@ private:
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr) &&
#endif