new hashtables

This commit is contained in:
Nikita Vasilev 2020-05-10 20:01:02 +03:00
parent 05dcc1e2fd
commit 833637d910
4 changed files with 253 additions and 182 deletions

View File

@ -0,0 +1,209 @@
#pragma once
#include <Common/HashTable/Hash.h>
#include <common/logger_useful.h>
#include <type_traits>
#include <vector>
namespace DB
{
namespace
{
size_t nearestPowTwo(size_t x) {
size_t r = 8;
while (x > r) {
r <<= 1;
}
return r;
}
}
struct EmptyDeleter {};
struct Int64Hasher
{
size_t operator()(const size_t x) const
{
return intHash64(x);
}
};
template <typename K, typename V, typename Hasher, typename Deleter = EmptyDeleter>
class BucketCacheIndex
{
struct Cell {
K key;
V index;
};
public:
template <typename = std::enable_if<std::is_same_v<EmptyDeleter, Deleter>>>
BucketCacheIndex(size_t cells_)
: buckets(nearestPowTwo(cells_) / bucket_size)
, bucket_mask(buckets - 1)
, cells(buckets * bucket_size)
, positions((buckets / 2) + 1)
{
for (auto & cell : cells)
cell.index.setNotExists();
for (size_t bucket = 0; bucket < buckets; ++bucket)
setPosition(bucket, 0);
}
template <typename = std::enable_if<!std::is_same_v<EmptyDeleter, Deleter>>>
BucketCacheIndex(size_t cells_, Deleter deleter_)
: deleter(deleter_)
, buckets(nearestPowTwo(cells_) / bucket_size)
, bucket_mask(buckets - 1)
, cells(buckets * bucket_size)
, positions((buckets / 2) + 1)
{
for (auto & cell : cells)
cell.index.setNotExists();
for (size_t bucket = 0; bucket < buckets; ++bucket)
setPosition(bucket, 0);
}
void set(K key, V val)
{
const size_t bucket = (hash(key) & bucket_mask);
const size_t idx = getCellIndex(key, bucket);
if (!cells[idx].index.exists())
{
incPosition(bucket);
++sz;
}
cells[idx].key = key;
cells[idx].index = val;
}
template <typename = std::enable_if<!std::is_same_v<EmptyDeleter, Deleter>>>
void setWithDelete(K key, V val)
{
const size_t bucket = (hash(key) & bucket_mask);
const size_t idx = getCellIndex(key, bucket);
if (!cells[idx].index.exists())
{
incPosition(bucket);
++sz;
}
else
{
deleter(cells[idx].key);
}
cells[idx].key = key;
cells[idx].index = val;
}
bool get(K key, V & val)
{
const size_t bucket = (hash(key) & bucket_mask);
const size_t idx = getCellIndex(key, bucket);
if (!cells[idx].index.exists() || cells[idx].key != key)
return false;
val = cells[idx].index;
return true;
}
bool getKeyAndValue(K & key, V & val)
{
const size_t bucket = (hash(key) & bucket_mask);
const size_t idx = getCellIndex(key, bucket);
if (!cells[idx].index.exists() || cells[idx].key != key)
return false;
key = cells[idx].key;
val = cells[idx].index;
return true;
}
bool erase(K key)
{
const size_t bucket = (hash(key) & bucket_mask);
const size_t idx = getCellIndex(key, bucket);
if (!cells[idx].index.exists() || cells[idx].key != key)
return false;
cells[idx].index.setNotExists();
--sz;
if constexpr (!std::is_same_v<EmptyDeleter, Deleter>)
deleter(cells[idx].key);
return true;
}
size_t size()
{
return sz;
}
auto keys()
{
std::vector<K> res;
for (const auto & cell : cells)
{
if (cell.index.exists())
{
res.push_back(cell.key);
}
}
return res;
}
private:
size_t getCellIndex(const K key, const size_t bucket)
{
const size_t pos = getPosition(bucket);
for (size_t idx = 0; idx < bucket_size; ++idx)
{
const size_t cur = ((pos + 1 + idx) & pos_mask);
if (cells[bucket * bucket_size + cur].index.exists() &&
cells[bucket * bucket_size + cur].key == key)
{
return bucket * bucket_size + cur;
}
}
return bucket * bucket_size + pos;
}
size_t getPosition(const size_t bucket)
{
const size_t idx = (bucket >> 1);
if ((bucket & 1) == 0)
return ((positions[idx] >> 4) & pos_mask);
return (positions[idx] & pos_mask);
}
void setPosition(const size_t bucket, const size_t pos)
{
const size_t idx = bucket >> 1;
if ((bucket & 1) == 0)
positions[idx] = ((pos << 4) | (positions[idx] & ((1 << 4) - 1)));
else
positions[idx] = (pos | (positions[idx] & (((1 << 4) - 1) << 4)));
}
void incPosition(const size_t bucket)
{
setPosition(bucket, (getPosition(bucket) + 1) & pos_mask);
}
static constexpr size_t bucket_size = 8;
static constexpr size_t pos_size = 3;
static constexpr size_t pos_mask = (1 << pos_size) - 1;
Hasher hash;
Deleter deleter;
size_t buckets;
size_t bucket_mask;
std::vector<Cell> cells;
std::vector<char> positions;
size_t sz = 0;
};
}

