Updated CacheDictionary

This commit is contained in:
Maksim Kita 2021-02-27 19:04:32 +03:00
parent ef1645b546
commit ae88bbda5a
8 changed files with 121 additions and 119 deletions

View File

@ -32,6 +32,7 @@ endif()
set_property(DIRECTORY PROPERTY EXCLUDE_FROM_ALL 1)
add_subdirectory (abseil-cpp-cmake)
add_subdirectory (antlr4-runtime-cmake)
add_subdirectory (boost-cmake)
add_subdirectory (cctz-cmake)

View File

@ -0,0 +1,18 @@
set(ABSL_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/abseil-cpp")
if(NOT EXISTS "${ABSL_ROOT_DIR}/CMakeLists.txt")
message(FATAL_ERROR " submodule third_party/abseil-cpp is missing. To fix try run: \n git submodule update --init --recursive")
endif()
add_subdirectory("${ABSL_ROOT_DIR}" "${ClickHouse_BINARY_DIR}/contrib/abseil-cpp")
add_library(abseil_swiss_tables INTERFACE)
target_link_libraries(abseil_swiss_tables INTERFACE
absl::flat_hash_map
absl::flat_hash_set
)
get_target_property(FLAT_HASH_MAP_INCLUDE_DIR absl::flat_hash_map INTERFACE_INCLUDE_DIRECTORIES)
target_include_directories (abseil_swiss_tables SYSTEM BEFORE INTERFACE ${FLAT_HASH_MAP_INCLUDE_DIR})
get_target_property(FLAT_HASH_SET_INCLUDE_DIR absl::flat_hash_set INTERFACE_INCLUDE_DIRECTORIES)
target_include_directories (abseil_swiss_tables SYSTEM BEFORE INTERFACE ${FLAT_HASH_SET_INCLUDE_DIR})

View File

@ -39,11 +39,6 @@ set(_gRPC_SSL_LIBRARIES ${OPENSSL_LIBRARIES})
# Use abseil-cpp from ClickHouse contrib, not from gRPC third_party.
set(gRPC_ABSL_PROVIDER "clickhouse" CACHE STRING "" FORCE)
set(ABSL_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/abseil-cpp")
if(NOT EXISTS "${ABSL_ROOT_DIR}/CMakeLists.txt")
message(FATAL_ERROR " grpc: submodule third_party/abseil-cpp is missing. To fix try run: \n git submodule update --init --recursive")
endif()
add_subdirectory("${ABSL_ROOT_DIR}" "${ClickHouse_BINARY_DIR}/contrib/abseil-cpp")
# Choose to build static or shared library for c-ares.
if (MAKE_STATIC_LIBRARIES)

View File

@ -18,6 +18,7 @@ target_link_libraries(clickhouse_dictionaries
Poco::MongoDB
Poco::Redis
string_utils
abseil_swiss_tables
)
if(USE_CASSANDRA)

View File

@ -322,8 +322,7 @@ Columns CacheDictionary<dictionary_key_type>::getColumnsImpl(
/// Write lock on storage
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
auto fetch_result = cache_storage_ptr->fetchColumnsForKeys(keys, request);
result_of_fetch_from_storage = std::move(fetch_result);
result_of_fetch_from_storage = cache_storage_ptr->fetchColumnsForKeys(keys, request);
}
size_t found_keys_size = result_of_fetch_from_storage.found_keys_size;
@ -806,10 +805,10 @@ namespace
config.getUInt64(dictionary_configuration_prefix + "strict_max_lifetime_seconds",
static_cast<size_t>(dict_lifetime.max_sec));
// size_t rounded_size = roundUpToPowerOfTwoOrZero(size);
size_t rounded_size = roundUpToPowerOfTwoOrZero(size);
CacheDictionaryStorageConfiguration storage_configuration {
size,
rounded_size,
strict_max_lifetime_seconds,
dict_lifetime
};

View File

@ -72,12 +72,10 @@ public:
{
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
SimpleKeysStorageFetchResult result;
fetchColumnsForKeysImpl(keys, fetch_request, result);
return result;
return fetchColumnsForKeysImpl<SimpleKeysStorageFetchResult>(keys, fetch_request);
}
else
throw Exception("Method insertColumnsForKeys is not supported for complex key storage", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Method fetchColumnsForKeys is not supported for complex key storage", ErrorCodes::NOT_IMPLEMENTED);
}
void insertColumnsForKeys(const PaddedPODArray<UInt64> & keys, Columns columns) override
@ -104,9 +102,7 @@ public:
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
{
ComplexKeysStorageFetchResult result;
fetchColumnsForKeysImpl(keys, column_fetch_requests, result);
return result;
return fetchColumnsForKeysImpl<ComplexKeysStorageFetchResult>(keys, column_fetch_requests);
}
else
throw Exception("Method fetchColumnsForKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
@ -125,7 +121,7 @@ public:
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
return getCachedKeysImpl();
else
throw Exception("Method getCachedSimpleKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Method getCachedComplexKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
}
size_t getSize() const override { return cache.size(); }
@ -137,11 +133,12 @@ public:
private:
template <typename KeysStorageFetchResult>
void fetchColumnsForKeysImpl(
ALWAYS_INLINE KeysStorageFetchResult fetchColumnsForKeysImpl(
const PaddedPODArray<KeyType> & keys,
const DictionaryStorageFetchRequest & fetch_request,
KeysStorageFetchResult & result)
const DictionaryStorageFetchRequest & fetch_request)
{
KeysStorageFetchResult result;
result.fetched_columns = fetch_request.makeAttributesResultColumns();
result.key_index_to_state.resize_fill(keys.size(), {KeyState::not_found});
@ -149,7 +146,11 @@ private:
size_t fetched_columns_index = 0;
for (size_t key_index = 0; key_index < keys.size(); ++key_index)
std::chrono::seconds max_lifetime_seconds(configuration.strict_max_lifetime_seconds);
size_t keys_size = keys.size();
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys[key_index];
auto * it = cache.find(key);
@ -161,7 +162,7 @@ private:
bool has_deadline = cellHasDeadline(cell);
if (has_deadline && now > cell.deadline + std::chrono::seconds(configuration.strict_max_lifetime_seconds))
if (has_deadline && now > cell.deadline + max_lifetime_seconds)
{
result.key_index_to_state[key_index] = {KeyState::not_found};
++result.not_found_keys_size;
@ -189,6 +190,8 @@ private:
++result.not_found_keys_size;
}
}
return result;
}
void insertColumnsForKeysImpl(const PaddedPODArray<KeyType> & keys, Columns columns)

View File

@ -116,7 +116,9 @@ static inline void deserializeAndInsertIntoColumns(
const DictionaryStorageFetchRequest & fetch_request,
const char * place_for_serialized_columns)
{
for (size_t column_index = 0; column_index < columns.size(); ++column_index)
size_t columns_size = columns.size();
for (size_t column_index = 0; column_index < columns_size; ++column_index)
{
const auto & column = columns[column_index];

View File

@ -8,8 +8,11 @@
#include <filesystem>
#include <city.h>
#include <fcntl.h>
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <common/unaligned.h>
#include <Common/Stopwatch.h>
#include <Common/randomSeed.h>
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
@ -20,7 +23,6 @@
#include <Dictionaries/ICacheDictionaryStorage.h>
#include <Dictionaries/DictionaryHelpers.h>
namespace ProfileEvents
{
extern const Event FileOpen;
@ -124,7 +126,7 @@ public:
/// Reset block with new block_data
/// block_data must be filled with zeroes if it is new block
void reset(char * new_block_data)
ALWAYS_INLINE inline void reset(char * new_block_data)
{
block_data = new_block_data;
current_block_offset = block_header_size;
@ -132,12 +134,12 @@ public:
}
/// Check if it is enough place to write key in block
bool enoughtPlaceToWriteKey(const SSDCacheSimpleKey & cache_key) const
ALWAYS_INLINE inline bool enoughtPlaceToWriteKey(const SSDCacheSimpleKey & cache_key) const
{
return (current_block_offset + (sizeof(cache_key.key) + sizeof(cache_key.size) + cache_key.size)) <= block_size;
}
bool enoughtPlaceToWriteKey(const SSDCacheComplexKey & cache_key) const
ALWAYS_INLINE inline bool enoughtPlaceToWriteKey(const SSDCacheComplexKey & cache_key) const
{
const StringRef & key = cache_key.key;
size_t complex_key_size = sizeof(key.size) + key.size;
@ -148,7 +150,7 @@ public:
/// Write key and returns offset in ssd cache block where data is written
/// It is client responsibility to check if there is enough place in block to write key
/// Returns true if key was written and false if there was not enough place to write key
bool writeKey(const SSDCacheSimpleKey & cache_key, size_t & offset_in_block)
ALWAYS_INLINE inline bool writeKey(const SSDCacheSimpleKey & cache_key, size_t & offset_in_block)
{
assert(cache_key.size > 0);
@ -177,7 +179,7 @@ public:
return true;
}
bool writeKey(const SSDCacheComplexKey & cache_key, size_t & offset_in_block)
ALWAYS_INLINE inline bool writeKey(const SSDCacheComplexKey & cache_key, size_t & offset_in_block)
{
assert(cache_key.size > 0);
@ -212,20 +214,20 @@ public:
return true;
}
inline size_t getKeysSize() const { return keys_size; }
ALWAYS_INLINE inline size_t getKeysSize() const { return keys_size; }
/// Write keys size into block header
inline void writeKeysSize()
ALWAYS_INLINE inline void writeKeysSize()
{
char * keys_size_offset_data = block_data + block_header_check_sum_size;
std::memcpy(keys_size_offset_data, &keys_size, sizeof(size_t));
}
/// Get check sum from block header
inline size_t getCheckSum() const { return unalignedLoad<size_t>(block_data); }
ALWAYS_INLINE inline size_t getCheckSum() const { return unalignedLoad<size_t>(block_data); }
/// Calculate check sum in block
inline size_t calculateCheckSum() const
ALWAYS_INLINE inline size_t calculateCheckSum() const
{
size_t calculated_check_sum = static_cast<size_t>(CityHash_v1_0_2::CityHash64(block_data + block_header_check_sum_size, block_size - block_header_check_sum_size));
@ -233,7 +235,7 @@ public:
}
/// Check if check sum from block header matched calculated check sum in block
inline bool checkCheckSum() const
ALWAYS_INLINE inline bool checkCheckSum() const
{
size_t calculated_check_sum = calculateCheckSum();
size_t check_sum = getCheckSum();
@ -242,16 +244,16 @@ public:
}
/// Write check sum in block header
inline void writeCheckSum()
ALWAYS_INLINE inline void writeCheckSum()
{
size_t check_sum = static_cast<size_t>(CityHash_v1_0_2::CityHash64(block_data + block_header_check_sum_size, block_size - block_header_check_sum_size));
std::memcpy(block_data, &check_sum, sizeof(size_t));
}
inline size_t getBlockSize() const { return block_size; }
ALWAYS_INLINE inline size_t getBlockSize() const { return block_size; }
/// Returns block data
inline char * getBlockData() const { return block_data; }
ALWAYS_INLINE inline char * getBlockData() const { return block_data; }
/// Read keys that were serialized in block
/// It is client responsibility to ensure that simple or complex keys were written in block
@ -334,14 +336,14 @@ inline bool operator==(const SSDCacheIndex & lhs, const SSDCacheIndex & rhs)
}
template <typename SSDCacheKeyType>
class SSDCacheMemoryBufferPartition
class SSDCacheMemoryBuffer
{
public:
using KeyType = typename SSDCacheKeyType::KeyType;
explicit SSDCacheMemoryBufferPartition(size_t block_size_, size_t partition_blocks_size_)
explicit SSDCacheMemoryBuffer(size_t block_size_, size_t memory_buffer_blocks_size_)
: block_size(block_size_)
, partition_blocks_size(partition_blocks_size_)
, partition_blocks_size(memory_buffer_blocks_size_)
, buffer(block_size * partition_blocks_size, 4096)
, current_write_block(block_size)
{
@ -414,7 +416,7 @@ public:
}
}
inline void resetToPartitionStart()
inline void reset()
{
current_block_index = 0;
current_write_block.reset(buffer.m_data);
@ -444,12 +446,10 @@ public:
explicit SSDCacheFileBuffer(
const std::string & file_path_,
size_t block_size_,
size_t file_blocks_size_,
size_t read_from_file_buffer_blocks_size_)
size_t file_blocks_size_)
: file_path(file_path_ + BIN_FILE_EXT)
, block_size(block_size_)
, file_blocks_size(file_blocks_size_)
, read_from_file_buffer_blocks_size(read_from_file_buffer_blocks_size_)
{
auto path = std::filesystem::path{file_path};
auto parent_path_directory = path.parent_path();
@ -603,25 +603,24 @@ public:
}
template <typename FetchBlockFunc>
void fetchBlocks(const PaddedPODArray<size_t> & blocks_to_fetch, FetchBlockFunc && func) const
ALWAYS_INLINE void fetchBlocks(char * read_buffer, size_t read_from_file_buffer_blocks_size, const PaddedPODArray<size_t> & blocks_to_fetch, FetchBlockFunc && func) const
{
if (blocks_to_fetch.empty())
return;
/// TODO: Probably make sense to pass it as parameter
Memory read_buffer(block_size * read_from_file_buffer_blocks_size, 4096);
size_t blocks_to_fetch_size = blocks_to_fetch.size();
PaddedPODArray<iocb> requests;
PaddedPODArray<iocb *> pointers;
requests.reserve(blocks_to_fetch.size());
pointers.reserve(blocks_to_fetch.size());
requests.reserve(blocks_to_fetch_size);
pointers.reserve(blocks_to_fetch_size);
for (size_t block_to_fetch_index = 0; block_to_fetch_index < blocks_to_fetch.size(); ++block_to_fetch_index)
for (size_t block_to_fetch_index = 0; block_to_fetch_index < blocks_to_fetch_size; ++block_to_fetch_index)
{
iocb request{};
char * buffer_place = read_buffer.data() + block_size * (block_to_fetch_index % read_from_file_buffer_blocks_size);
char * buffer_place = read_buffer + block_size * (block_to_fetch_index % read_from_file_buffer_blocks_size);
#if defined(__FreeBSD__)
request.aio.aio_lio_opcode = LIO_READ;
@ -740,7 +739,7 @@ private:
int fd = -1;
};
static int preallocateDiskSpace(int fd, size_t offset, size_t len)
ALWAYS_INLINE inline static int preallocateDiskSpace(int fd, size_t offset, size_t len)
{
#if defined(__FreeBSD__)
return posix_fallocate(fd, offset, len);
@ -749,7 +748,7 @@ private:
#endif
}
static char * getRequestBuffer(const iocb & request)
ALWAYS_INLINE inline static char * getRequestBuffer(const iocb & request)
{
char * result = nullptr;
@ -762,7 +761,7 @@ private:
return result;
}
static ssize_t eventResult(io_event & event)
ALWAYS_INLINE inline static ssize_t eventResult(io_event & event)
{
ssize_t bytes_written;
@ -778,7 +777,6 @@ private:
String file_path;
size_t block_size;
size_t file_blocks_size;
size_t read_from_file_buffer_blocks_size;
FileDescriptor file;
size_t current_block_index = 0;
@ -809,7 +807,8 @@ public:
explicit SSDCacheDictionaryStorage(const SSDCacheDictionaryStorageConfiguration & configuration_)
: configuration(configuration_)
, file_buffer(configuration_.file_path, configuration.block_size, configuration.file_blocks_size, configuration.read_buffer_blocks_size)
, file_buffer(configuration_.file_path, configuration.block_size, configuration.file_blocks_size)
, read_from_file_buffer(configuration_.block_size * configuration_.read_buffer_blocks_size, 4096)
, rnd_engine(randomSeed())
, index(configuration.max_stored_keys, false, ArenaCellKeyDisposer<SSDCacheDictionaryStorage<dictionary_key_type>> { *this })
{
@ -833,11 +832,7 @@ public:
const DictionaryStorageFetchRequest & fetch_request) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
SimpleKeysStorageFetchResult result;
fetchColumnsForKeysImpl(keys, fetch_request, result);
return result;
}
return fetchColumnsForKeysImpl<SimpleKeysStorageFetchResult>(keys, fetch_request);
else
throw Exception("Method insertColumnsForKeys is not supported for complex key storage", ErrorCodes::NOT_IMPLEMENTED);
}
@ -862,14 +857,10 @@ public:
ComplexKeysStorageFetchResult fetchColumnsForKeys(
const PaddedPODArray<StringRef> & keys,
const DictionaryStorageFetchRequest & column_fetch_requests) override
const DictionaryStorageFetchRequest & fetch_request) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
{
ComplexKeysStorageFetchResult result;
fetchColumnsForKeysImpl(keys, column_fetch_requests, result);
return result;
}
return fetchColumnsForKeysImpl<ComplexKeysStorageFetchResult>(keys, fetch_request);
else
throw Exception("Method fetchColumnsForKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
}
@ -911,8 +902,8 @@ private:
TimePoint deadline;
SSDCacheIndex index;
bool in_memory;
size_t in_memory_partition_index;
bool in_memory;
};
struct KeyToBlockOffset
@ -921,19 +912,18 @@ private:
: key_index(key_index_), offset_in_block(offset_in_block_), is_expired(is_expired_)
{}
const size_t key_index;
const size_t offset_in_block;
const bool is_expired;
size_t key_index;
size_t offset_in_block;
bool is_expired;
};
using BlockIndexToKeysMap = std::unordered_map<size_t, PaddedPODArray<KeyToBlockOffset>>;
template <typename Result>
void fetchColumnsForKeysImpl(
Result fetchColumnsForKeysImpl(
const PaddedPODArray<KeyType> & keys,
const DictionaryStorageFetchRequest & fetch_request,
Result & result) const
const DictionaryStorageFetchRequest & fetch_request) const
{
Result result;
result.fetched_columns = fetch_request.makeAttributesResultColumns();
result.key_index_to_state.resize_fill(keys.size(), {KeyState::not_found});
@ -941,10 +931,15 @@ private:
size_t fetched_columns_index = 0;
using BlockIndexToKeysMap = absl::flat_hash_map<size_t, PaddedPODArray<KeyToBlockOffset>, DefaultHash<size_t>>;
BlockIndexToKeysMap block_to_keys_map;
std::unordered_set<size_t> unique_blocks_to_request;
absl::flat_hash_set<size_t, DefaultHash<size_t>> unique_blocks_to_request;
PaddedPODArray<size_t> blocks_to_request;
for (size_t key_index = 0; key_index < keys.size(); ++key_index)
std::chrono::seconds strict_max_lifetime_seconds(configuration.strict_max_lifetime_seconds);
size_t keys_size = keys.size();
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys[key_index];
@ -956,68 +951,52 @@ private:
bool has_deadline = cellHasDeadline(cell);
if (has_deadline && now > cell.deadline + std::chrono::seconds(configuration.strict_max_lifetime_seconds))
if (has_deadline && now > cell.deadline + strict_max_lifetime_seconds)
{
result.key_index_to_state[key_index] = KeyState::not_found;
++result.not_found_keys_size;
continue;
}
else if (has_deadline && now > cell.deadline)
bool cell_is_expired = false;
KeyState::State key_state = KeyState::found;
if (has_deadline && now > cell.deadline)
{
if (cell.in_memory)
{
result.key_index_to_state[key_index] = { KeyState::expired, fetched_columns_index };
++fetched_columns_index;
cell_is_expired = true;
key_state = KeyState::expired;
}
const auto & partition = memory_buffer_partitions[cell.in_memory_partition_index];
char * serialized_columns_place = partition.getPlace(cell.index);
deserializeAndInsertIntoColumns(result.fetched_columns, fetch_request, serialized_columns_place);
result.expired_keys_size += cell_is_expired;
result.found_keys_size += !cell_is_expired;
++result.expired_keys_size;
}
else
{
block_to_keys_map[cell.index.block_index].emplace_back(key_index, cell.index.offset_in_block, true);
unique_blocks_to_request.insert(cell.index.block_index);
}
if (cell.in_memory)
{
result.key_index_to_state[key_index] = {key_state, fetched_columns_index};
++fetched_columns_index;
const auto & partition = memory_buffer_partitions[cell.in_memory_partition_index];
char * serialized_columns_place = partition.getPlace(cell.index);
deserializeAndInsertIntoColumns(result.fetched_columns, fetch_request, serialized_columns_place);
}
else
{
if (cell.in_memory)
{
result.key_index_to_state[key_index] = { KeyState::found, fetched_columns_index };
++fetched_columns_index;
block_to_keys_map[cell.index.block_index].emplace_back(key_index, cell.index.offset_in_block, cell_is_expired);
const auto & partition = memory_buffer_partitions[cell.in_memory_partition_index];
char * serialized_columns_place = partition.getPlace(cell.index);
deserializeAndInsertIntoColumns(result.fetched_columns, fetch_request, serialized_columns_place);
++result.found_keys_size;
}
else
if (!unique_blocks_to_request.contains(cell.index.block_index))
{
block_to_keys_map[cell.index.block_index].emplace_back(key_index, cell.index.offset_in_block, false);
blocks_to_request.emplace_back(cell.index.block_index);
unique_blocks_to_request.insert(cell.index.block_index);
}
}
}
else
{
result.key_index_to_state[key_index] = KeyState::not_found;
++result.not_found_keys_size;
}
}
PaddedPODArray<size_t> blocks_to_request;
blocks_to_request.reserve(unique_blocks_to_request.size());
for (auto block : unique_blocks_to_request)
blocks_to_request.emplace_back(block);
/// Sort blocks by offset before start async io requests
std::sort(blocks_to_request.begin(), blocks_to_request.end());
file_buffer.fetchBlocks(blocks_to_request, [&](size_t block_index, char * block_data)
file_buffer.fetchBlocks(read_from_file_buffer.m_data, configuration.read_buffer_blocks_size, blocks_to_request, [&](size_t block_index, char * block_data)
{
PaddedPODArray<KeyToBlockOffset> & keys_in_block = block_to_keys_map[block_index];
@ -1034,6 +1013,8 @@ private:
++fetched_columns_index;
}
});
return result;
}
void insertColumnsForKeysImpl(const PaddedPODArray<KeyType> & keys, Columns columns)
@ -1177,7 +1158,7 @@ private:
}
/// Memory buffer partition flushed to disk start reusing it
current_memory_buffer_partition.resetToPartitionStart();
current_memory_buffer_partition.reset();
memset(const_cast<char *>(current_memory_buffer_partition.getData()), 0, current_memory_buffer_partition.getSizeInBytes());
write_into_memory_buffer_result = current_memory_buffer_partition.writeKey(ssd_cache_key, cache_index);
@ -1240,7 +1221,9 @@ private:
SSDCacheFileBuffer<SSDCacheKeyType> file_buffer;
std::vector<SSDCacheMemoryBufferPartition<SSDCacheKeyType>> memory_buffer_partitions;
Memory<Allocator<false>> read_from_file_buffer;
std::vector<SSDCacheMemoryBuffer<SSDCacheKeyType>> memory_buffer_partitions;
pcg64 rnd_engine;