2019-10-25 18:06:08 +00:00
|
|
|
#include "SSDCacheDictionary.h"
|
|
|
|
|
2020-01-03 19:52:07 +00:00
|
|
|
#include <algorithm>
|
2019-10-25 18:06:08 +00:00
|
|
|
#include <Columns/ColumnsNumber.h>
|
2020-01-01 17:40:46 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2020-01-02 19:33:19 +00:00
|
|
|
#include <Common/ProfileEvents.h>
|
|
|
|
#include <Common/ProfilingScopedRWLock.h>
|
2020-01-01 17:40:46 +00:00
|
|
|
#include <DataStreams/IBlockInputStream.h>
|
2020-01-05 13:59:49 +00:00
|
|
|
#include "DictionaryFactory.h"
|
2020-01-06 20:38:32 +00:00
|
|
|
#include <IO/AIO.h>
|
|
|
|
#include <IO/ReadHelpers.h>
|
2020-01-04 15:04:16 +00:00
|
|
|
#include <IO/WriteHelpers.h>
|
2020-01-01 17:40:46 +00:00
|
|
|
#include <ext/chrono_io.h>
|
|
|
|
#include <ext/map.h>
|
|
|
|
#include <ext/range.h>
|
|
|
|
#include <ext/size.h>
|
2020-01-06 20:38:32 +00:00
|
|
|
#include <ext/bit_cast.h>
|
2019-10-25 18:06:08 +00:00
|
|
|
|
2020-01-02 19:33:19 +00:00
|
|
|
namespace ProfileEvents
|
|
|
|
{
|
|
|
|
extern const Event DictCacheKeysRequested;
|
|
|
|
extern const Event DictCacheKeysRequestedMiss;
|
|
|
|
extern const Event DictCacheKeysRequestedFound;
|
|
|
|
extern const Event DictCacheKeysExpired;
|
|
|
|
extern const Event DictCacheKeysNotFound;
|
|
|
|
extern const Event DictCacheKeysHit;
|
|
|
|
extern const Event DictCacheRequestTimeNs;
|
|
|
|
extern const Event DictCacheRequests;
|
|
|
|
extern const Event DictCacheLockWriteNs;
|
|
|
|
extern const Event DictCacheLockReadNs;
|
2020-01-06 20:38:32 +00:00
|
|
|
extern const Event FileOpen;
|
2020-01-02 19:33:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
namespace CurrentMetrics
|
|
|
|
{
|
|
|
|
extern const Metric DictCacheRequests;
|
|
|
|
}
|
|
|
|
|
2019-10-25 18:06:08 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2020-01-01 17:40:46 +00:00
|
|
|
/// Error codes referenced by this file; they are only declared here.
namespace ErrorCodes
{
    /// Generic errors.
    extern const int TYPE_MISMATCH;
    extern const int BAD_ARGUMENTS;
    extern const int UNSUPPORTED_METHOD;
    extern const int LOGICAL_ERROR;
    extern const int TOO_SMALL_BUFFER_SIZE;

    /// Errors of file access and asynchronous IO.
    extern const int FILE_DOESNT_EXIST;
    extern const int CANNOT_OPEN_FILE;
    extern const int CANNOT_IO_SUBMIT;
    extern const int CANNOT_IO_GETEVENTS;
}
|
|
|
|
|
2020-01-03 19:52:07 +00:00
|
|
|
namespace
|
2020-01-01 17:40:46 +00:00
|
|
|
{
|
2020-01-06 20:38:32 +00:00
|
|
|
constexpr size_t MAX_KEYS_TO_READ_ONCE = 128;
|
2020-01-07 11:26:52 +00:00
|
|
|
constexpr size_t SSD_BLOCK_SIZE = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
|
|
|
constexpr size_t BUFFER_ALIGNMENT = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
2020-01-06 20:38:32 +00:00
|
|
|
constexpr size_t MAX_ATTRIBUTES_SIZE = 1024;
|
|
|
|
|
|
|
|
static constexpr UInt64 KEY_METADATA_EXPIRES_AT_MASK = std::numeric_limits<std::chrono::system_clock::time_point::rep>::max();
|
|
|
|
static constexpr UInt64 KEY_METADATA_IS_DEFAULT_MASK = ~KEY_METADATA_EXPIRES_AT_MASK;
|
|
|
|
|
|
|
|
constexpr size_t KEY_IN_MEMORY_BIT = 63;
|
|
|
|
constexpr size_t KEY_IN_MEMORY = (1ULL << KEY_IN_MEMORY_BIT);
|
|
|
|
constexpr size_t BLOCK_INDEX_BITS = 32;
|
|
|
|
constexpr size_t INDEX_IN_BLOCK_BITS = 16;
|
|
|
|
constexpr size_t INDEX_IN_BLOCK_MASK = (1ULL << INDEX_IN_BLOCK_BITS) - 1;
|
|
|
|
constexpr size_t BLOCK_INDEX_MASK = ((1ULL << (BLOCK_INDEX_BITS + INDEX_IN_BLOCK_BITS)) - 1) ^ INDEX_IN_BLOCK_MASK;
|
|
|
|
|
2020-01-05 20:31:25 +00:00
|
|
|
constexpr size_t NOT_FOUND = -1;
|
2020-01-06 20:38:32 +00:00
|
|
|
|
2020-01-03 19:52:07 +00:00
|
|
|
const std::string BIN_FILE_EXT = ".bin";
|
|
|
|
const std::string IND_FILE_EXT = ".idx";
|
2020-01-01 17:40:46 +00:00
|
|
|
}
|
|
|
|
|
2020-01-06 20:38:32 +00:00
|
|
|
/// Returns the expiration time stored in the bits of `data` selected by KEY_METADATA_EXPIRES_AT_MASK.
CachePartition::KeyMetadata::time_point_t CachePartition::KeyMetadata::expiresAt() const
{
    const auto expires_at_bits = data & KEY_METADATA_EXPIRES_AT_MASK;
    return ext::safe_bit_cast<time_point_t>(expires_at_bits);
}
|
|
|
|
void CachePartition::KeyMetadata::setExpiresAt(const time_point_t & t)
|
|
|
|
{
|
|
|
|
data = ext::safe_bit_cast<time_point_urep_t>(t);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool CachePartition::KeyMetadata::isDefault() const
|
|
|
|
{
|
|
|
|
return (data & KEY_METADATA_IS_DEFAULT_MASK) == KEY_METADATA_IS_DEFAULT_MASK;
|
|
|
|
}
|
|
|
|
void CachePartition::KeyMetadata::setDefault()
|
|
|
|
{
|
|
|
|
data |= KEY_METADATA_IS_DEFAULT_MASK;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool CachePartition::Index::inMemory() const
|
|
|
|
{
|
|
|
|
return (index & KEY_IN_MEMORY) == KEY_IN_MEMORY;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool CachePartition::Index::exists() const
|
|
|
|
{
|
|
|
|
return index != NOT_FOUND;
|
|
|
|
}
|
|
|
|
|
|
|
|
void CachePartition::Index::setNotExists()
|
|
|
|
{
|
|
|
|
index = NOT_FOUND;
|
|
|
|
}
|
|
|
|
|
|
|
|
void CachePartition::Index::setInMemory(const bool in_memory)
|
|
|
|
{
|
|
|
|
index = (index & ~KEY_IN_MEMORY) | (static_cast<size_t>(in_memory) << KEY_IN_MEMORY_BIT);
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t CachePartition::Index::getAddressInBlock() const
|
|
|
|
{
|
|
|
|
return index & INDEX_IN_BLOCK_MASK;
|
|
|
|
}
|
|
|
|
|
|
|
|
void CachePartition::Index::setAddressInBlock(const size_t address_in_block)
|
|
|
|
{
|
|
|
|
index = (index & ~INDEX_IN_BLOCK_MASK) | address_in_block;
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t CachePartition::Index::getBlockId() const
|
|
|
|
{
|
|
|
|
return (index & BLOCK_INDEX_MASK) >> INDEX_IN_BLOCK_BITS;
|
|
|
|
}
|
|
|
|
|
|
|
|
void CachePartition::Index::setBlockId(const size_t block_id)
|
|
|
|
{
|
|
|
|
index = (index & ~BLOCK_INDEX_MASK) | (block_id << INDEX_IN_BLOCK_BITS);
|
|
|
|
}
|
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
/// Creates a partition that is stored at `dir_path`/`file_id_`.
///
/// Prepares an empty UInt64 buffer for keys and one empty buffer per attribute of
/// `attributes_structure` (string attributes get no buffer yet), then opens the file
/// `path` + BIN_FILE_EXT for reading with O_DIRECT.
/// Throws FILE_DOESNT_EXIST when open fails with ENOENT and CANNOT_OPEN_FILE on any other failure.
CachePartition::CachePartition(
    const AttributeUnderlyingType & /* key_structure */, const std::vector<AttributeUnderlyingType> & attributes_structure,
    const std::string & dir_path, const size_t file_id_, const size_t max_size_)
    : file_id(file_id_), max_size(max_size_), path(dir_path + "/" + std::to_string(file_id)), memory(SSD_BLOCK_SIZE, BUFFER_ALIGNMENT)
{
    keys_buffer.type = AttributeUnderlyingType::utUInt64;
    keys_buffer.values = std::vector<UInt64>();

    for (const auto & type : attributes_structure)
    {
        switch (type)
        {
#define ADD_ATTRIBUTE_BUFFER(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
        { \
            attributes_buffer.emplace_back(); \
            attributes_buffer.back().type = type; \
            attributes_buffer.back().values = std::vector<TYPE>(); \
            break; \
        }

        ADD_ATTRIBUTE_BUFFER(UInt8)
        ADD_ATTRIBUTE_BUFFER(UInt16)
        ADD_ATTRIBUTE_BUFFER(UInt32)
        ADD_ATTRIBUTE_BUFFER(UInt64)
        ADD_ATTRIBUTE_BUFFER(UInt128)
        ADD_ATTRIBUTE_BUFFER(Int8)
        ADD_ATTRIBUTE_BUFFER(Int16)
        ADD_ATTRIBUTE_BUFFER(Int32)
        ADD_ATTRIBUTE_BUFFER(Int64)
        ADD_ATTRIBUTE_BUFFER(Decimal32)
        ADD_ATTRIBUTE_BUFFER(Decimal64)
        ADD_ATTRIBUTE_BUFFER(Decimal128)
        ADD_ATTRIBUTE_BUFFER(Float32)
        ADD_ATTRIBUTE_BUFFER(Float64)
#undef ADD_ATTRIBUTE_BUFFER

        case AttributeUnderlyingType::utString:
            // TODO: string support
            break;
        }
    }

    ProfileEvents::increment(ProfileEvents::FileOpen);

    const std::string filename = path + BIN_FILE_EXT;
    read_fd = ::open(filename.c_str(), O_RDONLY | O_DIRECT);
    if (read_fd != -1)
        return;

    /// A missing file is reported with its own error code.
    int error_code = ErrorCodes::CANNOT_OPEN_FILE;
    if (errno == ENOENT)
        error_code = ErrorCodes::FILE_DOESNT_EXIST;
    throwFromErrnoWithPath("Cannot open file " + filename, filename, error_code);
}
|
|
|
|
|
2020-01-07 11:26:52 +00:00
|
|
|
/// Closes the descriptor opened by the constructor; the result of close() is ignored.
CachePartition::~CachePartition()
{
    ::close(read_fd);
}
|
2019-10-25 18:06:08 +00:00
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
void CachePartition::appendBlock(const Attribute & new_keys, const Attributes & new_attributes)
|
2020-01-03 19:52:07 +00:00
|
|
|
{
|
2020-01-05 13:59:49 +00:00
|
|
|
if (new_attributes.size() != attributes_buffer.size())
|
2020-01-03 19:52:07 +00:00
|
|
|
throw Exception{"Wrong columns number in block.", ErrorCodes::BAD_ARGUMENTS};
|
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
const auto & ids = std::get<Attribute::Container<UInt64>>(new_keys.values);
|
2019-10-25 18:06:08 +00:00
|
|
|
|
2020-01-07 11:26:52 +00:00
|
|
|
appendValuesToAttribute(keys_buffer, new_keys);
|
2020-01-05 13:59:49 +00:00
|
|
|
|
2020-01-07 11:26:52 +00:00
|
|
|
if (!write_buffer)
|
|
|
|
write_buffer.emplace(memory.data(), memory.size());
|
2019-10-25 18:06:08 +00:00
|
|
|
|
2020-01-07 11:26:52 +00:00
|
|
|
for (size_t index = 0; index < ids.size();)
|
2020-01-06 20:38:32 +00:00
|
|
|
{
|
2020-01-07 11:26:52 +00:00
|
|
|
auto & key_index = key_to_metadata[ids[index]].index;
|
|
|
|
key_index.setInMemory(true);
|
|
|
|
key_index.setBlockId(current_memory_block_id);
|
|
|
|
key_index.setAddressInBlock(write_buffer->offset());
|
|
|
|
|
|
|
|
for (const auto & attribute : new_attributes)
|
|
|
|
{
|
|
|
|
// TODO:: переделать через столбцы + getDataAt
|
|
|
|
switch (attribute.type) {
|
|
|
|
#define DISPATCH(TYPE) \
|
|
|
|
case AttributeUnderlyingType::ut##TYPE: \
|
|
|
|
{ \
|
|
|
|
if (sizeof(TYPE) > write_buffer->available()) \
|
|
|
|
{ \
|
|
|
|
flush(); \
|
|
|
|
continue; \
|
|
|
|
} \
|
|
|
|
else \
|
|
|
|
{ \
|
|
|
|
const auto & values = std::get<Attribute::Container<TYPE>>(attribute.values); \
|
|
|
|
writeBinary(values[index], *write_buffer); \
|
|
|
|
} \
|
|
|
|
} \
|
|
|
|
break;
|
|
|
|
|
|
|
|
DISPATCH(UInt8)
|
|
|
|
DISPATCH(UInt16)
|
|
|
|
DISPATCH(UInt32)
|
|
|
|
DISPATCH(UInt64)
|
|
|
|
DISPATCH(UInt128)
|
|
|
|
DISPATCH(Int8)
|
|
|
|
DISPATCH(Int16)
|
|
|
|
DISPATCH(Int32)
|
|
|
|
DISPATCH(Int64)
|
|
|
|
DISPATCH(Decimal32)
|
|
|
|
DISPATCH(Decimal64)
|
|
|
|
DISPATCH(Decimal128)
|
|
|
|
DISPATCH(Float32)
|
|
|
|
DISPATCH(Float64)
|
|
|
|
#undef DISPATCH
|
|
|
|
|
|
|
|
case AttributeUnderlyingType::utString:
|
|
|
|
// TODO: string support
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
++index;
|
2020-01-06 20:38:32 +00:00
|
|
|
}
|
2019-10-25 18:06:08 +00:00
|
|
|
}
|
|
|
|
|
2020-01-07 11:26:52 +00:00
|
|
|
/// Appends all values of `from` to the end of `to` and returns the number of appended bytes.
///
/// The container type is chosen by `to.type`; `from` must hold a container of the same type
/// (std::get throws otherwise). Appending an empty `from` leaves `to` unchanged and returns 0.
/// Throws TYPE_MISMATCH for attribute types that are not handled (strings are not supported yet).
size_t CachePartition::appendValuesToAttribute(Attribute & to, const Attribute & from)
{
    switch (to.type)
    {
        /// insert() is used instead of resize() + memcpy(): it does not take the address of an
        /// element of an empty container and does not rely on the values being trivially copyable.
#define DISPATCH(TYPE) \
    case AttributeUnderlyingType::ut##TYPE: \
    { \
        auto & to_values = std::get<Attribute::Container<TYPE>>(to.values); \
        const auto & from_values = std::get<Attribute::Container<TYPE>>(from.values); \
        to_values.insert(to_values.end(), from_values.begin(), from_values.end()); \
        return from_values.size() * sizeof(TYPE); \
    }

    DISPATCH(UInt8)
    DISPATCH(UInt16)
    DISPATCH(UInt32)
    DISPATCH(UInt64)
    DISPATCH(UInt128)
    DISPATCH(Int8)
    DISPATCH(Int16)
    DISPATCH(Int32)
    DISPATCH(Int64)
    DISPATCH(Decimal32)
    DISPATCH(Decimal64)
    DISPATCH(Decimal128)
    DISPATCH(Float32)
    DISPATCH(Float64)
#undef DISPATCH

    case AttributeUnderlyingType::utString:
        // TODO: string support
        break;
    }

    throw Exception{"Unknown attribute type: " + std::to_string(static_cast<int>(to.type)), ErrorCodes::TYPE_MISMATCH};
}
|
|
|
|
|
2020-01-01 17:40:46 +00:00
|
|
|
void CachePartition::flush()
|
2019-10-25 18:06:08 +00:00
|
|
|
{
|
2020-01-03 19:52:07 +00:00
|
|
|
if (!write_data_buffer)
|
|
|
|
{
|
2020-01-07 11:26:52 +00:00
|
|
|
//write_data_buffer = std::make_unique<WriteBufferAIO>(path + BIN_FILE_EXT, buffer_size, O_RDWR | O_CREAT | O_TRUNC);
|
2020-01-03 19:52:07 +00:00
|
|
|
// TODO: не перетирать + seek в конец файла
|
|
|
|
}
|
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
const auto & ids = std::get<Attribute::Container<UInt64>>(keys_buffer.values);
|
2020-01-06 20:38:32 +00:00
|
|
|
if (ids.empty())
|
|
|
|
return;
|
|
|
|
|
|
|
|
Poco::Logger::get("paritiiton").information("@@@@@@@@@@@@@@@@@@@@ FLUSH!!!");
|
2020-01-04 15:04:16 +00:00
|
|
|
|
|
|
|
std::vector<size_t> offsets;
|
2019-10-25 18:06:08 +00:00
|
|
|
|
|
|
|
size_t prev_size = 0;
|
2020-01-04 15:04:16 +00:00
|
|
|
for (size_t row = 0; row < ids.size(); ++row)
|
2019-10-25 18:06:08 +00:00
|
|
|
{
|
2020-01-04 15:04:16 +00:00
|
|
|
offsets.push_back((offsets.empty() ? write_data_buffer->getPositionInFile() : offsets.back()) + prev_size);
|
2019-10-25 18:06:08 +00:00
|
|
|
prev_size = 0;
|
2020-01-04 15:04:16 +00:00
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
for (size_t col = 0; col < attributes_buffer.size(); ++col)
|
2019-10-25 18:06:08 +00:00
|
|
|
{
|
2020-01-05 13:59:49 +00:00
|
|
|
const auto & attribute = attributes_buffer[col];
|
2020-01-04 15:04:16 +00:00
|
|
|
|
|
|
|
switch (attribute.type)
|
|
|
|
{
|
|
|
|
#define DISPATCH(TYPE) \
|
|
|
|
case AttributeUnderlyingType::ut##TYPE: \
|
|
|
|
{ \
|
|
|
|
const auto & values = std::get<Attribute::Container<TYPE>>(attribute.values); \
|
|
|
|
writeBinary(values[row], *static_cast<WriteBuffer*>(write_data_buffer.get())); \
|
|
|
|
} \
|
|
|
|
break;
|
|
|
|
|
|
|
|
DISPATCH(UInt8)
|
|
|
|
DISPATCH(UInt16)
|
|
|
|
DISPATCH(UInt32)
|
|
|
|
DISPATCH(UInt64)
|
|
|
|
DISPATCH(UInt128)
|
|
|
|
DISPATCH(Int8)
|
|
|
|
DISPATCH(Int16)
|
|
|
|
DISPATCH(Int32)
|
|
|
|
DISPATCH(Int64)
|
|
|
|
DISPATCH(Decimal32)
|
|
|
|
DISPATCH(Decimal64)
|
|
|
|
DISPATCH(Decimal128)
|
|
|
|
DISPATCH(Float32)
|
|
|
|
DISPATCH(Float64)
|
|
|
|
#undef DISPATCH
|
|
|
|
|
|
|
|
case AttributeUnderlyingType::utString:
|
|
|
|
// TODO: string support
|
|
|
|
break;
|
2019-10-25 18:06:08 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-01-03 19:52:07 +00:00
|
|
|
write_data_buffer->sync();
|
2019-10-25 18:06:08 +00:00
|
|
|
|
2020-01-04 15:04:16 +00:00
|
|
|
/// commit changes in index
|
|
|
|
for (size_t row = 0; row < ids.size(); ++row)
|
2020-01-06 20:38:32 +00:00
|
|
|
{
|
|
|
|
key_to_metadata[ids[row]].index.setInMemory(false);
|
2020-01-07 11:26:52 +00:00
|
|
|
key_to_metadata[ids[row]].index.setBlockId(current_file_block_id);
|
2020-01-06 20:38:32 +00:00
|
|
|
key_to_metadata[ids[row]].index.setAddressInBlock(offsets[row]);
|
|
|
|
Poco::Logger::get("INDEX:").information("NEW MAP: " + std::to_string(ids[row]) + " -> " + std::to_string(key_to_metadata[ids[row]].index.index));
|
|
|
|
}
|
2020-01-04 15:04:16 +00:00
|
|
|
|
|
|
|
/// clear buffer
|
2020-01-05 13:59:49 +00:00
|
|
|
std::visit([](auto & attr) { attr.clear(); }, keys_buffer.values);
|
|
|
|
for (auto & attribute : attributes_buffer)
|
|
|
|
std::visit([](auto & attr) { attr.clear(); }, attribute.values);
|
2019-10-25 18:06:08 +00:00
|
|
|
}
|
|
|
|
|
2020-01-03 19:52:07 +00:00
|
|
|
template <typename Out, typename Key>
|
2020-01-05 20:31:25 +00:00
|
|
|
void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
|
2020-01-03 19:52:07 +00:00
|
|
|
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found) const
|
|
|
|
{
|
2020-01-06 20:38:32 +00:00
|
|
|
PaddedPODArray<Index> indices(ids.size());
|
2020-01-05 17:05:49 +00:00
|
|
|
for (size_t i = 0; i < ids.size(); ++i)
|
|
|
|
{
|
2020-01-06 20:38:32 +00:00
|
|
|
auto it = key_to_metadata.find(ids[i]);
|
|
|
|
if (it == std::end(key_to_metadata)) // TODO: check expired
|
2020-01-05 20:31:25 +00:00
|
|
|
{
|
2020-01-06 20:38:32 +00:00
|
|
|
Poco::Logger::get("part:").information("NOT FOUND " + std::to_string(ids[i]));
|
|
|
|
indices[i].setNotExists();
|
|
|
|
not_found[ids[i]].push_back(i);
|
2020-01-05 20:31:25 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2020-01-06 20:38:32 +00:00
|
|
|
Poco::Logger::get("part:").information("HIT " + std::to_string(ids[i]));
|
|
|
|
indices[i] = it->second.index;
|
2020-01-05 20:31:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
getValueFromMemory<Out>(attribute_index, indices, out);
|
|
|
|
getValueFromStorage<Out>(attribute_index, indices, out);
|
2020-01-05 17:05:49 +00:00
|
|
|
}
|
2020-01-03 19:52:07 +00:00
|
|
|
}
|
|
|
|
|
2020-01-05 20:31:25 +00:00
|
|
|
template <typename Out>
|
|
|
|
void CachePartition::getValueFromMemory(
|
2020-01-06 20:38:32 +00:00
|
|
|
const size_t attribute_index, const PaddedPODArray<Index> & indices, ResultArrayType<Out> & out) const
|
2020-01-03 19:52:07 +00:00
|
|
|
{
|
2020-01-07 11:26:52 +00:00
|
|
|
//const auto & attribute = std::get<Attribute::Container<Out>>(attributes_buffer[attribute_index].values);
|
2020-01-05 20:31:25 +00:00
|
|
|
for (size_t i = 0; i < indices.size(); ++i)
|
|
|
|
{
|
|
|
|
const auto & index = indices[i];
|
2020-01-06 20:38:32 +00:00
|
|
|
if (index.exists() && index.inMemory())
|
2020-01-05 20:31:25 +00:00
|
|
|
{
|
2020-01-07 11:26:52 +00:00
|
|
|
const size_t offset = index.getAddressInBlock();
|
|
|
|
|
|
|
|
Poco::Logger::get("part:").information("GET FROM MEMORY " + std::to_string(i) + " --- " + std::to_string(offset));
|
|
|
|
|
|
|
|
ReadBufferFromMemory read_buffer(memory.data() + offset, memory.size() - offset);
|
|
|
|
readValueFromBuffer(attribute_index, out[i], read_buffer);
|
2020-01-05 20:31:25 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
template <typename Out>
|
|
|
|
void CachePartition::getValueFromStorage(
|
2020-01-06 20:38:32 +00:00
|
|
|
const size_t attribute_index, const PaddedPODArray<Index> & indices, ResultArrayType<Out> & out) const
|
2020-01-05 20:31:25 +00:00
|
|
|
{
|
2020-01-06 20:38:32 +00:00
|
|
|
std::vector<std::pair<UInt64, size_t>> index_to_out;
|
|
|
|
for (size_t i = 0; i < indices.size(); ++i)
|
|
|
|
{
|
|
|
|
const auto & index = indices[i];
|
|
|
|
if (index.exists() && !index.inMemory())
|
|
|
|
index_to_out.emplace_back(index.getAddressInBlock(), i);
|
|
|
|
}
|
|
|
|
if (index_to_out.empty())
|
|
|
|
return;
|
|
|
|
|
|
|
|
std::sort(std::begin(index_to_out), std::end(index_to_out));
|
|
|
|
|
2020-01-07 11:26:52 +00:00
|
|
|
DB::Memory read_buffer(MAX_ATTRIBUTES_SIZE * index_to_out.size(), BUFFER_ALIGNMENT);
|
2020-01-06 20:38:32 +00:00
|
|
|
|
|
|
|
std::vector<iocb> requests(index_to_out.size());
|
|
|
|
memset(requests.data(), 0, requests.size() * sizeof(requests.front()));
|
|
|
|
std::vector<iocb*> pointers(index_to_out.size());
|
|
|
|
for (size_t i = 0; i < index_to_out.size(); ++i)
|
|
|
|
{
|
|
|
|
#if defined(__FreeBSD__)
|
|
|
|
request.aio.aio_lio_opcode = LIO_READ;
|
|
|
|
request.aio.aio_fildes = read_fd;
|
|
|
|
request.aio.aio_buf = reinterpret_cast<volatile void *>(read_buffer.data() + i * MAX_ATTRIBUTES_SIZE);
|
|
|
|
request.aio.aio_nbytes = MAX_ATTRIBUTES_SIZE;
|
|
|
|
request.aio.aio_offset = index_to_out[i].first;
|
|
|
|
request.aio_data = i;
|
|
|
|
#else
|
|
|
|
requests[i].aio_lio_opcode = IOCB_CMD_PREAD;
|
|
|
|
requests[i].aio_fildes = read_fd;
|
|
|
|
requests[i].aio_buf = reinterpret_cast<UInt64>(read_buffer.data()) + i * MAX_ATTRIBUTES_SIZE;
|
|
|
|
requests[i].aio_nbytes = MAX_ATTRIBUTES_SIZE;
|
|
|
|
requests[i].aio_offset = index_to_out[i].first;
|
|
|
|
requests[i].aio_data = i;
|
|
|
|
#endif
|
|
|
|
|
|
|
|
Poco::Logger::get("requests:").information();
|
|
|
|
pointers[i] = &requests[i];
|
|
|
|
}
|
|
|
|
Poco::Logger::get("requests:").information(std::to_string(requests.size()));
|
|
|
|
|
|
|
|
//const auto pointers = ext::map<std::vector>(
|
|
|
|
// std::begin(requests), std::end(requests), [](const iocb & request) { return &request; });
|
|
|
|
|
|
|
|
AIOContext context(MAX_KEYS_TO_READ_ONCE);
|
|
|
|
|
|
|
|
std::vector<io_event> events(index_to_out.size());
|
|
|
|
|
|
|
|
for (size_t i = 0; i < index_to_out.size(); i += MAX_KEYS_TO_READ_ONCE)
|
|
|
|
{
|
|
|
|
size_t to_push = std::min(MAX_KEYS_TO_READ_ONCE, index_to_out.size() - i);
|
|
|
|
size_t push_index = i;
|
|
|
|
int pushed = 0;
|
|
|
|
while (to_push > 0 && (pushed = io_submit(context.ctx, to_push, pointers.data() + push_index)) < 0)
|
|
|
|
{
|
|
|
|
if (errno != EINTR)
|
|
|
|
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
|
|
|
|
to_push -= pushed;
|
|
|
|
push_index += pushed;
|
|
|
|
pushed = 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t to_get = std::min(MAX_KEYS_TO_READ_ONCE, index_to_out.size() - i);
|
|
|
|
size_t got_index = i;
|
|
|
|
int got = 0;
|
|
|
|
while (to_get > 0 && (got = io_getevents(context.ctx, to_get, to_get, events.data() + got_index, NULL)) < 0)
|
|
|
|
{
|
|
|
|
if (errno != EINTR)
|
|
|
|
throwFromErrno("io_getevents: Failed to get an event from asynchronous IO", ErrorCodes::CANNOT_IO_GETEVENTS);
|
|
|
|
to_get -= got;
|
|
|
|
got_index += got;
|
|
|
|
got = 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
//std::sort(std::begin(events), std::end(events), [](const auto & lhs, const auto & rhs) { return lhs.data < rhs.data; });
|
|
|
|
for (const auto & event : events)
|
|
|
|
{
|
|
|
|
Poco::Logger::get("Read:").information("ito: f:" + std::to_string(index_to_out[event.data].first) + " s:" + std::to_string(index_to_out[event.data].second));
|
|
|
|
Poco::Logger::get("Read:").information("data: " + std::to_string(event.data) + " res: " + std::to_string(event.res));
|
2020-01-07 11:26:52 +00:00
|
|
|
|
2020-01-06 20:38:32 +00:00
|
|
|
DB::ReadBufferFromMemory buf(read_buffer.data() + event.data * MAX_ATTRIBUTES_SIZE, event.res);
|
2020-01-07 11:26:52 +00:00
|
|
|
readValueFromBuffer(attribute_index, out[index_to_out[event.data].second], buf);
|
|
|
|
}
|
|
|
|
}
|
2020-01-06 20:38:32 +00:00
|
|
|
|
2020-01-07 11:26:52 +00:00
|
|
|
template <typename Out>
|
|
|
|
void CachePartition::readValueFromBuffer(const size_t attribute_index, Out & dst, ReadBuffer & buf) const
|
|
|
|
{
|
|
|
|
for (size_t i = 0; i < attribute_index; ++i)
|
|
|
|
{
|
|
|
|
switch (attributes_buffer[i].type)
|
2020-01-06 20:38:32 +00:00
|
|
|
{
|
2020-01-07 11:26:52 +00:00
|
|
|
#define DISPATCH(TYPE) \
|
2020-01-06 20:38:32 +00:00
|
|
|
case AttributeUnderlyingType::ut##TYPE: \
|
|
|
|
{ \
|
2020-01-07 11:26:52 +00:00
|
|
|
buf.ignore(sizeof(TYPE)); \
|
2020-01-06 20:38:32 +00:00
|
|
|
} \
|
|
|
|
break;
|
|
|
|
|
|
|
|
DISPATCH(UInt8)
|
|
|
|
DISPATCH(UInt16)
|
|
|
|
DISPATCH(UInt32)
|
|
|
|
DISPATCH(UInt64)
|
|
|
|
DISPATCH(UInt128)
|
|
|
|
DISPATCH(Int8)
|
|
|
|
DISPATCH(Int16)
|
|
|
|
DISPATCH(Int32)
|
|
|
|
DISPATCH(Int64)
|
|
|
|
DISPATCH(Decimal32)
|
|
|
|
DISPATCH(Decimal64)
|
|
|
|
DISPATCH(Decimal128)
|
|
|
|
DISPATCH(Float32)
|
|
|
|
DISPATCH(Float64)
|
|
|
|
#undef DISPATCH
|
|
|
|
|
|
|
|
case AttributeUnderlyingType::utString:
|
|
|
|
// TODO: string support
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2020-01-07 11:26:52 +00:00
|
|
|
|
|
|
|
switch (attributes_buffer[attribute_index].type)
|
|
|
|
{
|
|
|
|
#define DISPATCH(TYPE) \
|
|
|
|
case AttributeUnderlyingType::ut##TYPE: \
|
|
|
|
readBinary(dst, buf); \
|
|
|
|
break;
|
|
|
|
|
|
|
|
DISPATCH(UInt8)
|
|
|
|
DISPATCH(UInt16)
|
|
|
|
DISPATCH(UInt32)
|
|
|
|
DISPATCH(UInt64)
|
|
|
|
DISPATCH(UInt128)
|
|
|
|
DISPATCH(Int8)
|
|
|
|
DISPATCH(Int16)
|
|
|
|
DISPATCH(Int32)
|
|
|
|
DISPATCH(Int64)
|
|
|
|
DISPATCH(Decimal32)
|
|
|
|
DISPATCH(Decimal64)
|
|
|
|
DISPATCH(Decimal128)
|
|
|
|
DISPATCH(Float32)
|
|
|
|
DISPATCH(Float64)
|
|
|
|
#undef DISPATCH
|
|
|
|
|
|
|
|
case AttributeUnderlyingType::utString:
|
|
|
|
// TODO: string support
|
|
|
|
break;
|
|
|
|
}
|
2020-01-03 19:52:07 +00:00
|
|
|
}
|
|
|
|
|
2020-01-05 20:31:25 +00:00
|
|
|
void CachePartition::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out) const
|
|
|
|
{
|
|
|
|
for (size_t i = 0; i < ids.size(); ++i)
|
2020-01-06 20:38:32 +00:00
|
|
|
{
|
|
|
|
auto it = key_to_metadata.find(ids[i]);
|
|
|
|
if (it == std::end(key_to_metadata))
|
|
|
|
{
|
|
|
|
out[i] = 0;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
out[i] = it->second.isDefault();
|
|
|
|
}
|
|
|
|
}
|
2020-01-05 20:31:25 +00:00
|
|
|
}
|
|
|
|
|
2020-01-04 15:04:16 +00:00
|
|
|
/// Creates `partitions_count_` partitions in the directory `path_`.
/// Every partition gets a UInt64 key and the underlying types of the dictionary attributes
/// as its structure, its number as the file id and `partition_max_size_` as the maximum size.
CacheStorage::CacheStorage(SSDCacheDictionary & dictionary_, const std::string & path_, const size_t partitions_count_, const size_t partition_max_size_)
    : dictionary(dictionary_)
    , path(path_)
    , partition_max_size(partition_max_size_)
    , log(&Poco::Logger::get("CacheStorage"))
{
    std::vector<AttributeUnderlyingType> structure;
    for (const auto & dictionary_attribute : dictionary.getStructure().attributes)
        structure.push_back(dictionary_attribute.underlying_type);

    size_t partition_id = 0;
    while (partition_id < partitions_count_)
    {
        partitions.emplace_back(std::make_unique<CachePartition>(AttributeUnderlyingType::utUInt64, structure, path_, partition_id, partition_max_size));
        ++partition_id;
    }
}
|
|
|
|
|
2020-01-02 19:33:19 +00:00
|
|
|
template <typename PresentIdHandler, typename AbsentIdHandler>
|
2020-01-04 15:04:16 +00:00
|
|
|
void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
|
2020-01-02 19:33:19 +00:00
|
|
|
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found)
|
|
|
|
{
|
2020-01-05 20:31:25 +00:00
|
|
|
Poco::Logger::get("cachestorage").information("update");
|
2020-01-02 19:33:19 +00:00
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
|
|
|
|
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
|
|
|
|
|
|
|
|
std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
|
|
|
|
for (const auto id : requested_ids)
|
|
|
|
remaining_ids.insert({id, 0});
|
|
|
|
|
|
|
|
const auto now = std::chrono::system_clock::now();
|
|
|
|
|
|
|
|
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
|
|
|
|
|
|
|
|
if (now > backoff_end_time)
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
if (update_error_count)
|
|
|
|
{
|
|
|
|
/// Recover after error: we have to clone the source here because
|
|
|
|
/// it could keep connections which should be reset after error.
|
|
|
|
source_ptr = source_ptr->clone();
|
|
|
|
}
|
|
|
|
|
|
|
|
Stopwatch watch;
|
|
|
|
auto stream = source_ptr->loadIds(requested_ids);
|
|
|
|
stream->readPrefix();
|
|
|
|
|
|
|
|
while (const auto block = stream->read())
|
|
|
|
{
|
2020-01-06 20:38:32 +00:00
|
|
|
const auto new_keys = createAttributesFromBlock(block, 0, { AttributeUnderlyingType::utUInt64 }).front();
|
2020-01-05 17:05:49 +00:00
|
|
|
const auto new_attributes = createAttributesFromBlock(
|
2020-01-06 20:38:32 +00:00
|
|
|
block, 1, ext::map<std::vector>(dictionary.getAttributes(), [](const auto & attribute) { return attribute.type; }));
|
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
const auto & ids = std::get<CachePartition::Attribute::Container<UInt64>>(new_keys.values);
|
2020-01-02 19:33:19 +00:00
|
|
|
|
|
|
|
for (const auto i : ext::range(0, ids.size()))
|
|
|
|
{
|
|
|
|
/// mark corresponding id as found
|
2020-01-03 19:52:07 +00:00
|
|
|
on_updated(ids[i], i, new_attributes);
|
|
|
|
remaining_ids[ids[i]] = 1;
|
2020-01-02 19:33:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// TODO: Add TTL to block
|
2020-01-05 13:59:49 +00:00
|
|
|
partitions[0]->appendBlock(new_keys, new_attributes);
|
2020-01-02 19:33:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
stream->readSuffix();
|
|
|
|
|
|
|
|
update_error_count = 0;
|
|
|
|
last_update_exception = std::exception_ptr{};
|
|
|
|
backoff_end_time = std::chrono::system_clock::time_point{};
|
|
|
|
|
|
|
|
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
++update_error_count;
|
|
|
|
last_update_exception = std::current_exception();
|
|
|
|
backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, update_error_count));
|
|
|
|
|
|
|
|
tryLogException(last_update_exception, log, "Could not update cache dictionary '" + dictionary.getName() +
|
|
|
|
"', next update is scheduled at " + ext::to_string(backoff_end_time));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t not_found_num = 0, found_num = 0;
|
|
|
|
|
|
|
|
/// Check which ids have not been found and require setting null_value
|
2020-01-05 13:59:49 +00:00
|
|
|
CachePartition::Attribute new_keys;
|
|
|
|
new_keys.type = AttributeUnderlyingType::utUInt64;
|
|
|
|
new_keys.values = std::vector<UInt64>();
|
2020-01-03 19:52:07 +00:00
|
|
|
CachePartition::Attributes new_attributes;
|
|
|
|
{
|
|
|
|
/// TODO: create attributes from structure
|
|
|
|
for (const auto & attribute : dictionary.getAttributes())
|
|
|
|
{
|
|
|
|
switch (attribute.type)
|
|
|
|
{
|
|
|
|
#define DISPATCH(TYPE) \
|
|
|
|
case AttributeUnderlyingType::ut##TYPE: \
|
2020-01-04 15:04:16 +00:00
|
|
|
new_attributes.emplace_back(); \
|
|
|
|
new_attributes.back().type = attribute.type; \
|
|
|
|
new_attributes.back().values = std::vector<TYPE>(); \
|
2020-01-03 19:52:07 +00:00
|
|
|
break;
|
|
|
|
|
|
|
|
DISPATCH(UInt8)
|
|
|
|
DISPATCH(UInt16)
|
|
|
|
DISPATCH(UInt32)
|
|
|
|
DISPATCH(UInt64)
|
|
|
|
DISPATCH(UInt128)
|
|
|
|
DISPATCH(Int8)
|
|
|
|
DISPATCH(Int16)
|
|
|
|
DISPATCH(Int32)
|
|
|
|
DISPATCH(Int64)
|
|
|
|
DISPATCH(Decimal32)
|
|
|
|
DISPATCH(Decimal64)
|
|
|
|
DISPATCH(Decimal128)
|
|
|
|
DISPATCH(Float32)
|
|
|
|
DISPATCH(Float64)
|
|
|
|
#undef DISPATCH
|
|
|
|
|
|
|
|
case AttributeUnderlyingType::utString:
|
|
|
|
// TODO: string support
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-01-02 19:33:19 +00:00
|
|
|
for (const auto & id_found_pair : remaining_ids)
|
|
|
|
{
|
|
|
|
if (id_found_pair.second)
|
|
|
|
{
|
|
|
|
++found_num;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
++not_found_num;
|
|
|
|
|
|
|
|
const auto id = id_found_pair.first;
|
|
|
|
|
|
|
|
if (update_error_count)
|
|
|
|
{
|
|
|
|
/// TODO: юзать старые значения.
|
|
|
|
|
|
|
|
/// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
|
|
|
|
std::rethrow_exception(last_update_exception);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// TODO: Add TTL
|
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
// Set key
|
|
|
|
std::get<std::vector<UInt64>>(new_keys.values).push_back(id);
|
|
|
|
|
2020-01-02 19:33:19 +00:00
|
|
|
/// Set null_value for each attribute
|
|
|
|
const auto & attributes = dictionary.getAttributes();
|
|
|
|
for (size_t i = 0; i < attributes.size(); ++i)
|
|
|
|
{
|
|
|
|
const auto & attribute = attributes[i];
|
2020-01-04 15:04:16 +00:00
|
|
|
// append null
|
|
|
|
switch (attribute.type)
|
|
|
|
{
|
|
|
|
#define DISPATCH(TYPE) \
|
|
|
|
case AttributeUnderlyingType::ut##TYPE: \
|
|
|
|
{ \
|
|
|
|
auto & to_values = std::get<std::vector<TYPE>>(new_attributes[i].values); \
|
|
|
|
auto & null_value = std::get<TYPE>(attribute.null_value); \
|
|
|
|
to_values.push_back(null_value); \
|
|
|
|
} \
|
|
|
|
break;
|
|
|
|
|
|
|
|
DISPATCH(UInt8)
|
|
|
|
DISPATCH(UInt16)
|
|
|
|
DISPATCH(UInt32)
|
|
|
|
DISPATCH(UInt64)
|
|
|
|
DISPATCH(UInt128)
|
|
|
|
DISPATCH(Int8)
|
|
|
|
DISPATCH(Int16)
|
|
|
|
DISPATCH(Int32)
|
|
|
|
DISPATCH(Int64)
|
|
|
|
DISPATCH(Decimal32)
|
|
|
|
DISPATCH(Decimal64)
|
|
|
|
DISPATCH(Decimal128)
|
|
|
|
DISPATCH(Float32)
|
|
|
|
DISPATCH(Float64)
|
|
|
|
#undef DISPATCH
|
|
|
|
|
|
|
|
case AttributeUnderlyingType::utString:
|
|
|
|
// TODO: string support
|
|
|
|
break;
|
|
|
|
}
|
2020-01-02 19:33:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// inform caller that the cell has not been found
|
|
|
|
on_id_not_found(id);
|
|
|
|
}
|
2020-01-06 20:38:32 +00:00
|
|
|
if (not_found_num)
|
|
|
|
partitions[0]->appendBlock(new_keys, new_attributes);
|
2020-01-02 19:33:19 +00:00
|
|
|
|
|
|
|
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
|
|
|
|
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
|
|
|
|
ProfileEvents::increment(ProfileEvents::DictCacheRequests);
|
|
|
|
}
|
|
|
|
|
2020-01-05 17:05:49 +00:00
|
|
|
/// Converts columns of `block` to attributes.
///
/// The i-th type of `structure` describes the column number `begin_column + i`; the values of
/// the column are copied to a vector of that type. Nothing is added to the result for string
/// attributes yet, so the result may contain fewer elements than `structure`.
CachePartition::Attributes CacheStorage::createAttributesFromBlock(
    const Block & block, const size_t begin_column, const std::vector<AttributeUnderlyingType> & structure)
{
    CachePartition::Attributes attributes;

    const auto columns = block.getColumns();
    for (size_t i = 0; i < structure.size(); ++i)
    {
        const auto & column = columns[i + begin_column];
        switch (structure[i])
        {
            /// Exactly values.size() elements are copied, so the copy cannot run past the end of
            /// `values` regardless of the unit in which getRawData() reports its size.
            /// Nothing is copied for an empty column.
#define DISPATCH(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
        { \
            std::vector<TYPE> values(column->size()); \
            const auto raw_data = column->getRawData(); \
            if (!values.empty()) \
                memcpy(values.data(), raw_data.data, values.size() * sizeof(TYPE)); \
            attributes.emplace_back(); \
            attributes.back().type = structure[i]; \
            attributes.back().values = std::move(values); \
            break; \
        }

        DISPATCH(UInt8)
        DISPATCH(UInt16)
        DISPATCH(UInt32)
        DISPATCH(UInt64)
        DISPATCH(UInt128)
        DISPATCH(Int8)
        DISPATCH(Int16)
        DISPATCH(Int32)
        DISPATCH(Int64)
        DISPATCH(Decimal32)
        DISPATCH(Decimal64)
        DISPATCH(Decimal128)
        DISPATCH(Float32)
        DISPATCH(Float64)
#undef DISPATCH

        case AttributeUnderlyingType::utString:
            // TODO: string support
            break;
        }
    }

    return attributes;
}
|
|
|
|
|
2020-01-01 17:40:46 +00:00
|
|
|
/// Constructs the dictionary, its backing storage and its attribute descriptions.
///
/// The storage is created from `path_` and `partition_max_size_`.
/// NOTE(review): the literal `1` passed to the storage is presumably the number of partitions - confirm.
/// Throws UNSUPPORTED_METHOD when the source does not support selective load.
SSDCacheDictionary::SSDCacheDictionary(
    const std::string & name_,
    const DictionaryStructure & dict_struct_,
    DictionarySourcePtr source_ptr_,
    const DictionaryLifetime dict_lifetime_,
    const std::string & path_,
    const size_t partition_max_size_)
    : name(name_),
      dict_struct(dict_struct_),
      source_ptr(std::move(source_ptr_)),
      dict_lifetime(dict_lifetime_),
      path(path_),
      partition_max_size(partition_max_size_),
      storage(*this, path, 1, partition_max_size),
      log(&Poco::Logger::get("SSDCacheDictionary"))
{
    /// Only individual keys are requested from the source, so it has to be able to serve them selectively.
    const bool selective_load_supported = source_ptr->supportsSelectiveLoad();
    if (!selective_load_supported)
        throw Exception{name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};

    createAttributes();
}
|
|
|
|
|
|
|
|
#define DECLARE(TYPE) \
|
|
|
|
void SSDCacheDictionary::get##TYPE( \
|
|
|
|
const std::string & attribute_name, const PaddedPODArray<Key> & ids, ResultArrayType<TYPE> & out) const \
|
|
|
|
{ \
|
|
|
|
const auto index = getAttributeIndex(attribute_name); \
|
|
|
|
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \
|
|
|
|
const auto null_value = std::get<TYPE>(attributes[index].null_value); \
|
|
|
|
getItemsNumberImpl<TYPE, TYPE>( \
|
2020-01-05 13:59:49 +00:00
|
|
|
index, \
|
2020-01-04 15:04:16 +00:00
|
|
|
ids, \
|
|
|
|
out, \
|
|
|
|
[&](const size_t) { return null_value; }); \
|
2020-01-01 17:40:46 +00:00
|
|
|
}
|
2020-01-05 13:59:49 +00:00
|
|
|
|
2020-01-01 17:40:46 +00:00
|
|
|
DECLARE(UInt8)
|
|
|
|
DECLARE(UInt16)
|
|
|
|
DECLARE(UInt32)
|
|
|
|
DECLARE(UInt64)
|
|
|
|
DECLARE(UInt128)
|
|
|
|
DECLARE(Int8)
|
|
|
|
DECLARE(Int16)
|
|
|
|
DECLARE(Int32)
|
|
|
|
DECLARE(Int64)
|
|
|
|
DECLARE(Float32)
|
|
|
|
DECLARE(Float64)
|
|
|
|
DECLARE(Decimal32)
|
|
|
|
DECLARE(Decimal64)
|
|
|
|
DECLARE(Decimal128)
|
|
|
|
#undef DECLARE
|
|
|
|
|
|
|
|
#define DECLARE(TYPE) \
|
|
|
|
void SSDCacheDictionary::get##TYPE( \
|
|
|
|
const std::string & attribute_name, \
|
|
|
|
const PaddedPODArray<Key> & ids, \
|
|
|
|
const PaddedPODArray<TYPE> & def, \
|
|
|
|
ResultArrayType<TYPE> & out) const \
|
|
|
|
{ \
|
|
|
|
const auto index = getAttributeIndex(attribute_name); \
|
|
|
|
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \
|
|
|
|
getItemsNumberImpl<TYPE, TYPE>( \
|
2020-01-05 13:59:49 +00:00
|
|
|
index, \
|
2020-01-01 17:40:46 +00:00
|
|
|
ids, \
|
|
|
|
out, \
|
|
|
|
[&](const size_t row) { return def[row]; }); \
|
|
|
|
}
|
|
|
|
DECLARE(UInt8)
|
|
|
|
DECLARE(UInt16)
|
|
|
|
DECLARE(UInt32)
|
|
|
|
DECLARE(UInt64)
|
|
|
|
DECLARE(UInt128)
|
|
|
|
DECLARE(Int8)
|
|
|
|
DECLARE(Int16)
|
|
|
|
DECLARE(Int32)
|
|
|
|
DECLARE(Int64)
|
|
|
|
DECLARE(Float32)
|
|
|
|
DECLARE(Float64)
|
|
|
|
DECLARE(Decimal32)
|
|
|
|
DECLARE(Decimal64)
|
|
|
|
DECLARE(Decimal128)
|
|
|
|
#undef DECLARE
|
|
|
|
|
|
|
|
#define DECLARE(TYPE) \
|
|
|
|
void SSDCacheDictionary::get##TYPE( \
|
|
|
|
const std::string & attribute_name, \
|
|
|
|
const PaddedPODArray<Key> & ids, \
|
|
|
|
const TYPE def, \
|
|
|
|
ResultArrayType<TYPE> & out) const \
|
|
|
|
{ \
|
|
|
|
const auto index = getAttributeIndex(attribute_name); \
|
|
|
|
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \
|
|
|
|
getItemsNumberImpl<TYPE, TYPE>( \
|
2020-01-05 13:59:49 +00:00
|
|
|
index, \
|
2020-01-01 17:40:46 +00:00
|
|
|
ids, \
|
|
|
|
out, \
|
|
|
|
[&](const size_t) { return def; }); \
|
|
|
|
}
|
|
|
|
DECLARE(UInt8)
|
|
|
|
DECLARE(UInt16)
|
|
|
|
DECLARE(UInt32)
|
|
|
|
DECLARE(UInt64)
|
|
|
|
DECLARE(UInt128)
|
|
|
|
DECLARE(Int8)
|
|
|
|
DECLARE(Int16)
|
|
|
|
DECLARE(Int32)
|
|
|
|
DECLARE(Int64)
|
|
|
|
DECLARE(Float32)
|
|
|
|
DECLARE(Float64)
|
|
|
|
DECLARE(Decimal32)
|
|
|
|
DECLARE(Decimal64)
|
|
|
|
DECLARE(Decimal128)
|
|
|
|
#undef DECLARE
|
|
|
|
|
|
|
|
template <typename AttributeType, typename OutputType, typename DefaultGetter>
|
|
|
|
void SSDCacheDictionary::getItemsNumberImpl(
|
2020-01-05 13:59:49 +00:00
|
|
|
const size_t attribute_index, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const
|
2020-01-01 17:40:46 +00:00
|
|
|
{
|
|
|
|
std::unordered_map<Key, std::vector<size_t>> not_found_ids;
|
2020-01-05 13:59:49 +00:00
|
|
|
storage.getValue<OutputType>(attribute_index, ids, out, not_found_ids);
|
2020-01-01 17:40:46 +00:00
|
|
|
if (not_found_ids.empty())
|
|
|
|
return;
|
|
|
|
|
|
|
|
std::vector<Key> required_ids(not_found_ids.size());
|
2020-01-05 13:59:49 +00:00
|
|
|
std::transform(std::begin(not_found_ids), std::end(not_found_ids), std::begin(required_ids), [](const auto & pair) { return pair.first; });
|
2020-01-01 17:40:46 +00:00
|
|
|
|
2020-01-02 19:33:19 +00:00
|
|
|
storage.update(
|
|
|
|
source_ptr,
|
2020-01-01 17:40:46 +00:00
|
|
|
required_ids,
|
2020-01-04 15:04:16 +00:00
|
|
|
[&](const auto id, const auto row, const auto & new_attributes) {
|
2020-01-03 19:52:07 +00:00
|
|
|
for (const size_t out_row : not_found_ids[id])
|
2020-01-04 15:04:16 +00:00
|
|
|
out[out_row] = std::get<std::vector<OutputType>>(new_attributes[attribute_index].values)[row];
|
2020-01-01 17:40:46 +00:00
|
|
|
},
|
2020-01-05 13:59:49 +00:00
|
|
|
[&](const size_t id)
|
2020-01-01 17:40:46 +00:00
|
|
|
{
|
|
|
|
for (const size_t row : not_found_ids[id])
|
|
|
|
out[row] = get_default(row);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
void SSDCacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
|
|
|
|
{
|
2020-01-05 13:59:49 +00:00
|
|
|
const auto index = getAttributeIndex(attribute_name);
|
|
|
|
checkAttributeType(name, attribute_name, attributes[index].type, AttributeUnderlyingType::utString);
|
2020-01-01 17:40:46 +00:00
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
const auto null_value = StringRef{std::get<String>(attributes[index].null_value)};
|
2020-01-01 17:40:46 +00:00
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
getItemsString(index, ids, out, [&](const size_t) { return null_value; });
|
2020-01-01 17:40:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void SSDCacheDictionary::getString(
|
|
|
|
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out) const
|
|
|
|
{
|
2020-01-05 13:59:49 +00:00
|
|
|
const auto index = getAttributeIndex(attribute_name);
|
|
|
|
checkAttributeType(name, attribute_name, attributes[index].type, AttributeUnderlyingType::utString);
|
2020-01-01 17:40:46 +00:00
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
getItemsString(index, ids, out, [&](const size_t row) { return def->getDataAt(row); });
|
2020-01-01 17:40:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void SSDCacheDictionary::getString(
|
|
|
|
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const
|
|
|
|
{
|
2020-01-05 13:59:49 +00:00
|
|
|
const auto index = getAttributeIndex(attribute_name);
|
|
|
|
checkAttributeType(name, attribute_name, attributes[index].type, AttributeUnderlyingType::utString);
|
2020-01-01 17:40:46 +00:00
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
getItemsString(index, ids, out, [&](const size_t) { return StringRef{def}; });
|
2020-01-01 17:40:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
template <typename DefaultGetter>
|
2020-01-05 13:59:49 +00:00
|
|
|
void SSDCacheDictionary::getItemsString(const size_t attribute_index, const PaddedPODArray<Key> & ids,
|
2020-01-01 17:40:46 +00:00
|
|
|
ColumnString * out, DefaultGetter && get_default) const
|
|
|
|
{
|
2020-01-05 13:59:49 +00:00
|
|
|
UNUSED(attribute_index);
|
2020-01-01 17:40:46 +00:00
|
|
|
UNUSED(ids);
|
|
|
|
UNUSED(out);
|
|
|
|
UNUSED(get_default);
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t SSDCacheDictionary::getAttributeIndex(const std::string & attr_name) const
|
|
|
|
{
|
|
|
|
auto it = attribute_index_by_name.find(attr_name);
|
|
|
|
if (it == std::end(attribute_index_by_name))
|
|
|
|
throw Exception{"Attribute `" + name + "` does not exist.", ErrorCodes::BAD_ARGUMENTS};
|
|
|
|
return it->second;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns a mutable reference to the attribute called `attr_name`.
/// Throws (via getAttributeIndex) if there is no such attribute.
SSDCacheDictionary::Attribute & SSDCacheDictionary::getAttribute(const std::string & attr_name)
{
    const size_t position = getAttributeIndex(attr_name);
    return attributes[position];
}
|
|
|
|
|
|
|
|
/// Returns a read-only reference to the attribute called `attr_name`.
/// Throws (via getAttributeIndex) if there is no such attribute.
const SSDCacheDictionary::Attribute & SSDCacheDictionary::getAttribute(const std::string & attr_name) const
{
    const size_t position = getAttributeIndex(attr_name);
    return attributes[position];
}
|
|
|
|
|
2020-01-02 19:33:19 +00:00
|
|
|
/// Gives read-only access to all attribute descriptions of the dictionary.
const SSDCacheDictionary::Attributes & SSDCacheDictionary::getAttributes() const
{
    const Attributes & all_attributes = attributes;
    return all_attributes;
}
|
|
|
|
|
2020-01-01 17:40:46 +00:00
|
|
|
template <typename T>
|
|
|
|
SSDCacheDictionary::Attribute SSDCacheDictionary::createAttributeWithTypeImpl(const AttributeUnderlyingType type, const Field & null_value)
|
|
|
|
{
|
|
|
|
Attribute attr{type, {}};
|
|
|
|
attr.null_value = static_cast<T>(null_value.get<NearestFieldType<T>>());
|
|
|
|
bytes_allocated += sizeof(T);
|
|
|
|
return attr;
|
|
|
|
}
|
|
|
|
|
|
|
|
template <>
|
|
|
|
SSDCacheDictionary::Attribute SSDCacheDictionary::createAttributeWithTypeImpl<String>(const AttributeUnderlyingType type, const Field & null_value)
|
|
|
|
{
|
|
|
|
Attribute attr{type, {}};
|
|
|
|
attr.null_value = null_value.get<String>();
|
|
|
|
bytes_allocated += sizeof(StringRef);
|
|
|
|
//if (!string_arena)
|
|
|
|
// string_arena = std::make_unique<ArenaWithFreeLists>();
|
|
|
|
return attr;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Creates the description of an attribute of the given underlying `type` with the default `null_value`,
/// dispatching to the createAttributeWithTypeImpl instantiation for the matching C++ type.
/// Throws TYPE_MISMATCH if `type` is not one of the handled enumerators.
SSDCacheDictionary::Attribute SSDCacheDictionary::createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
{
#define CREATE_ATTRIBUTE_CASE(TYPE) \
    case AttributeUnderlyingType::ut##TYPE: return createAttributeWithTypeImpl<TYPE>(type, null_value);

    switch (type)
    {
        CREATE_ATTRIBUTE_CASE(UInt8)
        CREATE_ATTRIBUTE_CASE(UInt16)
        CREATE_ATTRIBUTE_CASE(UInt32)
        CREATE_ATTRIBUTE_CASE(UInt64)
        CREATE_ATTRIBUTE_CASE(UInt128)

        CREATE_ATTRIBUTE_CASE(Int8)
        CREATE_ATTRIBUTE_CASE(Int16)
        CREATE_ATTRIBUTE_CASE(Int32)
        CREATE_ATTRIBUTE_CASE(Int64)

        CREATE_ATTRIBUTE_CASE(Decimal32)
        CREATE_ATTRIBUTE_CASE(Decimal64)
        CREATE_ATTRIBUTE_CASE(Decimal128)

        CREATE_ATTRIBUTE_CASE(Float32)
        CREATE_ATTRIBUTE_CASE(Float64)

        CREATE_ATTRIBUTE_CASE(String)
    }

#undef CREATE_ATTRIBUTE_CASE

    /// Reached only when `type` holds a value that none of the cases above handles.
    throw Exception{"Unknown attribute type: " + std::to_string(static_cast<int>(type)), ErrorCodes::TYPE_MISMATCH};
}
|
|
|
|
|
|
|
|
void SSDCacheDictionary::createAttributes()
|
|
|
|
{
|
2020-01-05 13:59:49 +00:00
|
|
|
attributes.reserve(dict_struct.attributes.size());
|
2020-01-01 17:40:46 +00:00
|
|
|
for (size_t i = 0; i < dict_struct.attributes.size(); ++i)
|
|
|
|
{
|
|
|
|
const auto & attribute = dict_struct.attributes[i];
|
|
|
|
|
|
|
|
attribute_index_by_name.emplace(attribute.name, i);
|
2020-01-03 19:52:07 +00:00
|
|
|
attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value));
|
2020-01-01 17:40:46 +00:00
|
|
|
|
|
|
|
if (attribute.hierarchical)
|
|
|
|
throw Exception{name + ": hierarchical attributes not supported for dictionary of type " + getTypeName(),
|
|
|
|
ErrorCodes::TYPE_MISMATCH};
|
|
|
|
}
|
|
|
|
}
|
2019-10-25 18:06:08 +00:00
|
|
|
|
2020-01-05 13:59:49 +00:00
|
|
|
/// Registers the "ssd" dictionary layout in `factory`.
///
/// The registered creator validates the dictionary structure, reads `<prefix>.layout.ssd.max_partition_size`
/// and `<prefix>.layout.ssd.path` from the configuration and constructs an SSDCacheDictionary.
/// It throws:
///  - UNSUPPORTED_METHOD if the structure declares a composite 'key';
///  - BAD_ARGUMENTS if range_min/range_max are declared or the path is empty;
///  - TOO_SMALL_BUFFER_SIZE if max_partition_size is not positive.
void registerDictionarySSDCache(DictionaryFactory & factory)
{
    auto create_layout = [](const std::string & name,
                            const DictionaryStructure & dict_struct,
                            const Poco::Util::AbstractConfiguration & config,
                            const std::string & config_prefix,
                            DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        if (dict_struct.key)
            throw Exception{"'key' is not supported for dictionary of layout 'ssd'", ErrorCodes::UNSUPPORTED_METHOD};

        if (dict_struct.range_min || dict_struct.range_max)
            throw Exception{name
                                + ": elements .structure.range_min and .structure.range_max should be defined only "
                                  "for a dictionary of layout 'range_hashed'",
                            ErrorCodes::BAD_ARGUMENTS};

        const auto layout_prefix = config_prefix + ".layout";

        /// getInt returns a signed value. A negative size must be rejected as well, otherwise it would be
        /// converted to a huge unsigned partition size when passed to the dictionary.
        const auto max_partition_size = config.getInt(layout_prefix + ".ssd.max_partition_size");
        if (max_partition_size <= 0)
            throw Exception{name + ": dictionary of layout 'ssd' cannot have non-positive max_partition_size",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const auto path = config.getString(layout_prefix + ".ssd.path");
        if (path.empty())
            throw Exception{name + ": dictionary of layout 'ssd' cannot have empty path",
                            ErrorCodes::BAD_ARGUMENTS};

        const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
        return std::make_unique<SSDCacheDictionary>(
            name, dict_struct, std::move(source_ptr), dict_lifetime, path, static_cast<size_t>(max_partition_size));
    };

    factory.registerLayout("ssd", create_layout, false);
}
|
|
|
|
|
2019-10-25 18:06:08 +00:00
|
|
|
}
|