Merge branch 'nikvas0/ssd_dict' of git://github.com/nikvas0/ClickHouse into merging-ssh-cache

This commit is contained in:
Nikita Mikhaylov 2020-06-24 16:45:18 +03:00
commit e55577ba65
18 changed files with 5386 additions and 35 deletions

View File

@ -54,10 +54,12 @@ LAYOUT(LAYOUT_TYPE(param value)) -- layout settings
- [hashed](#dicts-external_dicts_dict_layout-hashed)
- [sparse\_hashed](#dicts-external_dicts_dict_layout-sparse_hashed)
- [cache](#cache)
- [ssd\_cache](#ssd-cache)
- [direct](#direct)
- [range\_hashed](#range-hashed)
- [complex\_key\_hashed](#complex-key-hashed)
- [complex\_key\_cache](#complex-key-cache)
- [complex\_key\_ssd\_cache](#complex-key-ssd-cache)
- [complex\_key\_direct](#complex-key-direct)
- [ip\_trie](#ip-trie)
@ -296,6 +298,40 @@ Set a large enough cache size. You need to experiment to select the number of ce
This type of storage is for use with composite [keys](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-structure.md). Similar to `cache`.
### ssd\_cache {#ssd-cache}
Similar to `cache`, but stores data on SSD and index in RAM.
``` xml
<layout>
<ssd_cache>
<!-- Size of elementary read block in bytes. Recommended to be equal to SSD's page size. -->
<block_size>4096</block_size>
<!-- Max cache file size in bytes. -->
<file_size>16777216</file_size>
<!-- Size of RAM buffer in bytes for reading elements from SSD. -->
<read_buffer_size>131072</read_buffer_size>
<!-- Size of RAM buffer in bytes for aggregating elements before flushing to SSD. -->
<write_buffer_size>1048576</write_buffer_size>
<!-- Path where cache file will be stored. -->
<path>/var/lib/clickhouse/clickhouse_dictionaries/test_dict</path>
<!-- Max number of stored keys in the cache. Rounded up to a power of two. -->
<max_stored_keys>1048576</max_stored_keys>
</ssd_cache>
</layout>
```
or
``` sql
LAYOUT(SSD_CACHE(BLOCK_SIZE 4096 FILE_SIZE 16777216 READ_BUFFER_SIZE 131072 WRITE_BUFFER_SIZE 1048576
PATH /var/lib/clickhouse/clickhouse_dictionaries/test_dict MAX_STORED_KEYS 1048576))
```
### complex\_key\_ssd\_cache {#complex-key-ssd-cache}
This type of storage is for use with composite [keys](external-dicts-dict-structure.md). Similar to `ssd_cache`.
### direct {#direct}
The dictionary is not stored in memory and directly goes to the source during the processing of a request.

View File

@ -0,0 +1,226 @@
#pragma once
#include <Common/HashTable/Hash.h>
#include <common/logger_useful.h>
#include <type_traits>
#include <vector>
namespace DB
{
namespace
{
/// Returns the smallest power of two that is not less than `x`, with a lower bound of 8.
/// Despite the name, zero is never returned: inputs 0..8 all yield 8.
/// NOTE(review): if `x` exceeds the largest power of two representable in size_t,
/// the shift wraps to zero and the loop does not terminate.
inline size_t roundUpToPowerOfTwoOrZero(size_t x)
{
    size_t result = 8;
    for (; result < x; result <<= 1)
    {
        /// Keep doubling until the candidate covers x.
    }
    return result;
}
}
/// Tag type used as the default `Deleter` of BucketCacheIndex.
/// BucketCacheIndex checks for it with std::is_same_v to skip calling a deleter on erase.
struct EmptyDeleter {};
/// Hash functor for integer keys: forwards the key to intHash64.
struct Int64Hasher
{
    size_t operator()(const size_t x) const { return intHash64(x); }
};
/*
    Class for storing cache index.
    It consists of two arrays.
    The first one is split into buckets (each stores 8 elements (cells)) determined by hash of the element key.
    The second one is split into 4-bit numbers, which are positions in bucket for the next element write
    (so the cache uses a FIFO eviction algorithm inside each bucket).

    Requirements on the template arguments, as used by the code below:
      - K must be copyable and comparable with == and !=;
      - V must provide exists() and setNotExists();
      - Hasher must be callable with a K, the result is masked to select a bucket;
      - Deleter (when it is not EmptyDeleter) must be callable with a K.
*/
template <typename K, typename V, typename Hasher, typename Deleter = EmptyDeleter>
class BucketCacheIndex
{
    struct Cell
    {
        K key;
        V index;
    };

public:
    /// Creates an index with at least `cells_` cells (rounded up to a power of two, minimum 8).
    /// All cells start as "not exists" and every bucket's write position starts at 0.
    /// NOTE(review): std::enable_if<...> (without ::type) is always a valid type, so this default
    /// template argument does not actually restrict the constructor to the EmptyDeleter case.
    template <typename = std::enable_if<std::is_same_v<EmptyDeleter, Deleter>>>
    BucketCacheIndex(size_t cells_)
        : buckets(roundUpToPowerOfTwoOrZero(cells_) / bucket_size)
        , bucket_mask(buckets - 1)
        , cells(buckets * bucket_size)
        , positions((buckets / 2) + 1)
    {
        for (auto & cell : cells)
            cell.index.setNotExists();
        for (size_t bucket = 0; bucket < buckets; ++bucket)
            setPosition(bucket, 0);
    }

    /// Same as above, but also stores `deleter_`, which is called with the key of a cell
    /// that is erased or overwritten through setWithDelete().
    template <typename = std::enable_if<!std::is_same_v<EmptyDeleter, Deleter>>>
    BucketCacheIndex(size_t cells_, Deleter deleter_)
        : deleter(deleter_)
        , buckets(roundUpToPowerOfTwoOrZero(cells_) / bucket_size)
        , bucket_mask(buckets - 1)
        , cells(buckets * bucket_size)
        , positions((buckets / 2) + 1)
    {
        for (auto & cell : cells)
            cell.index.setNotExists();
        for (size_t bucket = 0; bucket < buckets; ++bucket)
            setPosition(bucket, 0);
    }

    /// Inserts `key` -> `val`, or updates the value if the key is already in its bucket.
    /// If the key is new and the cell at the bucket's write position is occupied, that cell is evicted.
    void set(K key, V val)
    {
        const size_t bucket = (hash(key) & bucket_mask);
        const size_t idx = getCellIndex(key, bucket);
        Cell & cell = cells[idx];

        if (!cell.index.exists())
        {
            incPosition(bucket);
            ++sz;
        }
        else if (cell.key != key)
        {
            /// The key was not found, so `idx` is the write position and its occupant is evicted.
            /// The write position has to advance here as well; otherwise a full bucket would
            /// overwrite the same cell on every insert instead of evicting in FIFO order.
            incPosition(bucket);
        }

        cell.key = key;
        cell.index = val;
    }

    /// Same as set(), but calls the deleter with the previously stored key whenever an existing
    /// cell is overwritten (both on update of the same key and on eviction of another key).
    template <typename = std::enable_if<!std::is_same_v<EmptyDeleter, Deleter>>>
    void setWithDelete(K key, V val)
    {
        const size_t bucket = (hash(key) & bucket_mask);
        const size_t idx = getCellIndex(key, bucket);
        Cell & cell = cells[idx];

        if (!cell.index.exists())
        {
            incPosition(bucket);
            ++sz;
        }
        else
        {
            /// Compare before calling the deleter: the deleter may release the stored key.
            if (cell.key != key)
                incPosition(bucket); /// Eviction: keep FIFO order inside the bucket.
            deleter(cell.key);
        }

        cell.key = key;
        cell.index = val;
    }

    /// Looks up `key`. On success writes the stored value to `val` and returns true.
    bool get(K key, V & val) const
    {
        const size_t bucket = (hash(key) & bucket_mask);
        const size_t idx = getCellIndex(key, bucket);
        if (!cells[idx].index.exists() || cells[idx].key != key)
            return false;
        val = cells[idx].index;
        return true;
    }

    /// Looks up `key`. On success replaces `key` with the stored key object,
    /// writes the stored value to `val` and returns true.
    bool getKeyAndValue(K & key, V & val) const
    {
        const size_t bucket = (hash(key) & bucket_mask);
        const size_t idx = getCellIndex(key, bucket);
        if (!cells[idx].index.exists() || cells[idx].key != key)
            return false;
        key = cells[idx].key;
        val = cells[idx].index;
        return true;
    }

    /// Removes `key` if present. Returns true if something was removed.
    /// With a non-empty Deleter the deleter is called with the stored key.
    bool erase(K key)
    {
        const size_t bucket = (hash(key) & bucket_mask);
        const size_t idx = getCellIndex(key, bucket);
        if (!cells[idx].index.exists() || cells[idx].key != key)
            return false;

        cells[idx].index.setNotExists();
        --sz;
        if constexpr (!std::is_same_v<EmptyDeleter, Deleter>)
            deleter(cells[idx].key);
        return true;
    }

    /// Number of existing cells.
    size_t size() const
    {
        return sz;
    }

    /// Total number of cells.
    size_t capacity() const
    {
        return cells.size();
    }

    /// Returns the keys of all existing cells, in cell order.
    auto keys() const
    {
        std::vector<K> res;
        for (const auto & cell : cells)
        {
            if (cell.index.exists())
            {
                res.push_back(cell.key);
            }
        }
        return res;
    }

private:
    /// Searches for the key in the bucket.
    /// Returns the index of the cell with the provided key, or, if there is no such cell,
    /// the index of the cell at the bucket's current write position.
    size_t getCellIndex(const K key, const size_t bucket) const
    {
        const size_t pos = getPosition(bucket);
        /// Visits every cell of the bucket once, starting from the write position
        /// and then going from the most recently written cell backwards.
        for (int idx = static_cast<int>(bucket_size) - 1; idx >= 0; --idx)
        {
            const size_t cur = ((pos + 1 + idx) & pos_mask);
            if (cells[bucket * bucket_size + cur].index.exists() &&
                cells[bucket * bucket_size + cur].key == key)
            {
                return bucket * bucket_size + cur;
            }
        }

        return bucket * bucket_size + pos;
    }

    /// Returns current position for write in the bucket.
    /// Two buckets share one byte of `positions`: even buckets use the high 4 bits, odd buckets the low 4 bits.
    size_t getPosition(const size_t bucket) const
    {
        const size_t idx = (bucket >> 1);
        if ((bucket & 1) == 0)
            return ((positions[idx] >> 4) & pos_mask);
        return (positions[idx] & pos_mask);
    }

    /// Sets current position in the bucket, leaving the other bucket's half of the byte untouched.
    void setPosition(const size_t bucket, const size_t pos)
    {
        const size_t idx = bucket >> 1;
        if ((bucket & 1) == 0)
            positions[idx] = ((pos << 4) | (positions[idx] & ((1 << 4) - 1)));
        else
            positions[idx] = (pos | (positions[idx] & (((1 << 4) - 1) << 4)));
    }

    /// Advances the write position of the bucket by one cell, wrapping around at the bucket size.
    void incPosition(const size_t bucket)
    {
        setPosition(bucket, (getPosition(bucket) + 1) & pos_mask);
    }

    static constexpr size_t bucket_size = 8;
    static constexpr size_t pos_size = 3;
    static constexpr size_t pos_mask = (1 << pos_size) - 1;

    Hasher hash;
    Deleter deleter;

    size_t buckets;
    size_t bucket_mask;

    std::vector<Cell> cells;
    std::vector<char> positions;
    size_t sz = 0;
};
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,471 @@
#pragma once
#if defined(__linux__) || defined(__FreeBSD__)
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
#include <atomic>
#include <chrono>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/CurrentMetrics.h>
#include <common/logger_useful.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Core/Block.h>
#include <Dictionaries/BucketCache.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteBufferAIO.h>
#include <list>
#include <pcg_random.hpp>
#include <Poco/Logger.h>
#include <shared_mutex>
#include <variant>
#include <vector>
namespace DB
{
/// A single attribute value of any of the listed underlying types.
/// SSDCacheDictionary below stores its per-attribute null values in a vector of this type.
using AttributeValueVariant = std::variant<
UInt8,
UInt16,
UInt32,
UInt64,
UInt128,
Int8,
Int16,
Int32,
Int64,
Decimal32,
Decimal64,
Decimal128,
Float32,
Float64,
String>;
/*
Class for operations with cache file and index.
Supports GET/SET operations.
Only declarations are visible in this header; member functions are defined elsewhere.
*/
class SSDCachePartition
{
public:
/// Position of a cached element, packed into one 64-bit word (see `index` below).
struct Index final
{
bool inMemory() const;
void setInMemory(const bool in_memory);
bool exists() const;
void setNotExists();
size_t getAddressInBlock() const;
void setAddressInBlock(const size_t address_in_block);
size_t getBlockId() const;
void setBlockId(const size_t block_id);
/// Orders indices by the raw packed value.
bool operator< (const Index & rhs) const { return index < rhs.index; }
/// Stores `is_in_memory` flag, block id, address in uncompressed block
uint64_t index = 0;
};
/// Per-element metadata packed into one unsigned integer (see `data` below).
struct Metadata final
{
using time_point_t = std::chrono::system_clock::time_point;
using time_point_rep_t = time_point_t::rep;
using time_point_urep_t = std::make_unsigned_t<time_point_rep_t>;
time_point_t expiresAt() const;
void setExpiresAt(const time_point_t & t);
bool isDefault() const;
void setDefault();
/// Stores both expiration time and `is_default` flag in the most significant bit
time_point_urep_t data = 0;
};
using Offset = size_t;
using Offsets = std::vector<Offset>;
using Key = IDictionary::Key;
SSDCachePartition(
const AttributeUnderlyingType & key_structure,
const std::vector<AttributeUnderlyingType> & attributes_structure,
const std::string & dir_path,
const size_t file_id,
const size_t max_size,
const size_t block_size,
const size_t read_buffer_size,
const size_t write_buffer_size,
const size_t max_stored_keys);
~SSDCachePartition();
/// Output array type: DecimalPaddedPODArray for decimal types, PaddedPODArray otherwise.
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
/// Lookup methods. `found` is an in/out per-id flag vector and `now` is the time used by the lookup;
/// exact semantics are in the definitions, which are not visible here.
template <typename Out, typename GetDefault>
void getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::vector<bool> & found, GetDefault & get_default,
std::chrono::system_clock::time_point now) const;
void getString(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
StringRefs & refs, ArenaWithFreeLists & arena, std::vector<bool> & found,
std::vector<size_t> & default_ids, std::chrono::system_clock::time_point now) const;
void has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
std::vector<bool> & found, std::chrono::system_clock::time_point now) const;
/// A column of values of one attribute: the underlying type tag plus a vector of that type.
struct Attribute
{
template <typename T>
using Container = std::vector<T>;
AttributeUnderlyingType type;
std::variant<
Container<UInt8>,
Container<UInt16>,
Container<UInt32>,
Container<UInt64>,
Container<UInt128>,
Container<Int8>,
Container<Int16>,
Container<Int32>,
Container<Int64>,
Container<Decimal32>,
Container<Decimal64>,
Container<Decimal128>,
Container<Float32>,
Container<Float64>,
Container<String>> values;
};
using Attributes = std::vector<Attribute>;
/// Append methods take a starting row `begin` and return a size_t
/// (presumably the number of rows consumed - TODO confirm against the definition).
size_t appendBlock(const Attribute & new_keys, const Attributes & new_attributes,
const PaddedPODArray<Metadata> & metadata, const size_t begin);
size_t appendDefaults(const Attribute & new_keys, const PaddedPODArray<Metadata> & metadata, const size_t begin);
void flush();
void remove();
size_t getId() const;
PaddedPODArray<Key> getCachedIds(const std::chrono::system_clock::time_point now) const;
double getLoadFactor() const;
size_t getElementCount() const;
size_t getBytesAllocated() const;
private:
void clearOldestBlocks();
template <typename SetFunc>
void getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set, std::vector<bool> & found) const;
template <typename SetFunc>
void getValueFromMemory(const PaddedPODArray<Index> & indices, SetFunc & set) const;
template <typename SetFunc>
void getValueFromStorage(const PaddedPODArray<Index> & indices, SetFunc & set) const;
void ignoreFromBufferToAttributeIndex(const size_t attribute_index, ReadBuffer & buf) const;
/// Construction parameters, fixed for the lifetime of the partition.
const size_t file_id;
const size_t max_size;
const size_t block_size;
const size_t read_buffer_size;
const size_t write_buffer_size;
const size_t max_stored_keys;
const std::string path;
/// Mutable, presumably so that const lookup methods can lock it - confirm in the definitions.
mutable std::shared_mutex rw_lock;
/// File descriptor; -1 until assigned.
int fd = -1;
/// In-RAM index from key to element position.
mutable BucketCacheIndex<UInt64, Index, Int64Hasher> key_to_index;
Attribute keys_buffer;
const std::vector<AttributeUnderlyingType> attributes_structure;
std::optional<Memory<>> memory;
std::optional<WriteBuffer> write_buffer;
uint32_t keys_in_block = 0;
size_t current_memory_block_id = 0;
size_t current_file_block_id = 0;
};
/// Shared ownership handle to a partition; SSDCacheStorage keeps lists of these.
using SSDCachePartitionPtr = std::shared_ptr<SSDCachePartition>;
/*
Class for managing SSDCachePartition and getting data from source.
Holds a list of partitions plus a queue of partitions to delete, and keeps hit/query counters.
Only declarations are visible in this header; member functions are defined elsewhere.
*/
class SSDCacheStorage
{
public:
using AttributeTypes = std::vector<AttributeUnderlyingType>;
using Key = SSDCachePartition::Key;
SSDCacheStorage(
const AttributeTypes & attributes_structure,
const std::string & path,
const size_t max_partitions_count,
const size_t file_size,
const size_t block_size,
const size_t read_buffer_size,
const size_t write_buffer_size,
const size_t max_stored_keys);
~SSDCacheStorage();
template <typename T>
using ResultArrayType = SSDCachePartition::ResultArrayType<T>;
/// Lookup methods. `not_found` maps a key to a vector of size_t
/// (presumably the positions in `ids` that were not served from the cache - TODO confirm).
template <typename Out, typename GetDefault>
void getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found,
GetDefault & get_default, std::chrono::system_clock::time_point now) const;
void getString(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
StringRefs & refs, ArenaWithFreeLists & arena, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::vector<size_t> & default_ids, std::chrono::system_clock::time_point now) const;
void has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
std::unordered_map<Key, std::vector<size_t>> & not_found, std::chrono::system_clock::time_point now) const;
/// Takes the dictionary source, the requested ids and two callbacks
/// (one for updated ids, one for ids that were not found).
template <typename PresentIdHandler, typename AbsentIdHandler>
void update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found,
const DictionaryLifetime lifetime);
PaddedPODArray<Key> getCachedIds() const;
std::exception_ptr getLastException() const { return last_update_exception; }
const std::string & getPath() const { return path; }
/// NOTE(review): the two counters are read with different memory orders (relaxed vs acquire) - confirm this is intended.
size_t getQueryCount() const { return query_count.load(std::memory_order_relaxed); }
size_t getHitCount() const { return hit_count.load(std::memory_order_acquire); }
size_t getElementCount() const;
double getLoadFactor() const;
size_t getBytesAllocated() const;
private:
void collectGarbage();
/// Construction parameters, fixed for the lifetime of the storage.
const AttributeTypes attributes_structure;
const std::string path;
const size_t max_partitions_count;
const size_t file_size;
const size_t block_size;
const size_t read_buffer_size;
const size_t write_buffer_size;
const size_t max_stored_keys;
mutable std::shared_mutex rw_lock;
std::list<SSDCachePartitionPtr> partitions;
std::list<SSDCachePartitionPtr> partition_delete_queue;
Poco::Logger * const log;
mutable pcg64 rnd_engine;
/// State of the last update attempt; returned by getLastException().
mutable std::exception_ptr last_update_exception;
mutable size_t update_error_count = 0;
mutable std::chrono::system_clock::time_point backoff_end_time;
/// Statistics counters exposed through getHitCount() / getQueryCount().
mutable std::atomic<size_t> hit_count{0};
mutable std::atomic<size_t> query_count{0};
};
/*
Dictionary interface
Implements IDictionary on top of an SSDCacheStorage member; statistics getters forward to the storage.
*/
class SSDCacheDictionary final : public IDictionary
{
public:
SSDCacheDictionary(
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
const std::string & path,
const size_t max_partitions_count_,
const size_t file_size_,
const size_t block_size_,
const size_t read_buffer_size_,
const size_t write_buffer_size_,
const size_t max_stored_keys_);
/// NOTE(review): getDatabase(), getName() and getFullName() all return `name` - confirm this is intended.
const std::string & getDatabase() const override { return name; }
const std::string & getName() const override { return name; }
const std::string & getFullName() const override { return getName(); }
std::string getTypeName() const override { return "SSDCache"; }
size_t getBytesAllocated() const override { return storage.getBytesAllocated(); }
size_t getQueryCount() const override { return storage.getQueryCount(); }
/// Ratio of hits to queries.
/// NOTE(review): while the query count is 0 this is a floating-point division by zero (NaN or inf).
double getHitRate() const override
{
return static_cast<double>(storage.getHitCount()) / storage.getQueryCount();
}
size_t getElementCount() const override { return storage.getElementCount(); }
double getLoadFactor() const override { return storage.getLoadFactor(); }
bool supportUpdates() const override { return false; }
/// Creates a new dictionary with the same settings and a cloned source.
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<SSDCacheDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, path,
max_partitions_count, file_size, block_size, read_buffer_size, write_buffer_size, max_stored_keys);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
const DictionaryStructure & getStructure() const override { return dict_struct; }
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[getAttributeIndex(attribute_name)].injective;
}
/// Hierarchy is not supported: hasHierarchy() is false and toParent() does nothing.
bool hasHierarchy() const override { return false; }
void toParent(const PaddedPODArray<Key> &, PaddedPODArray<Key> &) const override { }
std::exception_ptr getLastException() const override { return storage.getLastException(); }
template <typename T>
using ResultArrayType = SSDCacheStorage::ResultArrayType<T>;
/// Typed getters without an explicit default value, one per numeric/decimal type.
#define DECLARE(TYPE) \
void get##TYPE(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const;
/// Typed getters taking an array of default values `def`.
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const PaddedPODArray<Key> & ids, \
const PaddedPODArray<TYPE> & def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void
getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out)
const;
/// Typed getters taking a single default value `def`.
#define DECLARE(TYPE) \
void get##TYPE(const std::string & attribute_name, const PaddedPODArray<Key> & ids, const TYPE def, ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const;
void has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
private:
size_t getAttributeIndex(const std::string & attr_name) const;
template <typename T>
AttributeValueVariant createAttributeNullValueWithTypeImpl(const Field & null_value);
AttributeValueVariant createAttributeNullValueWithType(const AttributeUnderlyingType type, const Field & null_value);
void createAttributes();
template <typename AttributeType, typename OutputType, typename DefaultGetter>
void getItemsNumberImpl(
const size_t attribute_index, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const;
template <typename DefaultGetter>
void getItemsStringImpl(const size_t attribute_index, const PaddedPODArray<Key> & ids,
ColumnString * out, DefaultGetter && get_default) const;
const std::string name;
const DictionaryStructure dict_struct;
mutable DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
/// Settings kept so that clone() can pass them to a new instance.
const std::string path;
const size_t max_partitions_count;
const size_t file_size;
const size_t block_size;
const size_t read_buffer_size;
const size_t write_buffer_size;
const size_t max_stored_keys;
std::map<std::string, size_t> attribute_index_by_name;
std::vector<AttributeValueVariant> null_values;
mutable SSDCacheStorage storage;
Poco::Logger * const log;
mutable size_t bytes_allocated = 0;
};
}
#endif

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,706 @@
#pragma once
#if defined(__linux__) || defined(__FreeBSD__)
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
#include <atomic>
#include <chrono>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/CurrentMetrics.h>
#include <common/logger_useful.h>
#include <Common/SmallObjectPool.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Core/Block.h>
#include <Dictionaries/BucketCache.h>
#include <ext/scope_guard.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteBufferAIO.h>
#include <limits>
#include <list>
#include <pcg_random.hpp>
#include <Poco/Logger.h>
#include <shared_mutex>
#include <stdexcept>
#include <variant>
#include <vector>
namespace DB
{
class KeyRef
{
public:
explicit KeyRef(char * data) : ptr(data) {}
KeyRef() : ptr(nullptr) {}
inline UInt16 size() const
{
UInt16 res;
memcpy(&res, ptr, sizeof(res));
return res;
}
inline size_t fullSize() const
{
return static_cast<size_t>(size()) + sizeof(UInt16);
}
inline bool isNull() const
{
return ptr == nullptr;
}
inline char * data() const
{
return ptr + sizeof(UInt16);
}
inline char * fullData() const
{
return ptr;
}
inline char * fullData()
{
return ptr;
}
inline const StringRef getRef() const
{
return StringRef(data(), size());
}
inline bool operator==(const KeyRef & other) const
{
return getRef() == other.getRef();
}
inline bool operator!=(const KeyRef & other) const
{
return !(*this == other);
}
inline bool operator<(const KeyRef & other) const
{
return getRef() < other.getRef();
}
private:
char * ptr;
};
/// A sequence of key handles.
using KeyRefs = std::vector<KeyRef>;
}
namespace std
{
template <>
struct hash<DB::KeyRef>
{
size_t operator() (DB::KeyRef key_ref) const
{
return hasher(key_ref.getRef());
}
std::hash<StringRef> hasher;
};
}
namespace DB
{
/// A single attribute value of any of the listed underlying types.
using AttributeValueVariant = std::variant<
UInt8,
UInt16,
UInt32,
UInt64,
UInt128,
Int8,
Int16,
Int32,
Int64,
Decimal32,
Decimal64,
Decimal128,
Float32,
Float64,
String>;
/*
    The pool for storing complex keys.

    A stored key is one contiguous record: [UInt16 payload size][payload].
    The payload is the concatenation of the key parts in column order:
      - a fixed-size part is copied as is;
      - a variable-size (String) part is written as [size_t length including the trailing zero][bytes]['\0'].

    `A` is the allocator type; Arena, ArenaWithFreeLists and SmallObjectPool are handled explicitly below.
*/
template <typename A>
class ComplexKeysPoolImpl
{
public:
    /// Serializes the key taken from row `row` of `key_columns` into the pool and returns a handle to it.
    /// `keys` is scratch space: keys[j] is overwritten with the raw data of column j, so it must have
    /// at least key_columns.size() elements.
    /// Throws std::length_error if the serialized key does not fit into the UInt16 size prefix.
    KeyRef allocKey(const size_t row, const Columns & key_columns, StringRefs & keys)
    {
        const auto keys_size = key_columns.size();

        /// Accumulate in size_t: summing directly into UInt16 would silently wrap for long keys,
        /// the allocation below would be too small and the copies would write past its end.
        size_t total_size = 0;
        for (size_t j = 0; j < keys_size; ++j)
        {
            keys[j] = key_columns[j]->getDataAt(row);
            total_size += keys[j].size;
            if (!key_columns[j]->valuesHaveFixedSize()) // String
                total_size += sizeof(size_t) + 1;
        }

        if (total_size > std::numeric_limits<UInt16>::max())
            throw std::length_error("Complex key is too long to be stored in the keys pool");

        const auto sum_keys_size = static_cast<UInt16>(total_size);

        auto place = arena.alloc(sum_keys_size + sizeof(sum_keys_size));

        auto key_start = place;
        memcpy(key_start, &sum_keys_size, sizeof(sum_keys_size));
        key_start += sizeof(sum_keys_size);
        for (size_t j = 0; j < keys_size; ++j)
        {
            if (!key_columns[j]->valuesHaveFixedSize()) // String
            {
                /// Stored length counts the trailing zero byte written below.
                auto key_size = keys[j].size + 1;
                memcpy(key_start, &key_size, sizeof(size_t));
                key_start += sizeof(size_t);
                memcpy(key_start, keys[j].data, keys[j].size);
                key_start += keys[j].size;
                *key_start = '\0';
                ++key_start;
            }
            else
            {
                memcpy(key_start, keys[j].data, keys[j].size);
                key_start += keys[j].size;
            }
        }

        return KeyRef(place);
    }

    /// Copies an existing key record (prefix and data) into this pool and returns a handle to the copy.
    KeyRef copyKeyFrom(const KeyRef & key)
    {
        char * data = arena.alloc(key.fullSize());
        memcpy(data, key.fullData(), key.fullSize());
        return KeyRef(data);
    }

    /// Returns the key's memory to the pool. Does anything only when A is ArenaWithFreeLists.
    void freeKey(const KeyRef & key)
    {
        if constexpr (std::is_same_v<A, ArenaWithFreeLists>)
            arena.free(key.fullData(), key.fullSize());
    }

    /// Rolls the arena back by the size of `key`. Does anything only when A is Arena.
    void rollback(const KeyRef & key)
    {
        if constexpr (std::is_same_v<A, Arena>)
            arena.rollback(key.fullSize());
    }

    /// Writes the whole key record (prefix and data) to `buf`.
    void writeKey(const KeyRef & key, WriteBuffer & buf)
    {
        buf.write(key.fullData(), key.fullSize());
    }

    /// Reads a key record from `buf` into newly allocated pool memory and stores the handle in `key`.
    void readKey(KeyRef & key, ReadBuffer & buf)
    {
        UInt16 sz;
        readBinary(sz, buf);
        char * data = nullptr;
        if constexpr (std::is_same_v<A, SmallObjectPool>)
            data = arena.alloc();
        else
            data = arena.alloc(sz + sizeof(sz));
        memcpy(data, &sz, sizeof(sz));
        /// NOTE(review): the result of read() is not checked; if it can read fewer than `sz` bytes,
        /// the tail of the key stays uninitialized - confirm against the ReadBuffer contract.
        buf.read(data + sizeof(sz), sz);
        key = KeyRef(data);
    }

    /// Skips one key record in `buf` without storing it.
    void ignoreKey(ReadBuffer & buf) const
    {
        UInt16 sz;
        readBinary(sz, buf);
        buf.ignore(sz);
    }

    /// Size reported by the underlying allocator.
    size_t size() const
    {
        return arena.size();
    }

private:
    A arena;
};
/// Pool backed by Arena: ComplexKeysPoolImpl::rollback() is active for it, freeKey() is a no-op.
using TemporalComplexKeysPool = ComplexKeysPoolImpl<Arena>;
/// Pool backed by ArenaWithFreeLists: ComplexKeysPoolImpl::freeKey() is active for it, rollback() is a no-op.
using ComplexKeysPool = ComplexKeysPoolImpl<ArenaWithFreeLists>;
/// Deleter functor for BucketCacheIndex: returns a key's memory to the referenced ComplexKeysPool.
/// Holds the pool by reference, so the pool must outlive the deleter.
struct KeyDeleter
{
    ComplexKeysPool & keys_pool;

    KeyDeleter(ComplexKeysPool & keys_pool_) : keys_pool(keys_pool_) {}

    void operator()(const KeyRef key) const { keys_pool.freeKey(key); }
};
/*
Class for operations with cache file and index.
Supports GET/SET operations.
Complex-key variant: keys are KeyRef handles stored in a ComplexKeysPool.
Only declarations are visible in this header; member functions are defined elsewhere.
*/
class SSDComplexKeyCachePartition
{
public:
/// Position of a cached element, packed into one 64-bit word (see `index` below).
struct Index final
{
bool inMemory() const;
void setInMemory(const bool in_memory);
bool exists() const;
void setNotExists();
size_t getAddressInBlock() const;
void setAddressInBlock(const size_t address_in_block);
size_t getBlockId() const;
void setBlockId(const size_t block_id);
/// Orders indices by the raw packed value.
bool operator< (const Index & rhs) const { return index < rhs.index; }
/// Stores `is_in_memory` flag, block id, address in uncompressed block
uint64_t index = 0;
};
/// Per-element metadata packed into one unsigned integer (see `data` below).
struct Metadata final
{
using time_point_t = std::chrono::system_clock::time_point;
using time_point_rep_t = time_point_t::rep;
using time_point_urep_t = std::make_unsigned_t<time_point_rep_t>;
time_point_t expiresAt() const;
void setExpiresAt(const time_point_t & t);
bool isDefault() const;
void setDefault();
/// Stores both expiration time and `is_default` flag in the most significant bit
time_point_urep_t data = 0;
};
using Offset = size_t;
using Offsets = std::vector<Offset>;
SSDComplexKeyCachePartition(
const AttributeUnderlyingType & key_structure,
const std::vector<AttributeUnderlyingType> & attributes_structure,
const std::string & dir_path,
const size_t file_id,
const size_t max_size,
const size_t block_size,
const size_t read_buffer_size,
const size_t write_buffer_size,
const size_t max_stored_keys);
~SSDComplexKeyCachePartition();
/// Output array type: DecimalPaddedPODArray for decimal types, PaddedPODArray otherwise.
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
/// Lookup methods. Keys are passed as columns plus their types; `found` is an in/out per-row flag vector.
/// Exact semantics are in the definitions, which are not visible here.
template <typename Out, typename GetDefault>
void getValue(const size_t attribute_index,
const Columns & key_columns, const DataTypes & key_types,
ResultArrayType<Out> & out, std::vector<bool> & found, GetDefault & get_default,
std::chrono::system_clock::time_point now) const;
void getString(const size_t attribute_index,
const Columns & key_columns, const DataTypes & key_types,
StringRefs & refs, ArenaWithFreeLists & arena, std::vector<bool> & found,
std::vector<size_t> & default_ids, std::chrono::system_clock::time_point now) const;
void has(const Columns & key_columns, const DataTypes & key_types,
ResultArrayType<UInt8> & out, std::vector<bool> & found,
std::chrono::system_clock::time_point now) const;
/// A column of values of one attribute: the underlying type tag plus a vector of that type.
struct Attribute
{
template <typename T>
using Container = std::vector<T>;
AttributeUnderlyingType type;
std::variant<
Container<UInt8>,
Container<UInt16>,
Container<UInt32>,
Container<UInt64>,
Container<UInt128>,
Container<Int8>,
Container<Int16>,
Container<Int32>,
Container<Int64>,
Container<Decimal32>,
Container<Decimal64>,
Container<Decimal128>,
Container<Float32>,
Container<Float64>,
Container<String>> values;
};
using Attributes = std::vector<Attribute>;
/// Append methods take a starting row `begin` and return a size_t
/// (presumably the number of rows consumed - TODO confirm against the definition).
size_t appendBlock(
const Columns & key_columns,
const DataTypes & key_types,
const Attributes & new_attributes,
const PaddedPODArray<Metadata> & metadata,
const size_t begin);
size_t appendDefaults(
const KeyRefs & keys,
const PaddedPODArray<Metadata> & metadata,
const size_t begin);
void clearOldestBlocks();
void flush();
void remove();
size_t getId() const;
double getLoadFactor() const;
size_t getElementCount() const;
size_t getBytesAllocated() const;
private:
size_t append(
const KeyRefs & keys,
const Attributes & new_attributes,
const PaddedPODArray<Metadata> & metadata,
const size_t begin);
template <typename SetFunc>
void getImpl(const Columns & key_columns, const DataTypes & key_types,
SetFunc & set, std::vector<bool> & found) const;
template <typename SetFunc>
void getValueFromMemory(const PaddedPODArray<Index> & indices, SetFunc & set) const;
template <typename SetFunc>
void getValueFromStorage(const PaddedPODArray<Index> & indices, SetFunc & set) const;
void ignoreFromBufferToAttributeIndex(const size_t attribute_index, ReadBuffer & buf) const;
/// Construction parameters, fixed for the lifetime of the partition.
const size_t file_id;
const size_t max_size;
const size_t block_size;
const size_t read_buffer_size;
const size_t write_buffer_size;
const size_t max_stored_keys;
const std::string path;
/// Mutable, presumably so that const lookup methods can lock it - confirm in the definitions.
mutable std::shared_mutex rw_lock;
/// File descriptor; -1 until assigned.
int fd = -1;
/// Pool for key records; declared before `key_to_index`, whose KeyDeleter type refers to a ComplexKeysPool.
ComplexKeysPool keys_pool;
/// In-RAM index from key to element position.
mutable BucketCacheIndex<KeyRef, Index, std::hash<KeyRef>, KeyDeleter> key_to_index;
std::optional<TemporalComplexKeysPool> keys_buffer_pool;
KeyRefs keys_buffer;
const std::vector<AttributeUnderlyingType> attributes_structure;
std::optional<Memory<>> memory;
std::optional<WriteBuffer> write_buffer;
uint32_t keys_in_block = 0;
size_t current_memory_block_id = 0;
size_t current_file_block_id = 0;
};
/// Shared ownership handle to a partition; SSDComplexKeyCacheStorage keeps lists of these.
using SSDComplexKeyCachePartitionPtr = std::shared_ptr<SSDComplexKeyCachePartition>;
/*
Class for managing SSDComplexKeyCachePartition and getting data from source.
Holds a list of partitions plus a queue of partitions to delete, and keeps hit/query counters.
Only declarations are visible in this header; member functions are defined elsewhere.
*/
class SSDComplexKeyCacheStorage
{
public:
using AttributeTypes = std::vector<AttributeUnderlyingType>;
SSDComplexKeyCacheStorage(
const AttributeTypes & attributes_structure,
const std::string & path,
const size_t max_partitions_count,
const size_t file_size,
const size_t block_size,
const size_t read_buffer_size,
const size_t write_buffer_size,
const size_t max_stored_keys);
~SSDComplexKeyCacheStorage();
template <typename T>
using ResultArrayType = SSDComplexKeyCachePartition::ResultArrayType<T>;
/// Lookup methods. `not_found` maps a key handle to a vector of size_t, and `not_found_pool` is a
/// temporary key pool (presumably owning the keys placed into `not_found` - TODO confirm).
template <typename Out, typename GetDefault>
void getValue(const size_t attribute_index, const Columns & key_columns, const DataTypes & key_types,
ResultArrayType<Out> & out, std::unordered_map<KeyRef, std::vector<size_t>> & not_found,
TemporalComplexKeysPool & not_found_pool,
GetDefault & get_default, std::chrono::system_clock::time_point now) const;
void getString(const size_t attribute_index, const Columns & key_columns, const DataTypes & key_types,
StringRefs & refs, ArenaWithFreeLists & arena, std::unordered_map<KeyRef, std::vector<size_t>> & not_found,
TemporalComplexKeysPool & not_found_pool,
std::vector<size_t> & default_ids, std::chrono::system_clock::time_point now) const;
void has(const Columns & key_columns, const DataTypes & key_types, ResultArrayType<UInt8> & out,
std::unordered_map<KeyRef, std::vector<size_t>> & not_found,
TemporalComplexKeysPool & not_found_pool, std::chrono::system_clock::time_point now) const;
/// Takes the dictionary source, the key columns, the required keys/rows and two callbacks
/// (one for updated keys, one for keys that were not found).
template <typename PresentIdHandler, typename AbsentIdHandler>
void update(DictionarySourcePtr & source_ptr,
const Columns & key_columns, const DataTypes & key_types,
const KeyRefs & required_keys, const std::vector<size_t> & required_rows,
TemporalComplexKeysPool & tmp_keys_pool,
PresentIdHandler && on_updated, AbsentIdHandler && on_key_not_found,
const DictionaryLifetime lifetime);
std::exception_ptr getLastException() const { return last_update_exception; }
const std::string & getPath() const { return path; }
/// NOTE(review): the two counters are read with different memory orders (relaxed vs acquire) - confirm this is intended.
size_t getQueryCount() const { return query_count.load(std::memory_order_relaxed); }
size_t getHitCount() const { return hit_count.load(std::memory_order_acquire); }
size_t getElementCount() const;
double getLoadFactor() const;
private:
void collectGarbage();
/// Construction parameters, fixed for the lifetime of the storage.
const AttributeTypes attributes_structure;
const std::string path;
const size_t max_partitions_count;
const size_t file_size;
const size_t block_size;
const size_t read_buffer_size;
const size_t write_buffer_size;
const size_t max_stored_keys;
mutable std::shared_mutex rw_lock;
std::list<SSDComplexKeyCachePartitionPtr> partitions;
std::list<SSDComplexKeyCachePartitionPtr> partition_delete_queue;
Poco::Logger * const log;
mutable pcg64 rnd_engine;
/// State of the last update attempt; returned by getLastException().
mutable std::exception_ptr last_update_exception;
mutable size_t update_error_count = 0;
mutable std::chrono::system_clock::time_point backoff_end_time;
/// Statistics counters exposed through getHitCount() / getQueryCount().
mutable std::atomic<size_t> hit_count{0};
mutable std::atomic<size_t> query_count{0};
};
/** Dictionary interface.
  *
  * Complex-key (composite key) dictionary reported under the type name "SSDComplexKeyCache".
  * The class keeps the dictionary definition (structure, source, lifetime and the cache settings
  * passed to the constructor) and owns an SSDComplexKeyCacheStorage; the statistics getters
  * (query count, hit rate, element count, load factor) and getLastException() are forwarded to it.
  */
class SSDComplexKeyCacheDictionary final : public IDictionaryBase
{
public:
/// Stores the dictionary definition and the cache settings.
/// NOTE(review): the cache settings presumably have the same meaning as the documented ssd_cache
/// layout settings (path of the cache file, file/block/buffer sizes in bytes, limit on stored keys) -
/// confirm against the constructor definition, which is not part of this header.
SSDComplexKeyCacheDictionary(
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
const std::string & path,
const size_t max_partitions_count_,
const size_t file_size_,
const size_t block_size_,
const size_t read_buffer_size_,
const size_t write_buffer_size_,
const size_t max_stored_keys_);
/// NOTE(review): returns `name`, i.e. the same string as getName(); this class stores no separate database name.
const std::string & getDatabase() const override { return name; }
const std::string & getName() const override { return name; }
/// Same as getName().
const std::string & getFullName() const override { return getName(); }
/// Key description as produced by the dictionary structure.
std::string getKeyDescription() const { return dict_struct.getKeyDescription(); }
std::string getTypeName() const override { return "SSDComplexKeyCache"; }
/// Always reports 0; the `bytes_allocated` member is not consulted here.
size_t getBytesAllocated() const override { return 0; } // TODO: ?
size_t getQueryCount() const override { return storage.getQueryCount(); }
/// Ratio of the storage hit counter to the storage query counter.
/// NOTE(review): while the query counter is still 0 this is a floating-point division by zero
/// (0.0 / 0 gives NaN under IEEE 754) - confirm that callers tolerate it.
double getHitRate() const override
{
return static_cast<double>(storage.getHitCount()) / storage.getQueryCount();
}
size_t getElementCount() const override { return storage.getElementCount(); }
double getLoadFactor() const override { return storage.getLoadFactor(); }
bool supportUpdates() const override { return false; }
/// Builds a new dictionary from the same constructor arguments, using a clone of the source.
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<SSDComplexKeyCacheDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, path,
max_partitions_count, file_size, block_size, read_buffer_size, write_buffer_size, max_stored_keys);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
const DictionaryStructure & getStructure() const override { return dict_struct; }
/// Returns the `injective` flag of the attribute found through getAttributeIndex(attribute_name).
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[getAttributeIndex(attribute_name)].injective;
}
/// Last exception recorded by the storage.
std::exception_ptr getLastException() const override { return storage.getLastException(); }
/// Output array type, shared with the storage class.
template <typename T>
using ResultArrayType = SSDComplexKeyCacheStorage::ResultArrayType<T>;
/// get<TYPE>(attribute_name, key_columns, key_types, out): one overload per numeric/decimal type,
/// without an explicit default argument.
/// NOTE(review): presumably the attribute's null value is used for missing keys - confirm in the definitions.
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
/// String counterpart of the overloads above.
void getString(const std::string & attribute_name, const Columns & key_columns,
const DataTypes & key_types, ColumnString * out) const;
/// Same set of typed getters, taking an array of defaults `def`
/// (presumably one default per key row - confirm in the definitions).
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const PaddedPODArray<TYPE> & def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
/// String counterpart taking a column of defaults.
void getString(const std::string & attribute_name, const Columns & key_columns,
const DataTypes & key_types, const ColumnString * const def, ColumnString * const out) const;
/// Same set of typed getters, taking a single default value `def` for the whole call.
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const TYPE def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
/// String counterpart taking a single default string.
void getString(const std::string & attribute_name, const Columns & key_columns,
const DataTypes & key_types, const String & def, ColumnString * const out) const;
/// Key presence check; results are written to `out` as UInt8 values
/// (presumably one flag per key row - confirm in the definition).
void has(const Columns & key_columns, const DataTypes & key_types, PaddedPODArray<UInt8> & out) const;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
private:
/// Maps an attribute name to its index (used by isInjective() to index dict_struct.attributes).
size_t getAttributeIndex(const std::string & attr_name) const;
/// Helpers that build the AttributeValueVariant for an attribute's null value.
template <typename T>
AttributeValueVariant createAttributeNullValueWithTypeImpl(const Field & null_value);
AttributeValueVariant createAttributeNullValueWithType(const AttributeUnderlyingType type, const Field & null_value);
/// NOTE(review): presumably fills attribute_index_by_name and null_values - confirm in the definition.
void createAttributes();
/// Shared implementation behind the typed numeric getters; `get_default` supplies default values.
template <typename AttributeType, typename OutputType, typename DefaultGetter>
void getItemsNumberImpl(
const size_t attribute_index,
const Columns & key_columns, const DataTypes & key_types,
ResultArrayType<OutputType> & out, DefaultGetter && get_default) const;
/// Shared implementation behind the getString overloads.
template <typename DefaultGetter>
void getItemsStringImpl(
const size_t attribute_index,
const Columns & key_columns, const DataTypes & key_types,
ColumnString * out, DefaultGetter && get_default) const;
/// Dictionary definition. The const members are also what clone() passes to the new instance.
const std::string name;
const DictionaryStructure dict_struct;
/// mutable: may be modified from const member functions.
mutable DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
/// Cache settings, kept as received by the constructor.
const std::string path;
const size_t max_partitions_count;
const size_t file_size;
const size_t block_size;
const size_t read_buffer_size;
const size_t write_buffer_size;
const size_t max_stored_keys;
std::map<std::string, size_t> attribute_index_by_name;
std::vector<AttributeValueVariant> null_values;
/// Backing storage; mutable, so const member functions may modify it.
mutable SSDComplexKeyCacheStorage storage;
Poco::Logger * const log;
/// Not read by any inline member of this class (getBytesAllocated() returns a constant 0).
mutable size_t bytes_allocated = 0;
};
}
#endif

View File

@ -20,6 +20,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DICTIONARY_DEFINITION;
}
@ -97,13 +98,22 @@ void buildLayoutConfiguration(
root->appendChild(layout_element);
AutoPtr<Element> layout_type_element(doc->createElement(layout->layout_type));
layout_element->appendChild(layout_type_element);
if (layout->parameter.has_value())
for (const auto & param : layout->parameters)
{
const auto & param = layout->parameter;
AutoPtr<Element> layout_type_parameter_element(doc->createElement(param->first));
const ASTLiteral & literal = param->second->as<const ASTLiteral &>();
AutoPtr<Text> value(doc->createTextNode(toString(literal.value.get<UInt64>())));
layout_type_parameter_element->appendChild(value);
AutoPtr<Element> layout_type_parameter_element(doc->createElement(param.first));
const ASTLiteral & literal = param.second->as<const ASTLiteral &>();
Field::dispatch([&](auto & value)
{
if constexpr (std::is_same_v<std::decay_t<decltype(value)>, UInt64> || std::is_same_v<std::decay_t<decltype(value)>, String>)
{
AutoPtr<Text> value_to_append(doc->createTextNode(toString(value)));
layout_type_parameter_element->appendChild(value_to_append);
}
else
{
throw DB::Exception{"Wrong type of layout argument.", ErrorCodes::BAD_ARGUMENTS};
}
}, literal.value);
layout_type_element->appendChild(layout_type_parameter_element);
}
}

View File

@ -33,6 +33,10 @@ void registerDictionaries()
registerDictionaryFlat(factory);
registerDictionaryHashed(factory);
registerDictionaryCache(factory);
#if defined(__linux__) || defined(__FreeBSD__)
registerDictionarySSDCache(factory);
registerDictionarySSDComplexKeyCache(factory);
#endif
registerDictionaryPolygon(factory);
registerDictionaryDirect(factory);
}