View File

@ -9,10 +9,10 @@
#include <Columns/ColumnString.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/CurrentMetrics.h>
#include <Common/HashTable/Hash.h>
#include <common/logger_useful.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Core/Block.h>
#include <Dictionaries/BucketCache.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteBufferAIO.h>
#include <list>
@ -24,142 +24,6 @@
namespace DB
{
namespace
{
size_t nearestPowTwo(size_t x) {
size_t r = 1;
while (x > r) {
r <<= 1;
}
return r;
}
}
template <typename K, typename V>
class CLRUCache
{
struct Cell {
K key;
V index;
};
public:
CLRUCache(size_t cells_)
: buckets(nearestPowTwo(cells_) / bucket_size)
, bucket_mask(buckets - 1)
, cells(buckets * bucket_size)
, positions((buckets / 2) + 1)
{
Poco::Logger::get("cache").information(" buckets: " + std::to_string(buckets) + " cells: " + std::to_string(cells.size()));
for (auto & cell : cells)
cell.index.setNotExists();
for (size_t bucket = 0; bucket < buckets; ++bucket)
setPosition(bucket, 0);
}
void set(K key, V val)
{
const size_t bucket = (intHash64(key) & bucket_mask);
const size_t idx = getCellIndex(key, bucket);
if (!cells[idx].index.exists())
{
incPosition(bucket);
++sz;
}
cells[idx].key = key;
cells[idx].index = val;
}
bool get(K key, V & val)
{
const size_t bucket = (intHash64(key) & bucket_mask);
const size_t idx = getCellIndex(key, bucket);
if (!cells[idx].index.exists() || cells[idx].key != key)
return false;
val = cells[idx].index;
return true;
}
bool erase(K key)
{
const size_t bucket = (intHash64(key) & bucket_mask);
const size_t idx = getCellIndex(key, bucket);
if (!cells[idx].index.exists() || cells[idx].key != key)
return false;
cells[idx].index.setNotExists();
--sz;
return true;
}
size_t size()
{
return sz;
}
auto keys()
{
std::vector<K> res;
for (const auto & cell : cells)
{
if (cell.index.exists())
{
res.push_back(cell.key);
}
}
return res;
}
private:
size_t getCellIndex(const K key, const size_t bucket)
{
const size_t pos = getPosition(bucket);
for (size_t idx = 0; idx < bucket_size; ++idx)
{
const size_t cur = ((pos + 1 + idx) & pos_mask);
if (cells[bucket * bucket_size + cur].index.exists() &&
cells[bucket * bucket_size + cur].key == key)
{
return bucket * bucket_size + cur;
}
}
return bucket * bucket_size + pos;
}
size_t getPosition(const size_t bucket)
{
const size_t idx = (bucket >> 1);
if ((bucket & 1) == 0)
return ((positions[idx] >> 4) & pos_mask);
return (positions[idx] & pos_mask);
}
void setPosition(const size_t bucket, const size_t pos)
{
const size_t idx = bucket >> 1;
if ((bucket & 1) == 0)
positions[idx] = ((pos << 4) | (positions[idx] & ((1 << 4) - 1)));
else
positions[idx] = (pos | (positions[idx] & (((1 << 4) - 1) << 4)));
}
void incPosition(const size_t bucket)
{
setPosition(bucket, (getPosition(bucket) + 1) & pos_mask);
}
static constexpr size_t bucket_size = 8;
static constexpr size_t pos_size = 3;
static constexpr size_t pos_mask = (1 << pos_size) - 1;
size_t buckets;
size_t bucket_mask;
std::vector<Cell> cells;
std::vector<char> positions;
size_t sz = 0;
};
using AttributeValueVariant = std::variant<
UInt8,
UInt16,
@ -316,7 +180,7 @@ private:
int fd = -1;
mutable CLRUCache<UInt64, Index> key_to_index;
mutable BucketCacheIndex<UInt64, Index, Int64Hasher> key_to_index;
Attribute keys_buffer;
const std::vector<AttributeUnderlyingType> attributes_structure;

View File

@ -182,7 +182,7 @@ SSDComplexKeyCachePartition::SSDComplexKeyCachePartition(
, write_buffer_size(write_buffer_size_)
, max_stored_keys(max_stored_keys_)
, path(dir_path + "/" + std::to_string(file_id))
, key_to_index(max_stored_keys, keys_pool)
, key_to_index(max_stored_keys, KeyDeleter(keys_pool))
, attributes_structure(attributes_structure_)
{
std::filesystem::create_directories(std::filesystem::path{dir_path});
@ -363,7 +363,7 @@ size_t SSDComplexKeyCachePartition::append(
if (!flushed)
{
key_to_index.set(keys[index], cache_index);
key_to_index.setWithDelete(keys[index], cache_index);
keys_buffer.push_back(keys_buffer_pool->copyKeyFrom(keys[index]));
++index;
++keys_in_block;
@ -447,7 +447,7 @@ void SSDComplexKeyCachePartition::flush()
{
Index index;
Poco::Logger::get("get:").information("sz = " + std::to_string(keys_buffer[row].size()));
if (key_to_index.get(keys_buffer[row], index))
if (key_to_index.getKeyAndValue(keys_buffer[row], index))
{
if (index.inMemory()) // Row can be inserted in the buffer twice, so we need to move to ssd only the last index.
{
@ -910,8 +910,8 @@ PaddedPODArray<KeyRef> SSDComplexKeyCachePartition::getCachedIds(const std::chro
{
std::unique_lock lock(rw_lock); // Begin and end iterators can be changed.
PaddedPODArray<KeyRef> array;
for (const auto & [key, index] : key_to_index)
array.push_back(key); // TODO: exclude default
//for (const auto & [key, index] : key_to_index)
//array.push_back(key); // TODO: exclude default
return array;
}

View File

@ -14,6 +14,7 @@
#include <Common/SmallObjectPool.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Core/Block.h>
#include <Dictionaries/BucketCache.h>
#include <ext/scope_guard.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteBufferAIO.h>
@ -76,6 +77,11 @@ public:
return getRef() == other.getRef();
}
inline bool operator!=(const KeyRef & other) const
{
return !(*this == other);
}
inline bool operator<(const KeyRef & other) const
{
return getRef() < other.getRef();
@ -128,22 +134,7 @@ class ComplexKeysPoolImpl
public:
KeyRef allocKey(const size_t row, const Columns & key_columns, StringRefs & keys)
{
if constexpr (std::is_same_v<A, SmallObjectPool>)
{
// not working now
const auto res = arena->alloc();
auto place = res;
for (const auto & key_column : key_columns)
{
const StringRef key = key_column->getDataAt(row);
memcpy(place, key.data, key.size);
place += key.size;
}
return KeyRef(res);
}
else
if constexpr (!std::is_same_v<A, SmallObjectPool>)
{
const auto keys_size = key_columns.size();
UInt16 sum_keys_size{};
@ -165,7 +156,6 @@ public:
{
if (!key_columns[j]->valuesHaveFixedSize()) // String
{
//auto start = key_start;
auto key_size = keys[j].size + 1;
memcpy(key_start, &key_size, sizeof(size_t));
key_start += sizeof(size_t);
@ -173,26 +163,36 @@ public:
key_start += keys[j].size;
*key_start = '\0';
++key_start;
//keys[j].data = start;
//keys[j].size += sizeof(size_t) + 1;
}
else
{
memcpy(key_start, keys[j].data, keys[j].size);
//keys[j].data = key_start;
key_start += keys[j].size;
}
}
return KeyRef(place);
}
else
{
// not working now
const auto res = arena->alloc();
auto place = res;
for (const auto & key_column : key_columns)
{
const StringRef key = key_column->getDataAt(row);
memcpy(place, key.data, key.size);
place += key.size;
}
return KeyRef(res);
}
}
KeyRef copyKeyFrom(const KeyRef & key)
{
//Poco::Logger::get("test cpy").information("--- --- --- ");
char * data = arena.alloc(key.fullSize());
//Poco::Logger::get("test cpy").information("--- --- --- finish");
memcpy(data, key.fullData(), key.fullSize());
return KeyRef(data);
}
@ -201,18 +201,14 @@ public:
{
if constexpr (std::is_same_v<A, ArenaWithFreeLists>)
arena.free(key.fullData(), key.fullSize());
else if constexpr (std::is_same_v<A, SmallObjectPool>)
arena.free(key.fullData());
//else
// throw Exception("Free not supported.", ErrorCodes::LOGICAL_ERROR);
/*else if constexpr (std::is_same_v<A, SmallObjectPool>)
arena.free(key.fullData());*/
}
void rollback(const KeyRef & key)
{
if constexpr (std::is_same_v<A, Arena>)
arena.rollback(key.fullSize());
//else
// throw Exception("Rollback not supported.", ErrorCodes::LOGICAL_ERROR);
}
void writeKey(const KeyRef & key, WriteBuffer & buf)
@ -249,7 +245,6 @@ private:
using TemporalComplexKeysPool = ComplexKeysPoolImpl<Arena>;
using ComplexKeysPool = ComplexKeysPoolImpl<ArenaWithFreeLists>;
//using FixedComplexKeysPool = ComplexKeysPoolImpl<SmallObjectPool>;
template <typename K, typename V, typename Pool>
class ComplexKeyLRUCache
@ -343,6 +338,18 @@ private:
std::mutex mutex;
};
struct KeyDeleter
{
KeyDeleter(ComplexKeysPool & keys_pool_) : keys_pool(keys_pool_) {}
void operator()(const KeyRef key) const
{
keys_pool.freeKey(key);
}
ComplexKeysPool & keys_pool;
};
class SSDComplexKeyCachePartition
{
public:
@ -487,14 +494,6 @@ private:
void ignoreFromBufferToAttributeIndex(const size_t attribute_index, ReadBuffer & buf) const;
/*KeyRef allocKey(const size_t row, const Columns & key_columns, StringRefs & keys) const;
void freeKey(const KeyRef key) const;
void writeKey(KeyRef key, WriteBuffer & buf);
template <typename ArenaForKey>
void readKey(KeyRef & key, ArenaForKey & arena, ReadBuffer & buf);
void ignoreKey(ReadBuffer & buf);*/
const size_t file_id;
const size_t max_size;
const size_t block_size;
@ -508,7 +507,7 @@ private:
int fd = -1;
ComplexKeysPool keys_pool;
mutable ComplexKeyLRUCache<KeyRef, Index, ComplexKeysPool> key_to_index;
mutable BucketCacheIndex<KeyRef, Index, std::hash<KeyRef>, KeyDeleter> key_to_index;
std::optional<TemporalComplexKeysPool> keys_buffer_pool;
KeyRefs keys_buffer;
@ -518,7 +517,6 @@ private:
std::optional<Memory<>> memory;
std::optional<WriteBuffer> write_buffer;
uint32_t keys_in_block = 0;
//CompressionCodecPtr codec;
size_t current_memory_block_id = 0;
size_t current_file_block_id = 0;