View File

@ -26,6 +26,10 @@ void registerDictionaryTrie(DictionaryFactory & factory);
void registerDictionaryFlat(DictionaryFactory & factory);
void registerDictionaryHashed(DictionaryFactory & factory);
void registerDictionaryCache(DictionaryFactory & factory);
#if defined(__linux__) || defined(__FreeBSD__)
void registerDictionarySSDCache(DictionaryFactory & factory);
void registerDictionarySSDComplexKeyCache(DictionaryFactory & factory);
#endif
void registerDictionaryPolygon(DictionaryFactory & factory);
void registerDictionaryDirect(DictionaryFactory & factory);

View File

@ -62,6 +62,8 @@ SRCS(
RedisBlockInputStream.cpp
RedisDictionarySource.cpp
registerDictionaries.cpp
SSDCacheDictionary.cpp
SSDComplexKeyCacheDictionary.cpp
writeParenthesisedString.cpp
XDBCDictionarySource.cpp

View File

@ -29,6 +29,10 @@
#include <Dictionaries/FlatDictionary.h>
#include <Dictionaries/HashedDictionary.h>
#include <Dictionaries/CacheDictionary.h>
#if defined(__linux__) || defined(__FreeBSD__)
#include <Dictionaries/SSDCacheDictionary.h>
#include <Dictionaries/SSDComplexKeyCacheDictionary.h>
#endif
#include <Dictionaries/ComplexKeyHashedDictionary.h>
#include <Dictionaries/ComplexKeyCacheDictionary.h>
#include <Dictionaries/ComplexKeyDirectDictionary.h>
@ -171,16 +175,22 @@ private:
auto dict = helper.getDictionary(block.getByPosition(arguments[0]));
if (!executeDispatchSimple<FlatDictionary>(block, arguments, result, dict) &&
!executeDispatchSimple<DirectDictionary>(block, arguments, result, dict) &&
!executeDispatchSimple<HashedDictionary>(block, arguments, result, dict) &&
!executeDispatchSimple<CacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchSimple<SSDCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyDirectDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#endif
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<SimplePolygonDictionary>(block, arguments, result, dict) &&
!executeDispatchSimple<DirectDictionary>(block, arguments, result, dict))
!executeDispatchComplex<SimplePolygonDictionary>(block, arguments, result, dict))
throw Exception{"Unsupported dictionary type " + dict->getTypeName(), ErrorCodes::UNKNOWN_TYPE};
}
@ -321,12 +331,18 @@ private:
auto dict = helper.getDictionary(block.getByPosition(arguments[0]));
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict) &&
!executeDispatch<DirectDictionary>(block, arguments, result, dict) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict) &&
!executeDispatch<DirectDictionary>(block, arguments, result, dict) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyDirectDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#endif
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict) &&
#endif
@ -499,12 +515,18 @@ private:
auto dict = helper.getDictionary(block.getByPosition(arguments[0]));
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict) &&
!executeDispatch<DirectDictionary>(block, arguments, result, dict) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict) &&
!executeDispatch<DirectDictionary>(block, arguments, result, dict) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyDirectDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#endif
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict) &&
#endif
@ -833,12 +855,18 @@ private:
auto dict = helper.getDictionary(block.getByPosition(arguments[0]));
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict) &&
!executeDispatch<DirectDictionary>(block, arguments, result, dict) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict) &&
!executeDispatch<DirectDictionary>(block, arguments, result, dict) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyDirectDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#endif
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict) &&
#endif
@ -1088,12 +1116,18 @@ private:
auto dict = helper.getDictionary(block.getByPosition(arguments[0]));
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict) &&
!executeDispatch<DirectDictionary>(block, arguments, result, dict) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict) &&
!executeDispatch<DirectDictionary>(block, arguments, result, dict) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict) &&
#endif
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyDirectDictionary>(block, arguments, result, dict) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#if defined(__linux__) || defined(__FreeBSD__)
!executeDispatchComplex<SSDComplexKeyCacheDictionary>(block, arguments, result, dict) &&
#endif
#if !defined(ARCADIA_BUILD)
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict) &&
#endif

View File

@ -67,10 +67,12 @@ ASTPtr ASTDictionaryLayout::clone() const
auto res = std::make_shared<ASTDictionaryLayout>(*this);
res->children.clear();
res->layout_type = layout_type;
if (parameter.has_value())
res->parameters.clear();
res->has_brackets = has_brackets;
for (const auto & parameter : parameters)
{
res->parameter.emplace(parameter->first, nullptr);
res->set(res->parameter->second, parameter->second->clone());
res->parameters.emplace_back(parameter.first, nullptr);
res->set(res->parameters.back().second, parameter.second->clone());
}
return res;
}
@ -91,14 +93,17 @@ void ASTDictionaryLayout::formatImpl(const FormatSettings & settings,
if (has_brackets)
settings.ostr << "(";
if (parameter)
bool first = true;
for (const auto & parameter : parameters)
{
settings.ostr << (settings.hilite ? hilite_keyword : "")
<< Poco::toUpper(parameter->first)
settings.ostr << (first ? "" : " ")
<< (settings.hilite ? hilite_keyword : "")
<< Poco::toUpper(parameter.first)
<< (settings.hilite ? hilite_none : "")
<< " ";
parameter->second->formatImpl(settings, state, frame);
parameter.second->formatImpl(settings, state, frame);
first = false;
}
if (has_brackets)

View File

@ -35,8 +35,8 @@ class ASTDictionaryLayout : public IAST
public:
/// flat, cache, hashed, etc.
String layout_type;
/// optional parameter (size_in_cells)
std::optional<KeyValue> parameter;
/// parameters (size_in_cells, ...)
std::vector<KeyValue> parameters;
/// has brackets after layout type
bool has_brackets = true;

View File

@ -126,25 +126,21 @@ bool ParserDictionaryLayout::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
res->has_brackets = func.has_brackets;
const ASTExpressionList & type_expr_list = func.elements->as<const ASTExpressionList &>();
/// there are no layout with more than 1 parameter
if (type_expr_list.children.size() > 1)
return false;
/// if layout has params than brackets must be specified
if (!type_expr_list.children.empty() && !res->has_brackets)
return false;
if (type_expr_list.children.size() == 1)
for (const auto & child : type_expr_list.children)
{
const ASTPair * pair = dynamic_cast<const ASTPair *>(type_expr_list.children.at(0).get());
const ASTPair * pair = dynamic_cast<const ASTPair *>(child.get());
if (pair == nullptr)
return false;
const ASTLiteral * literal = dynamic_cast<const ASTLiteral *>(pair->second.get());
if (literal == nullptr || literal->value.getType() != Field::Types::UInt64)
if (literal == nullptr || (literal->value.getType() != Field::Types::UInt64 && literal->value.getType() != Field::Types::String))
return false;
res->parameter.emplace(pair->first, nullptr);
res->set(res->parameter->second, literal->clone());
res->parameters.emplace_back(pair->first, nullptr);
res->set(res->parameters.back().second, literal->clone());
}
node = res;

View File

@ -0,0 +1,39 @@
TEST_SMALL
-100
-1
6
0
database
a
1 100 -100 clickhouse
2 3 4 database
3 0 -1 a
4 0 -1 a
5 6 7 columns
6 0 -1 a
UPDATE DICTIONARY
118
VALUE FROM DISK
-100
clickhouse
VALUE FROM RAM BUFFER
8
VALUES FROM DISK AND RAM BUFFER
118
HAS
1006
VALUES NOT FROM TABLE
0 -1 none
0 -1 none
DUPLICATE KEYS
1 -100
2 4
3 -1
3 -1
2 4
1 -100
UPDATE DICTIONARY (MT)
118
VALUES FROM DISK AND RAM BUFFER (MT)
118

View File

@ -0,0 +1,157 @@
-- Test for the SSD_CACHE dictionary layout (single UInt64 key).
-- Setup: a fresh database and the MergeTree table that serves as the dictionary source.
SET send_logs_level = 'none';
DROP DATABASE IF EXISTS database_for_dict;
CREATE DATABASE database_for_dict Engine = Ordinary;
DROP TABLE IF EXISTS database_for_dict.table_for_dict;
CREATE TABLE database_for_dict.table_for_dict
(
id UInt64,
a UInt64,
b Int32,
c String
)
ENGINE = MergeTree()
ORDER BY id;
-- Four rows with distinctive values (ids 1, 2, 5 and 10); their `a` values add up to 100 + 3 + 6 + 9 = 118.
INSERT INTO database_for_dict.table_for_dict VALUES (1, 100, -100, 'clickhouse'), (2, 3, 4, 'database'), (5, 6, 7, 'columns'), (10, 9, 8, '');
-- Filler rows (a = 0, b = -1) for ids other than 1, 2, 5, 10, in three batches tagged c = 'a' / 'b' / 'c'.
-- NOTE(review): `LIMIT 370, 370` covers offsets 370-739 and `LIMIT 700, 370` covers offsets 700-1069,
-- so offsets 700-739 are inserted twice (once with 'b', once with 'c') - confirm this overlap is intended.
INSERT INTO database_for_dict.table_for_dict SELECT number, 0, -1, 'a' FROM system.numbers WHERE number NOT IN (1, 2, 5, 10) LIMIT 370;
INSERT INTO database_for_dict.table_for_dict SELECT number, 0, -1, 'b' FROM system.numbers WHERE number NOT IN (1, 2, 5, 10) LIMIT 370, 370;
INSERT INTO database_for_dict.table_for_dict SELECT number, 0, -1, 'c' FROM system.numbers WHERE number NOT IN (1, 2, 5, 10) LIMIT 700, 370;
-- TEST_SMALL: dictionary created with only FILE_SIZE and PATH given; the other layout parameters are left unspecified.
DROP DICTIONARY IF EXISTS database_for_dict.ssd_dict;
CREATE DICTIONARY database_for_dict.ssd_dict
(
id UInt64,
a UInt64 DEFAULT 0,
b Int32 DEFAULT -1,
c String DEFAULT 'none'
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse_dicts/0d'));
SELECT 'TEST_SMALL';
-- Single-key lookups, one attribute at a time: keys 1, 2 and 5 are the distinctive rows, keys 3, 4 and 6 are filler rows.
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(1));
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(4));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(5));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(6));
SELECT dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(2));
SELECT dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(3));
-- Read the dictionary as a table.
-- NOTE(review): presumably this lists only the keys already cached by the lookups above - confirm against the reference file.
SELECT * FROM database_for_dict.ssd_dict ORDER BY id;
DROP DICTIONARY database_for_dict.ssd_dict;
-- Keys to look up: the four distinctive ids (1, 2, 5, 10) interleaved with batches of
-- pseudo-random ids of the form 11 + intHash64(number) % 1200 (so always >= 11).
DROP TABLE IF EXISTS database_for_dict.keys_table;
CREATE TABLE database_for_dict.keys_table
(
id UInt64
)
ENGINE = StripeLog();
INSERT INTO database_for_dict.keys_table VALUES (1);
INSERT INTO database_for_dict.keys_table SELECT 11 + intHash64(number) % 1200 FROM system.numbers LIMIT 370;
INSERT INTO database_for_dict.keys_table VALUES (2);
INSERT INTO database_for_dict.keys_table SELECT 11 + intHash64(number) % 1200 FROM system.numbers LIMIT 370, 370;
INSERT INTO database_for_dict.keys_table VALUES (5);
INSERT INTO database_for_dict.keys_table SELECT 11 + intHash64(number) % 1200 FROM system.numbers LIMIT 700, 370;
INSERT INTO database_for_dict.keys_table VALUES (10);
-- Same dictionary definition as before, now with explicit BLOCK_SIZE, WRITE_BUFFER_SIZE and MAX_STORED_KEYS.
-- NOTE(review): the small sizes are presumably chosen so that data gets flushed from the RAM buffer to the file - confirm.
DROP DICTIONARY IF EXISTS database_for_dict.ssd_dict;
CREATE DICTIONARY database_for_dict.ssd_dict
(
id UInt64,
a UInt64 DEFAULT 0,
b Int32 DEFAULT -1,
c String DEFAULT 'none'
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse_dicts/1d' BLOCK_SIZE 512 WRITE_BUFFER_SIZE 4096 MAX_STORED_KEYS 1000000));
-- First pass over all keys. Expected sum is 118 = 100 + 3 + 6 + 9 (keys 1, 2, 5, 10);
-- every other key contributes 0, which is both the filler value and the default of `a`.
SELECT 'UPDATE DICTIONARY';
-- 118
SELECT sum(dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(id))) FROM database_for_dict.keys_table;
-- Key 1 is the first row of keys_table.
-- NOTE(review): presumably it has been flushed to the cache file by now, hence the label - confirm.
SELECT 'VALUE FROM DISK';
-- -100
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(1));
-- 'clickhouse'
SELECT dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(1));
-- Key 10 is the last row of keys_table.
-- NOTE(review): presumably it is still in the write buffer, hence the label - confirm.
SELECT 'VALUE FROM RAM BUFFER';
-- 8
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(10));
-- ''
SELECT dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(10));
-- Second pass over the same keys; the sum must be unchanged.
SELECT 'VALUES FROM DISK AND RAM BUFFER';
-- 118
SELECT sum(dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(id))) FROM database_for_dict.keys_table;
SELECT 'HAS';
-- 1006
SELECT count() FROM database_for_dict.keys_table WHERE dictHas('database_for_dict.ssd_dict', toUInt64(id));
-- Key 1000000 is not inserted by any statement of this script, so the attribute defaults are expected.
-- The query is issued twice on purpose (same expected result both times).
SELECT 'VALUES NOT FROM TABLE';
-- 0 -1 none
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(1000000)), dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(1000000)), dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(1000000));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(1000000)), dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(1000000)), dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(1000000));
-- The same key repeated within one block of keys.
SELECT 'DUPLICATE KEYS';
SELECT arrayJoin([1, 2, 3, 3, 2, 1]) AS id, dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(id));
--SELECT
DROP DICTIONARY IF EXISTS database_for_dict.ssd_dict;
-- Variant with the keys stored in a MergeTree table; the pseudo-random keys are raw intHash64(number) values here.
-- NOTE(review): '(MT)' in the labels below presumably refers to this MergeTree keys table - confirm.
DROP TABLE IF EXISTS database_for_dict.keys_table;
CREATE TABLE database_for_dict.keys_table
(
id UInt64
)
ENGINE = MergeTree()
ORDER BY id;
INSERT INTO database_for_dict.keys_table VALUES (1);
INSERT INTO database_for_dict.keys_table SELECT intHash64(number) FROM system.numbers LIMIT 370;
INSERT INTO database_for_dict.keys_table VALUES (2);
INSERT INTO database_for_dict.keys_table SELECT intHash64(number) FROM system.numbers LIMIT 370, 370;
INSERT INTO database_for_dict.keys_table VALUES (5);
INSERT INTO database_for_dict.keys_table SELECT intHash64(number) FROM system.numbers LIMIT 700, 370;
INSERT INTO database_for_dict.keys_table VALUES (10);
OPTIMIZE TABLE database_for_dict.keys_table;
-- Dictionary without the String attribute `c`, with WRITE_BUFFER_SIZE 1024 and MAX_STORED_KEYS 10.
CREATE DICTIONARY database_for_dict.ssd_dict
(
id UInt64,
a UInt64 DEFAULT 0,
b Int32 DEFAULT -1
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse_dicts/2d' BLOCK_SIZE 512 WRITE_BUFFER_SIZE 1024 MAX_STORED_KEYS 10));
-- Two passes over all keys; both must give 118 (= 100 + 3 + 6 + 9).
SELECT 'UPDATE DICTIONARY (MT)';
-- 118
SELECT sum(dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(id))) FROM database_for_dict.keys_table;
SELECT 'VALUES FROM DISK AND RAM BUFFER (MT)';
-- 118
SELECT sum(dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(id))) FROM database_for_dict.keys_table;
-- Cleanup.
DROP DICTIONARY IF EXISTS database_for_dict.ssd_dict;
DROP TABLE IF EXISTS database_for_dict.table_for_dict;
DROP DATABASE IF EXISTS database_for_dict;

View File

@ -0,0 +1,39 @@
TEST_SMALL
VALUE FROM RAM BUFFER
100
-100
clickhouse
100
-100
clickhouse
3
4
database
6
7
columns
9
8
UPDATE DICTIONARY
118
VALUE FROM DISK
-100
clickhouse
VALUE FROM RAM BUFFER
8
VALUES FROM DISK AND RAM BUFFER
118
HAS
6
VALUES NOT FROM TABLE
0 -1 none
0 -1 none
DUPLICATE KEYS
('1',3) -100
('2',-1) 4
('',0) -1
('',0) -1
('2',-1) 4
('1',3) -100

View File

@ -0,0 +1,133 @@
-- Test for the COMPLEX_KEY_SSD_CACHE dictionary layout (composite key: String k1, Int32 k2).
-- Setup: a fresh database and the MergeTree table that serves as the dictionary source.
SET send_logs_level = 'none';
DROP DATABASE IF EXISTS database_for_dict;
CREATE DATABASE database_for_dict Engine = Ordinary;
DROP TABLE IF EXISTS database_for_dict.table_for_dict;
CREATE TABLE database_for_dict.table_for_dict
(
k1 String,
k2 Int32,
a UInt64,
b Int32,
c String
)
ENGINE = MergeTree()
ORDER BY (k1, k2);
-- Four rows with distinctive values, keyed ('1', 3), ('2', -1), ('5', -3), ('10', -20); their `a` values add up to 118.
INSERT INTO database_for_dict.table_for_dict VALUES (toString(1), 3, 100, -100, 'clickhouse'), (toString(2), -1, 3, 4, 'database'), (toString(5), -3, 6, 7, 'columns'), (toString(10), -20, 9, 8, '');
-- Filler rows (a = 0, b = -1) in three batches; k2 is number + 1 / + 10 / + 100 and c is 'a' / 'b' / 'c'.
-- NOTE(review): `LIMIT 370, 370` (offsets 370-739) and `LIMIT 700, 370` (offsets 700-1069) overlap on offsets 700-739 - confirm this is intended.
INSERT INTO database_for_dict.table_for_dict SELECT toString(number), number + 1, 0, -1, 'a' FROM system.numbers WHERE number NOT IN (1, 2, 5, 10) LIMIT 370;
INSERT INTO database_for_dict.table_for_dict SELECT toString(number), number + 10, 0, -1, 'b' FROM system.numbers WHERE number NOT IN (1, 2, 5, 10) LIMIT 370, 370;
INSERT INTO database_for_dict.table_for_dict SELECT toString(number), number + 100, 0, -1, 'c' FROM system.numbers WHERE number NOT IN (1, 2, 5, 10) LIMIT 700, 370;
-- TEST_SMALL: dictionary created with only FILE_SIZE and PATH given; the other layout parameters are left unspecified.
-- NOTE(review): the simple-key SSD_CACHE test uses the same PATH ('/var/lib/clickhouse/clickhouse_dicts/0d');
-- confirm the two tests cannot interfere with each other when run concurrently.
DROP DICTIONARY IF EXISTS database_for_dict.ssd_dict;
CREATE DICTIONARY database_for_dict.ssd_dict
(
k1 String,
k2 Int32,
a UInt64 DEFAULT 0,
b Int32 DEFAULT -1,
c String DEFAULT 'none'
)
PRIMARY KEY k1, k2
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(COMPLEX_KEY_SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse_dicts/0d'));
SELECT 'TEST_SMALL';
SELECT 'VALUE FROM RAM BUFFER';
-- Key ('1', 3): all three attributes, requested twice in a row.
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', tuple('1', toInt32(3)));
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', tuple('1', toInt32(3)));
SELECT dictGetString('database_for_dict.ssd_dict', 'c', tuple('1', toInt32(3)));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', tuple('1', toInt32(3)));
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', tuple('1', toInt32(3)));
SELECT dictGetString('database_for_dict.ssd_dict', 'c', tuple('1', toInt32(3)));
-- Remaining distinctive keys: ('2', -1), ('5', -3) and ('10', -20).
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', tuple('2', toInt32(-1)));
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', tuple('2', toInt32(-1)));
SELECT dictGetString('database_for_dict.ssd_dict', 'c', tuple('2', toInt32(-1)));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', tuple('5', toInt32(-3)));
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', tuple('5', toInt32(-3)));
SELECT dictGetString('database_for_dict.ssd_dict', 'c', tuple('5', toInt32(-3)));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', tuple('10', toInt32(-20)));
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', tuple('10', toInt32(-20)));
SELECT dictGetString('database_for_dict.ssd_dict', 'c', tuple('10', toInt32(-20)));
DROP DICTIONARY database_for_dict.ssd_dict;
-- Keys to look up: the four distinctive composite keys interleaved with batches of pseudo-random
-- keys (toString(intHash64(number + 1) % 1200), 11 + intHash64(number) % 1200).
DROP TABLE IF EXISTS database_for_dict.keys_table;
CREATE TABLE database_for_dict.keys_table
(
k1 String,
k2 Int32
)
ENGINE = StripeLog();
INSERT INTO database_for_dict.keys_table VALUES ('1', 3);
INSERT INTO database_for_dict.keys_table SELECT toString(intHash64(number + 1) % 1200), 11 + intHash64(number) % 1200 FROM system.numbers LIMIT 370;
INSERT INTO database_for_dict.keys_table VALUES ('2', -1);
INSERT INTO database_for_dict.keys_table SELECT toString(intHash64(number + 1) % 1200), 11 + intHash64(number) % 1200 FROM system.numbers LIMIT 370, 370;
INSERT INTO database_for_dict.keys_table VALUES ('5', -3);
INSERT INTO database_for_dict.keys_table SELECT toString(intHash64(number + 1) % 1200), 11 + intHash64(number) % 1200 FROM system.numbers LIMIT 700, 370;
INSERT INTO database_for_dict.keys_table VALUES ('10', -20);
-- Same dictionary definition as before, now with explicit BLOCK_SIZE, WRITE_BUFFER_SIZE and MAX_STORED_KEYS.
-- NOTE(review): the simple-key SSD_CACHE test uses the same PATH ('/var/lib/clickhouse/clickhouse_dicts/1d');
-- confirm the two tests cannot interfere with each other when run concurrently.
DROP DICTIONARY IF EXISTS database_for_dict.ssd_dict;
CREATE DICTIONARY database_for_dict.ssd_dict
(
k1 String,
k2 Int32,
a UInt64 DEFAULT 0,
b Int32 DEFAULT -1,
c String DEFAULT 'none'
)
PRIMARY KEY k1, k2
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(COMPLEX_KEY_SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse_dicts/1d' BLOCK_SIZE 512 WRITE_BUFFER_SIZE 4096 MAX_STORED_KEYS 1000000));
-- First pass over all keys. Expected sum is 118 = 100 + 3 + 6 + 9 (the four distinctive keys);
-- every other key contributes 0, which is both the filler value and the default of `a`.
SELECT 'UPDATE DICTIONARY';
-- 118
SELECT sum(dictGetUInt64('database_for_dict.ssd_dict', 'a', (k1, k2))) FROM database_for_dict.keys_table;
-- Key ('1', 3) is the first row of keys_table.
-- NOTE(review): presumably it has been flushed to the cache file by now, hence the label - confirm.
SELECT 'VALUE FROM DISK';
-- -100
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', ('1', toInt32(3)));
-- 'clickhouse'
SELECT dictGetString('database_for_dict.ssd_dict', 'c', ('1', toInt32(3)));
-- Key ('10', -20) is the last row of keys_table.
-- NOTE(review): presumably it is still in the write buffer, hence the label - confirm.
SELECT 'VALUE FROM RAM BUFFER';
-- 8
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', ('10', toInt32(-20)));
-- ''
SELECT dictGetString('database_for_dict.ssd_dict', 'c', ('10', toInt32(-20)));
-- Second pass over the same keys; the sum must be unchanged.
SELECT 'VALUES FROM DISK AND RAM BUFFER';
-- 118
SELECT sum(dictGetUInt64('database_for_dict.ssd_dict', 'a', (k1, k2))) FROM database_for_dict.keys_table;
SELECT 'HAS';
-- 6
SELECT count() FROM database_for_dict.keys_table WHERE dictHas('database_for_dict.ssd_dict', (k1, k2));
-- Key ('unknown', 0) is not inserted by any statement of this script, so the attribute defaults are expected.
-- The query is issued twice on purpose (same expected result both times).
SELECT 'VALUES NOT FROM TABLE';
-- 0 -1 none
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', ('unknown', toInt32(0))), dictGetInt32('database_for_dict.ssd_dict', 'b', ('unknown', toInt32(0))), dictGetString('database_for_dict.ssd_dict', 'c', ('unknown', toInt32(0)));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', ('unknown', toInt32(0))), dictGetInt32('database_for_dict.ssd_dict', 'b', ('unknown', toInt32(0))), dictGetString('database_for_dict.ssd_dict', 'c', ('unknown', toInt32(0)));
-- The same composite key repeated within one block of keys.
SELECT 'DUPLICATE KEYS';
SELECT arrayJoin([('1', toInt32(3)), ('2', toInt32(-1)), ('', toInt32(0)), ('', toInt32(0)), ('2', toInt32(-1)), ('1', toInt32(3))]) AS keys, dictGetInt32('database_for_dict.ssd_dict', 'b', keys);
DROP DICTIONARY IF EXISTS database_for_dict.ssd_dict;
DROP TABLE IF EXISTS database_for_dict.keys_table;