ClickHouse/dbms/src/Dictionaries/SSDCacheDictionary.cpp

1395 lines
49 KiB
C++
Raw Normal View History

2019-10-25 18:06:08 +00:00
#include "SSDCacheDictionary.h"
2020-01-03 19:52:07 +00:00
#include <algorithm>
2019-10-25 18:06:08 +00:00
#include <Columns/ColumnsNumber.h>
2020-01-01 17:40:46 +00:00
#include <Common/typeid_cast.h>
2020-01-02 19:33:19 +00:00
#include <Common/ProfileEvents.h>
#include <Common/ProfilingScopedRWLock.h>
2020-01-07 14:59:03 +00:00
#include <Common/MemorySanitizer.h>
2020-01-01 17:40:46 +00:00
#include <DataStreams/IBlockInputStream.h>
2020-01-12 12:29:42 +00:00
#include <Poco/File.h>
2020-01-18 11:47:58 +00:00
#include "DictionaryBlockInputStream.h"
2020-01-05 13:59:49 +00:00
#include "DictionaryFactory.h"
2020-01-06 20:38:32 +00:00
#include <IO/AIO.h>
#include <IO/ReadHelpers.h>
2020-01-04 15:04:16 +00:00
#include <IO/WriteHelpers.h>
2020-01-01 17:40:46 +00:00
#include <ext/chrono_io.h>
#include <ext/map.h>
#include <ext/range.h>
#include <ext/size.h>
2020-01-06 20:38:32 +00:00
#include <ext/bit_cast.h>
2020-01-07 17:55:32 +00:00
#include <numeric>
2020-01-12 11:32:43 +00:00
#include <filesystem>
2019-10-25 18:06:08 +00:00
2020-01-02 19:33:19 +00:00
namespace ProfileEvents
{
extern const Event DictCacheKeysRequested;
extern const Event DictCacheKeysRequestedMiss;
extern const Event DictCacheKeysRequestedFound;
extern const Event DictCacheKeysExpired;
extern const Event DictCacheKeysNotFound;
extern const Event DictCacheKeysHit;
extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheRequests;
extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs;
2020-01-06 20:38:32 +00:00
extern const Event FileOpen;
2020-01-07 14:59:03 +00:00
extern const Event WriteBufferAIOWrite;
extern const Event WriteBufferAIOWriteBytes;
2020-01-02 19:33:19 +00:00
}
namespace CurrentMetrics
{
extern const Metric DictCacheRequests;
2020-01-07 14:59:03 +00:00
extern const Metric Write;
2020-01-02 19:33:19 +00:00
}
2019-10-25 18:06:08 +00:00
namespace DB
{
2020-01-01 17:40:46 +00:00
namespace ErrorCodes
2019-10-25 18:06:08 +00:00
{
2020-01-01 17:40:46 +00:00
extern const int TYPE_MISMATCH;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
extern const int TOO_SMALL_BUFFER_SIZE;
2020-01-06 20:38:32 +00:00
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_IO_SUBMIT;
extern const int CANNOT_IO_GETEVENTS;
2020-01-07 14:59:03 +00:00
extern const int AIO_WRITE_ERROR;
extern const int CANNOT_FSYNC;
2019-10-25 18:06:08 +00:00
}
2020-01-03 19:52:07 +00:00
namespace
2020-01-01 17:40:46 +00:00
{
2020-01-12 14:23:32 +00:00
constexpr size_t DEFAULT_SSD_BLOCK_SIZE = DEFAULT_AIO_FILE_BLOCK_SIZE;
constexpr size_t DEFAULT_FILE_SIZE = 4 * 1024 * 1024 * 1024ULL;
constexpr size_t DEFAULT_PARTITIONS_COUNT = 16;
constexpr size_t DEFAULT_READ_BUFFER_SIZE = 16 * DEFAULT_SSD_BLOCK_SIZE;
2020-01-19 08:49:40 +00:00
constexpr size_t DEFAULT_WRITE_BUFFER_SIZE = DEFAULT_SSD_BLOCK_SIZE;
2020-01-07 17:55:32 +00:00
2020-01-12 14:23:32 +00:00
constexpr size_t BUFFER_ALIGNMENT = DEFAULT_AIO_FILE_BLOCK_SIZE;
2020-01-11 07:20:48 +00:00
2020-01-06 20:38:32 +00:00
static constexpr UInt64 KEY_METADATA_EXPIRES_AT_MASK = std::numeric_limits<std::chrono::system_clock::time_point::rep>::max();
static constexpr UInt64 KEY_METADATA_IS_DEFAULT_MASK = ~KEY_METADATA_EXPIRES_AT_MASK;
constexpr size_t KEY_IN_MEMORY_BIT = 63;
constexpr size_t KEY_IN_MEMORY = (1ULL << KEY_IN_MEMORY_BIT);
constexpr size_t BLOCK_INDEX_BITS = 32;
constexpr size_t INDEX_IN_BLOCK_BITS = 16;
constexpr size_t INDEX_IN_BLOCK_MASK = (1ULL << INDEX_IN_BLOCK_BITS) - 1;
constexpr size_t BLOCK_INDEX_MASK = ((1ULL << (BLOCK_INDEX_BITS + INDEX_IN_BLOCK_BITS)) - 1) ^ INDEX_IN_BLOCK_MASK;
2020-01-08 17:52:13 +00:00
constexpr size_t NOT_EXISTS = -1;
2020-01-06 20:38:32 +00:00
2020-01-11 20:23:51 +00:00
constexpr UInt8 HAS_NOT_FOUND = 2;
2020-01-03 19:52:07 +00:00
const std::string BIN_FILE_EXT = ".bin";
const std::string IND_FILE_EXT = ".idx";
2020-01-11 07:20:48 +00:00
int preallocateDiskSpace(int fd, size_t len)
{
#if defined(__FreeBSD__)
return posix_fallocate(fd, 0, len);
#else
return fallocate(fd, 0, 0, len);
#endif
}
2020-01-01 17:40:46 +00:00
}
2020-01-08 12:40:29 +00:00
CachePartition::Metadata::time_point_t CachePartition::Metadata::expiresAt() const
2020-01-06 20:38:32 +00:00
{
return ext::safe_bit_cast<time_point_t>(data & KEY_METADATA_EXPIRES_AT_MASK);
}
2020-01-08 12:40:29 +00:00
void CachePartition::Metadata::setExpiresAt(const time_point_t & t)
2020-01-06 20:38:32 +00:00
{
data = ext::safe_bit_cast<time_point_urep_t>(t);
}
2020-01-08 12:40:29 +00:00
bool CachePartition::Metadata::isDefault() const
2020-01-06 20:38:32 +00:00
{
return (data & KEY_METADATA_IS_DEFAULT_MASK) == KEY_METADATA_IS_DEFAULT_MASK;
}
2020-01-08 12:40:29 +00:00
void CachePartition::Metadata::setDefault()
2020-01-06 20:38:32 +00:00
{
data |= KEY_METADATA_IS_DEFAULT_MASK;
}
bool CachePartition::Index::inMemory() const
{
return (index & KEY_IN_MEMORY) == KEY_IN_MEMORY;
}
bool CachePartition::Index::exists() const
{
2020-01-08 17:52:13 +00:00
return index != NOT_EXISTS;
2020-01-06 20:38:32 +00:00
}
void CachePartition::Index::setNotExists()
{
2020-01-08 17:52:13 +00:00
index = NOT_EXISTS;
2020-01-06 20:38:32 +00:00
}
void CachePartition::Index::setInMemory(const bool in_memory)
{
index = (index & ~KEY_IN_MEMORY) | (static_cast<size_t>(in_memory) << KEY_IN_MEMORY_BIT);
}
size_t CachePartition::Index::getAddressInBlock() const
{
return index & INDEX_IN_BLOCK_MASK;
}
void CachePartition::Index::setAddressInBlock(const size_t address_in_block)
{
index = (index & ~INDEX_IN_BLOCK_MASK) | address_in_block;
}
size_t CachePartition::Index::getBlockId() const
{
return (index & BLOCK_INDEX_MASK) >> INDEX_IN_BLOCK_BITS;
}
void CachePartition::Index::setBlockId(const size_t block_id)
{
index = (index & ~BLOCK_INDEX_MASK) | (block_id << INDEX_IN_BLOCK_BITS);
}
2020-01-05 13:59:49 +00:00
CachePartition::CachePartition(
2020-01-12 14:23:32 +00:00
const AttributeUnderlyingType & /* key_structure */,
const std::vector<AttributeUnderlyingType> & attributes_structure_,
const std::string & dir_path,
const size_t file_id_,
const size_t max_size_,
const size_t block_size_,
2020-01-19 08:49:40 +00:00
const size_t read_buffer_size_,
const size_t write_buffer_size_)
2020-01-12 14:23:32 +00:00
: file_id(file_id_)
, max_size(max_size_)
, block_size(block_size_)
, read_buffer_size(read_buffer_size_)
2020-01-19 08:49:40 +00:00
, write_buffer_size(write_buffer_size_)
2020-01-12 14:23:32 +00:00
, path(dir_path + "/" + std::to_string(file_id))
2020-01-12 11:32:43 +00:00
, attributes_structure(attributes_structure_)
2019-10-25 18:06:08 +00:00
{
2020-01-05 13:59:49 +00:00
keys_buffer.type = AttributeUnderlyingType::utUInt64;
2020-01-10 19:19:03 +00:00
keys_buffer.values = PaddedPODArray<UInt64>();
2020-01-06 20:38:32 +00:00
2020-01-12 12:29:42 +00:00
Poco::File directory(dir_path);
if (!directory.exists())
directory.createDirectory();
2020-01-06 20:38:32 +00:00
{
ProfileEvents::increment(ProfileEvents::FileOpen);
const std::string filename = path + BIN_FILE_EXT;
2020-01-07 17:55:32 +00:00
fd = ::open(filename.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_DIRECT, 0666);
if (fd == -1)
2020-01-06 20:38:32 +00:00
{
auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
throwFromErrnoWithPath("Cannot open file " + filename, filename, error_code);
}
2020-01-11 07:20:48 +00:00
2020-01-12 14:23:32 +00:00
if (preallocateDiskSpace(fd, max_size * block_size) < 0)
2020-01-11 07:20:48 +00:00
{
throwFromErrnoWithPath("Cannot preallocate space for the file " + filename, filename, ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
2020-01-06 20:38:32 +00:00
}
}
2020-01-07 11:26:52 +00:00
CachePartition::~CachePartition()
{
2020-01-09 19:34:03 +00:00
std::unique_lock lock(rw_lock);
2020-01-07 17:55:32 +00:00
::close(fd);
2020-01-03 19:52:07 +00:00
}
2019-10-25 18:06:08 +00:00
2020-01-11 16:38:43 +00:00
size_t CachePartition::appendBlock(
const Attribute & new_keys, const Attributes & new_attributes, const PaddedPODArray<Metadata> & metadata, const size_t begin)
2020-01-03 19:52:07 +00:00
{
2020-01-09 19:34:03 +00:00
std::unique_lock lock(rw_lock);
2020-01-12 14:23:32 +00:00
if (current_file_block_id >= max_size)
2020-01-11 11:19:12 +00:00
return 0;
2020-01-08 17:52:13 +00:00
if (new_attributes.size() != attributes_structure.size())
2020-01-03 19:52:07 +00:00
throw Exception{"Wrong columns number in block.", ErrorCodes::BAD_ARGUMENTS};
2020-01-05 13:59:49 +00:00
const auto & ids = std::get<Attribute::Container<UInt64>>(new_keys.values);
2020-01-07 19:18:24 +00:00
auto & ids_buffer = std::get<Attribute::Container<UInt64>>(keys_buffer.values);
2019-10-25 18:06:08 +00:00
2020-01-12 11:32:43 +00:00
if (!memory)
2020-01-12 14:23:32 +00:00
memory.emplace(block_size, BUFFER_ALIGNMENT);
2020-01-07 11:26:52 +00:00
if (!write_buffer)
2020-01-18 17:46:00 +00:00
{
2020-01-19 08:49:40 +00:00
write_buffer.emplace(memory->data() + current_memory_block_id * block_size, block_size);
2020-01-18 17:46:00 +00:00
// codec = CompressionCodecFactory::instance().get("NONE", std::nullopt);
// compressed_buffer.emplace(*write_buffer, codec);
// hashing_buffer.emplace(*compressed_buffer);
}
2019-10-25 18:06:08 +00:00
2020-01-11 16:38:43 +00:00
for (size_t index = begin; index < ids.size();)
2020-01-06 20:38:32 +00:00
{
2020-01-11 16:38:43 +00:00
IndexAndMetadata index_and_metadata;
2020-01-08 12:40:29 +00:00
index_and_metadata.index.setInMemory(true);
index_and_metadata.index.setBlockId(current_memory_block_id);
index_and_metadata.index.setAddressInBlock(write_buffer->offset());
index_and_metadata.metadata = metadata[index];
2020-01-07 11:26:52 +00:00
2020-01-07 19:18:24 +00:00
bool flushed = false;
2020-01-07 11:26:52 +00:00
for (const auto & attribute : new_attributes)
{
// TODO:: переделать через столбцы + getDataAt
2020-01-07 14:59:03 +00:00
switch (attribute.type)
{
2020-01-07 11:26:52 +00:00
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
{ \
if (sizeof(TYPE) > write_buffer->available()) \
{ \
2020-01-19 08:49:40 +00:00
write_buffer.reset(); \
if (++current_memory_block_id == write_buffer_size) \
flush(); \
2020-01-07 19:18:24 +00:00
flushed = true; \
2020-01-07 11:26:52 +00:00
continue; \
} \
else \
{ \
const auto & values = std::get<Attribute::Container<TYPE>>(attribute.values); \
writeBinary(values[index], *write_buffer); \
} \
} \
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Float32)
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
break;
}
}
2020-01-07 14:59:03 +00:00
2020-01-07 19:18:24 +00:00
if (!flushed)
{
2020-01-11 16:38:43 +00:00
key_to_index_and_metadata[ids[index]] = index_and_metadata;
2020-01-07 19:18:24 +00:00
ids_buffer.push_back(ids[index]);
++index;
}
2020-01-19 08:49:40 +00:00
else if (current_file_block_id < max_size) // next block in write buffer or flushed to ssd
2020-01-12 11:32:43 +00:00
{
2020-01-19 08:49:40 +00:00
write_buffer.emplace(memory->data() + current_memory_block_id * block_size, block_size);
2020-01-12 11:32:43 +00:00
}
2020-01-19 08:49:40 +00:00
else // flushed to ssd, end of current file
2020-01-12 11:32:43 +00:00
{
memory.reset();
2020-01-19 08:49:40 +00:00
return index - begin;
2020-01-12 11:32:43 +00:00
}
2020-01-06 20:38:32 +00:00
}
2020-01-11 16:38:43 +00:00
return ids.size() - begin;
2019-10-25 18:06:08 +00:00
}
2020-01-01 17:40:46 +00:00
void CachePartition::flush()
2019-10-25 18:06:08 +00:00
{
2020-01-05 13:59:49 +00:00
const auto & ids = std::get<Attribute::Container<UInt64>>(keys_buffer.values);
2020-01-06 20:38:32 +00:00
if (ids.empty())
return;
2020-01-11 20:23:51 +00:00
Poco::Logger::get("paritiiton").information("@@@@@@@@@@@@@@@@@@@@ FLUSH!!! " + std::to_string(file_id) + " block: " + std::to_string(current_file_block_id));
2020-01-04 15:04:16 +00:00
2020-01-07 14:59:03 +00:00
AIOContext aio_context{1};
2019-10-25 18:06:08 +00:00
2020-01-07 17:55:32 +00:00
iocb write_request{};
2020-01-07 14:59:03 +00:00
iocb * write_request_ptr{&write_request};
2020-01-04 15:04:16 +00:00
2020-01-07 14:59:03 +00:00
#if defined(__FreeBSD__)
write_request.aio.aio_lio_opcode = LIO_WRITE;
write_request.aio.aio_fildes = fd;
2020-01-12 11:32:43 +00:00
write_request.aio.aio_buf = reinterpret_cast<volatile void *>(memory->data());
2020-01-12 14:23:32 +00:00
write_request.aio.aio_nbytes = block_size;
write_request.aio.aio_offset = block_size * current_file_block_id;
2020-01-07 14:59:03 +00:00
#else
write_request.aio_lio_opcode = IOCB_CMD_PWRITE;
2020-01-07 17:55:32 +00:00
write_request.aio_fildes = fd;
2020-01-12 11:32:43 +00:00
write_request.aio_buf = reinterpret_cast<UInt64>(memory->data());
2020-01-19 08:49:40 +00:00
write_request.aio_nbytes = block_size * write_buffer_size;
2020-01-12 14:23:32 +00:00
write_request.aio_offset = block_size * current_file_block_id;
2020-01-07 14:59:03 +00:00
#endif
2020-01-04 15:04:16 +00:00
2020-01-07 14:59:03 +00:00
Poco::Logger::get("try:").information("offset: " + std::to_string(write_request.aio_offset) + " nbytes: " + std::to_string(write_request.aio_nbytes));
2020-01-04 15:04:16 +00:00
2020-01-07 14:59:03 +00:00
while (io_submit(aio_context.ctx, 1, &write_request_ptr) < 0)
{
if (errno != EINTR)
throw Exception("Cannot submit request for asynchronous IO on file " + path + BIN_FILE_EXT, ErrorCodes::CANNOT_IO_SUBMIT);
}
2020-01-04 15:04:16 +00:00
2020-01-07 14:59:03 +00:00
CurrentMetrics::Increment metric_increment_write{CurrentMetrics::Write};
io_event event;
while (io_getevents(aio_context.ctx, 1, 1, &event, nullptr) < 0)
{
if (errno != EINTR)
throw Exception("Failed to wait for asynchronous IO completion on file " + path + BIN_FILE_EXT, ErrorCodes::CANNOT_IO_GETEVENTS);
2019-10-25 18:06:08 +00:00
}
2020-01-07 14:59:03 +00:00
// Unpoison the memory returned from an uninstrumented system function.
__msan_unpoison(&event, sizeof(event));
ssize_t bytes_written;
#if defined(__FreeBSD__)
bytes_written = aio_return(reinterpret_cast<struct aiocb *>(event.udata));
#else
bytes_written = event.res;
#endif
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWrite);
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWriteBytes, bytes_written);
if (bytes_written != static_cast<decltype(bytes_written)>(write_request.aio_nbytes))
throw Exception("Not all data was written for asynchronous IO on file " + path + BIN_FILE_EXT + ". returned: " + std::to_string(bytes_written), ErrorCodes::AIO_WRITE_ERROR);
2020-01-07 17:55:32 +00:00
if (::fsync(fd) < 0)
throwFromErrnoWithPath("Cannot fsync " + path + BIN_FILE_EXT, path + BIN_FILE_EXT, ErrorCodes::CANNOT_FSYNC);
2019-10-25 18:06:08 +00:00
2020-01-04 15:04:16 +00:00
/// commit changes in index
for (size_t row = 0; row < ids.size(); ++row)
2020-01-06 20:38:32 +00:00
{
2020-01-19 08:49:40 +00:00
auto & index = key_to_index_and_metadata[ids[row]].index;
2020-01-19 10:54:36 +00:00
if (index.inMemory()) // Row can be inserted in the buffer twice, so we need to move to ssd only the last index.
2020-01-19 08:49:40 +00:00
{
index.setInMemory(false);
index.setBlockId(current_file_block_id + index.getBlockId());
}
2020-01-06 20:38:32 +00:00
}
2020-01-04 15:04:16 +00:00
2020-01-19 08:49:40 +00:00
current_file_block_id += write_buffer_size;
current_memory_block_id = 0;
2020-01-07 14:59:03 +00:00
2020-01-04 15:04:16 +00:00
/// clear buffer
2020-01-05 13:59:49 +00:00
std::visit([](auto & attr) { attr.clear(); }, keys_buffer.values);
2019-10-25 18:06:08 +00:00
}
2020-01-11 11:19:12 +00:00
template <typename Out>
2020-01-05 20:31:25 +00:00
void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
2020-01-11 11:19:12 +00:00
ResultArrayType<Out> & out, std::vector<bool> & found,
2020-01-08 12:40:29 +00:00
std::chrono::system_clock::time_point now) const
2020-01-03 19:52:07 +00:00
{
2020-01-09 19:34:03 +00:00
std::shared_lock lock(rw_lock);
2020-01-06 20:38:32 +00:00
PaddedPODArray<Index> indices(ids.size());
2020-01-05 17:05:49 +00:00
for (size_t i = 0; i < ids.size(); ++i)
{
2020-01-11 11:19:12 +00:00
if (found[i])
2020-01-08 12:40:29 +00:00
{
indices[i].setNotExists();
}
2020-01-11 11:19:12 +00:00
else if (auto it = key_to_index_and_metadata.find(ids[i]);
it != std::end(key_to_index_and_metadata) && it->second.metadata.expiresAt() > now)
2020-01-05 20:31:25 +00:00
{
2020-01-11 11:19:12 +00:00
indices[i] = it->second.index;
found[i] = true;
2020-01-05 20:31:25 +00:00
}
else
{
2020-01-11 11:19:12 +00:00
indices[i].setNotExists();
2020-01-05 20:31:25 +00:00
}
2020-01-05 17:05:49 +00:00
}
2020-01-07 19:18:24 +00:00
getValueFromMemory<Out>(attribute_index, indices, out);
getValueFromStorage<Out>(attribute_index, indices, out);
2020-01-03 19:52:07 +00:00
}
2020-01-05 20:31:25 +00:00
template <typename Out>
void CachePartition::getValueFromMemory(
2020-01-06 20:38:32 +00:00
const size_t attribute_index, const PaddedPODArray<Index> & indices, ResultArrayType<Out> & out) const
2020-01-03 19:52:07 +00:00
{
2020-01-05 20:31:25 +00:00
for (size_t i = 0; i < indices.size(); ++i)
{
const auto & index = indices[i];
2020-01-06 20:38:32 +00:00
if (index.exists() && index.inMemory())
2020-01-05 20:31:25 +00:00
{
2020-01-19 08:49:40 +00:00
const size_t offset = index.getBlockId() * block_size + index.getAddressInBlock();
2020-01-07 11:26:52 +00:00
2020-01-19 08:49:40 +00:00
ReadBufferFromMemory read_buffer(memory->data() + offset, block_size * write_buffer_size - offset);
2020-01-07 11:26:52 +00:00
readValueFromBuffer(attribute_index, out[i], read_buffer);
2020-01-05 20:31:25 +00:00
}
}
}
template <typename Out>
void CachePartition::getValueFromStorage(
2020-01-06 20:38:32 +00:00
const size_t attribute_index, const PaddedPODArray<Index> & indices, ResultArrayType<Out> & out) const
2020-01-05 20:31:25 +00:00
{
2020-01-07 17:55:32 +00:00
std::vector<std::pair<Index, size_t>> index_to_out;
2020-01-06 20:38:32 +00:00
for (size_t i = 0; i < indices.size(); ++i)
{
const auto & index = indices[i];
if (index.exists() && !index.inMemory())
2020-01-07 17:55:32 +00:00
index_to_out.emplace_back(index, i);
2020-01-06 20:38:32 +00:00
}
if (index_to_out.empty())
return;
2020-01-07 17:55:32 +00:00
/// sort by (block_id, offset_in_block)
2020-01-06 20:38:32 +00:00
std::sort(std::begin(index_to_out), std::end(index_to_out));
2020-01-18 13:21:07 +00:00
Memory read_buffer(block_size * read_buffer_size, BUFFER_ALIGNMENT);
2020-01-06 20:38:32 +00:00
2020-01-07 17:55:32 +00:00
std::vector<iocb> requests;
std::vector<iocb*> pointers;
std::vector<std::vector<size_t>> blocks_to_indices;
requests.reserve(index_to_out.size());
pointers.reserve(index_to_out.size());
blocks_to_indices.reserve(index_to_out.size());
2020-01-06 20:38:32 +00:00
for (size_t i = 0; i < index_to_out.size(); ++i)
{
2020-01-07 17:55:32 +00:00
if (!requests.empty() &&
2020-01-12 14:23:32 +00:00
static_cast<size_t>(requests.back().aio_offset) == index_to_out[i].first.getBlockId() * block_size)
2020-01-07 17:55:32 +00:00
{
blocks_to_indices.back().push_back(i);
continue;
}
iocb request{};
2020-01-06 20:38:32 +00:00
#if defined(__FreeBSD__)
request.aio.aio_lio_opcode = LIO_READ;
2020-01-07 17:55:32 +00:00
request.aio.aio_fildes = fd;
request.aio.aio_buf = reinterpret_cast<volatile void *>(
2020-01-08 17:10:37 +00:00
reinterpret_cast<UInt64>(read_buffer.data()) + SSD_BLOCK_SIZE * (requests.size() % READ_BUFFER_SIZE_BLOCKS));
2020-01-07 17:55:32 +00:00
request.aio.aio_nbytes = SSD_BLOCK_SIZE;
2020-01-06 20:38:32 +00:00
request.aio.aio_offset = index_to_out[i].first;
2020-01-08 12:40:29 +00:00
request.aio_data = requests.size();
2020-01-06 20:38:32 +00:00
#else
2020-01-07 17:55:32 +00:00
request.aio_lio_opcode = IOCB_CMD_PREAD;
request.aio_fildes = fd;
2020-01-12 14:23:32 +00:00
request.aio_buf = reinterpret_cast<UInt64>(read_buffer.data()) + block_size * (requests.size() % read_buffer_size);
request.aio_nbytes = block_size;
request.aio_offset = index_to_out[i].first.getBlockId() * block_size;
2020-01-07 19:18:24 +00:00
request.aio_data = requests.size();
2020-01-06 20:38:32 +00:00
#endif
2020-01-07 17:55:32 +00:00
requests.push_back(request);
pointers.push_back(&requests.back());
blocks_to_indices.emplace_back();
blocks_to_indices.back().push_back(i);
2020-01-06 20:38:32 +00:00
}
2020-01-12 14:23:32 +00:00
AIOContext aio_context(read_buffer_size);
2020-01-06 20:38:32 +00:00
2020-01-07 17:55:32 +00:00
std::vector<bool> processed(requests.size(), false);
std::vector<io_event> events(requests.size());
2020-01-19 10:54:36 +00:00
for (auto & event : events)
event.res = -1; // TODO: remove
2020-01-06 20:38:32 +00:00
2020-01-07 17:55:32 +00:00
size_t to_push = 0;
size_t to_pop = 0;
while (to_pop < requests.size())
2020-01-06 20:38:32 +00:00
{
2020-01-07 17:55:32 +00:00
/// get io tasks from previous iteration
size_t popped = 0;
while (to_pop < to_push && (popped = io_getevents(aio_context.ctx, to_push - to_pop, to_push - to_pop, &events[to_pop], nullptr)) < 0)
2020-01-06 20:38:32 +00:00
{
if (errno != EINTR)
2020-01-19 10:54:36 +00:00
throwFromErrno("io_getevents: Failed to get an event for asynchronous IO", ErrorCodes::CANNOT_IO_GETEVENTS);
2020-01-06 20:38:32 +00:00
}
2020-01-07 17:55:32 +00:00
for (size_t i = to_pop; i < to_pop + popped; ++i)
2020-01-06 20:38:32 +00:00
{
2020-01-07 17:55:32 +00:00
const auto request_id = events[i].data;
const auto & request = requests[request_id];
if (events[i].res != static_cast<ssize_t>(request.aio_nbytes))
2020-01-18 11:47:58 +00:00
throw Exception("AIO failed to read file " + path + BIN_FILE_EXT + ". " +
"request_id= " + std::to_string(request.aio_data) + ", aio_nbytes=" + std::to_string(request.aio_nbytes) + ", aio_offset=" + std::to_string(request.aio_offset) +
"returned: " + std::to_string(events[i].res), ErrorCodes::AIO_WRITE_ERROR);
2020-01-07 17:55:32 +00:00
for (const size_t idx : blocks_to_indices[request_id])
{
const auto & [file_index, out_index] = index_to_out[idx];
2020-01-18 13:21:07 +00:00
ReadBufferFromMemory buf(
2020-01-07 17:55:32 +00:00
reinterpret_cast<char *>(request.aio_buf) + file_index.getAddressInBlock(),
2020-01-12 14:23:32 +00:00
block_size - file_index.getAddressInBlock());
2020-01-07 17:55:32 +00:00
readValueFromBuffer(attribute_index, out[out_index], buf);
}
processed[request_id] = true;
2020-01-06 20:38:32 +00:00
}
2020-01-07 17:55:32 +00:00
while (to_pop < requests.size() && processed[to_pop])
++to_pop;
2020-01-07 11:26:52 +00:00
2020-01-07 17:55:32 +00:00
/// add new io tasks
2020-01-12 14:23:32 +00:00
const size_t new_tasks_count = std::min(read_buffer_size - (to_push - to_pop), requests.size() - to_push);
2020-01-07 17:55:32 +00:00
size_t pushed = 0;
while (new_tasks_count > 0 && (pushed = io_submit(aio_context.ctx, new_tasks_count, &pointers[to_push])) < 0)
{
if (errno != EINTR)
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
}
to_push += pushed;
2020-01-07 11:26:52 +00:00
}
}
2020-01-06 20:38:32 +00:00
2020-01-07 11:26:52 +00:00
template <typename Out>
void CachePartition::readValueFromBuffer(const size_t attribute_index, Out & dst, ReadBuffer & buf) const
{
for (size_t i = 0; i < attribute_index; ++i)
{
2020-01-08 17:52:13 +00:00
switch (attributes_structure[i])
2020-01-06 20:38:32 +00:00
{
2020-01-07 11:26:52 +00:00
#define DISPATCH(TYPE) \
2020-01-06 20:38:32 +00:00
case AttributeUnderlyingType::ut##TYPE: \
{ \
2020-01-07 11:26:52 +00:00
buf.ignore(sizeof(TYPE)); \
2020-01-06 20:38:32 +00:00
} \
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Float32)
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
break;
}
}
2020-01-07 11:26:52 +00:00
2020-01-08 17:52:13 +00:00
switch (attributes_structure[attribute_index])
2020-01-07 11:26:52 +00:00
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
readBinary(dst, buf); \
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Float32)
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
break;
}
2020-01-03 19:52:07 +00:00
}
2020-01-11 11:19:12 +00:00
void CachePartition::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out, std::chrono::system_clock::time_point now) const
2020-01-05 20:31:25 +00:00
{
2020-01-09 19:34:03 +00:00
std::shared_lock lock(rw_lock);
2020-01-05 20:31:25 +00:00
for (size_t i = 0; i < ids.size(); ++i)
2020-01-06 20:38:32 +00:00
{
2020-01-08 12:40:29 +00:00
auto it = key_to_index_and_metadata.find(ids[i]);
2020-01-11 11:19:12 +00:00
if (it == std::end(key_to_index_and_metadata) || it->second.metadata.expiresAt() <= now)
2020-01-08 12:40:29 +00:00
{
2020-01-11 20:23:51 +00:00
out[i] = HAS_NOT_FOUND;
2020-01-06 20:38:32 +00:00
}
else
{
2020-01-08 12:40:29 +00:00
out[i] = !it->second.metadata.isDefault();
2020-01-06 20:38:32 +00:00
}
}
2020-01-05 20:31:25 +00:00
}
2020-01-11 16:38:43 +00:00
size_t CachePartition::getId() const
{
return file_id;
}
2020-01-18 17:46:00 +00:00
double CachePartition::getLoadFactor() const
{
std::shared_lock lock(rw_lock);
return static_cast<double>(current_file_block_id) / max_size;
}
size_t CachePartition::getElementCount() const
{
std::shared_lock lock(rw_lock);
return key_to_index_and_metadata.size();
}
2020-01-18 11:47:58 +00:00
PaddedPODArray<CachePartition::Key> CachePartition::getCachedIds(const std::chrono::system_clock::time_point now) const
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
PaddedPODArray<Key> array;
for (const auto & [key, index_and_metadata] : key_to_index_and_metadata)
if (!index_and_metadata.metadata.isDefault() && index_and_metadata.metadata.expiresAt() > now)
array.push_back(key);
return array;
}
2020-01-12 11:32:43 +00:00
void CachePartition::remove()
{
std::unique_lock lock(rw_lock);
2020-01-19 08:49:40 +00:00
//Poco::File(path + BIN_FILE_EXT).remove();
2020-01-12 12:29:42 +00:00
//std::filesystem::remove(std::filesystem::path(path + BIN_FILE_EXT));
2020-01-12 11:32:43 +00:00
}
2020-01-08 19:41:05 +00:00
CacheStorage::CacheStorage(
2020-01-12 14:23:32 +00:00
const AttributeTypes & attributes_structure_,
const std::string & path_,
const size_t max_partitions_count_,
const size_t partition_size_,
const size_t block_size_,
2020-01-19 08:49:40 +00:00
const size_t read_buffer_size_,
const size_t write_buffer_size_)
2020-01-08 19:41:05 +00:00
: attributes_structure(attributes_structure_)
2020-01-04 15:04:16 +00:00
, path(path_)
2020-01-12 11:32:43 +00:00
, max_partitions_count(max_partitions_count_)
2020-01-12 14:23:32 +00:00
, partition_size(partition_size_)
, block_size(block_size_)
, read_buffer_size(read_buffer_size_)
2020-01-19 08:49:40 +00:00
, write_buffer_size(write_buffer_size_)
2020-01-04 15:04:16 +00:00
, log(&Poco::Logger::get("CacheStorage"))
{
}
2020-01-12 12:29:42 +00:00
CacheStorage::~CacheStorage()
{
std::unique_lock lock(rw_lock);
partition_delete_queue.splice(std::end(partition_delete_queue), partitions);
collectGarbage();
}
2020-01-11 20:23:51 +00:00
template <typename Out>
void CacheStorage::getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::chrono::system_clock::time_point now) const
{
std::vector<bool> found(ids.size(), false);
2020-01-18 17:46:00 +00:00
{
std::shared_lock lock(rw_lock);
for (auto & partition : partitions)
partition->getValue<Out>(attribute_index, ids, out, found, now);
2020-01-11 20:23:51 +00:00
2020-01-18 17:46:00 +00:00
for (size_t i = 0; i < ids.size(); ++i)
if (!found[i])
not_found[ids[i]].push_back(i);
}
query_count.fetch_add(ids.size(), std::memory_order_relaxed);
hit_count.fetch_add(ids.size() - not_found.size(), std::memory_order_release);
2020-01-11 20:23:51 +00:00
}
void CacheStorage::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
std::unordered_map<Key, std::vector<size_t>> & not_found, std::chrono::system_clock::time_point now) const
{
2020-01-18 17:46:00 +00:00
{
std::shared_lock lock(rw_lock);
for (auto & partition : partitions)
partition->has(ids, out, now);
2020-01-11 20:23:51 +00:00
2020-01-18 17:46:00 +00:00
for (size_t i = 0; i < ids.size(); ++i)
if (out[i] == HAS_NOT_FOUND)
not_found[ids[i]].push_back(i);
}
query_count.fetch_add(ids.size(), std::memory_order_relaxed);
hit_count.fetch_add(ids.size() - not_found.size(), std::memory_order_release);
2020-01-11 20:23:51 +00:00
}
2020-01-02 19:33:19 +00:00
template <typename PresentIdHandler, typename AbsentIdHandler>
2020-01-04 15:04:16 +00:00
void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
2020-01-08 19:41:05 +00:00
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found,
const DictionaryLifetime lifetime, const std::vector<AttributeValueVariant> & null_values)
2020-01-02 19:33:19 +00:00
{
2020-01-11 16:38:43 +00:00
auto append_block = [this](const CachePartition::Attribute & new_keys,
const CachePartition::Attributes & new_attributes, const PaddedPODArray<CachePartition::Metadata> & metadata)
{
size_t inserted = 0;
while (inserted < metadata.size())
{
if (!partitions.empty())
inserted += partitions.front()->appendBlock(new_keys, new_attributes, metadata, inserted);
if (inserted < metadata.size())
{
partitions.emplace_front(std::make_unique<CachePartition>(
AttributeUnderlyingType::utUInt64, attributes_structure, path,
2020-01-12 14:23:32 +00:00
(partitions.empty() ? 0 : partitions.front()->getId() + 1),
2020-01-19 08:49:40 +00:00
partition_size, block_size, read_buffer_size, write_buffer_size));
2020-01-11 16:38:43 +00:00
}
}
2020-01-12 11:32:43 +00:00
collectGarbage();
2020-01-11 16:38:43 +00:00
};
2020-01-02 19:33:19 +00:00
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
for (const auto id : requested_ids)
remaining_ids.insert({id, 0});
const auto now = std::chrono::system_clock::now();
{
2020-01-11 20:56:27 +00:00
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
if (now > backoff_end_time)
2020-01-02 19:33:19 +00:00
{
2020-01-11 20:56:27 +00:00
try
2020-01-02 19:33:19 +00:00
{
2020-01-11 20:56:27 +00:00
if (update_error_count)
{
/// Recover after error: we have to clone the source here because
/// it could keep connections which should be reset after error.
source_ptr = source_ptr->clone();
}
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
Stopwatch watch;
auto stream = source_ptr->loadIds(requested_ids);
stream->readPrefix();
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
while (const auto block = stream->read())
{
const auto new_keys = std::move(createAttributesFromBlock(block, 0, { AttributeUnderlyingType::utUInt64 }).front());
const auto new_attributes = createAttributesFromBlock(block, 1, attributes_structure);
2020-01-06 20:38:32 +00:00
2020-01-11 20:56:27 +00:00
const auto & ids = std::get<CachePartition::Attribute::Container<UInt64>>(new_keys.values);
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
PaddedPODArray<CachePartition::Metadata> metadata(ids.size());
2020-01-08 12:40:29 +00:00
2020-01-11 20:56:27 +00:00
for (const auto i : ext::range(0, ids.size()))
{
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
metadata[i].setExpiresAt(now + std::chrono::seconds(distribution(rnd_engine)));
/// mark corresponding id as found
on_updated(ids[i], i, new_attributes);
remaining_ids[ids[i]] = 1;
}
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
append_block(new_keys, new_attributes, metadata);
}
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
stream->readSuffix();
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
update_error_count = 0;
last_update_exception = std::exception_ptr{};
backoff_end_time = std::chrono::system_clock::time_point{};
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
}
catch (...)
{
++update_error_count;
last_update_exception = std::current_exception();
backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, update_error_count));
2020-01-02 19:33:19 +00:00
2020-01-11 20:56:27 +00:00
tryLogException(last_update_exception, log,
"Could not update ssd cache dictionary, next update is scheduled at " + ext::to_string(backoff_end_time));
}
2020-01-02 19:33:19 +00:00
}
}
size_t not_found_num = 0, found_num = 0;
/// Check which ids have not been found and require setting null_value
2020-01-05 13:59:49 +00:00
CachePartition::Attribute new_keys;
new_keys.type = AttributeUnderlyingType::utUInt64;
2020-01-10 19:19:03 +00:00
new_keys.values = PaddedPODArray<UInt64>();
2020-01-03 19:52:07 +00:00
CachePartition::Attributes new_attributes;
{
/// TODO: create attributes from structure
2020-01-08 19:41:05 +00:00
for (const auto & attribute_type : attributes_structure)
2020-01-03 19:52:07 +00:00
{
2020-01-08 19:41:05 +00:00
switch (attribute_type)
2020-01-03 19:52:07 +00:00
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
2020-01-04 15:04:16 +00:00
new_attributes.emplace_back(); \
2020-01-08 19:41:05 +00:00
new_attributes.back().type = attribute_type; \
2020-01-10 19:19:03 +00:00
new_attributes.back().values = PaddedPODArray<TYPE>(); \
2020-01-03 19:52:07 +00:00
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Float32)
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
break;
}
}
}
2020-01-08 12:40:29 +00:00
2020-01-10 19:19:03 +00:00
PaddedPODArray<CachePartition::Metadata> metadata;
2020-01-08 12:40:29 +00:00
2020-01-02 19:33:19 +00:00
for (const auto & id_found_pair : remaining_ids)
{
if (id_found_pair.second)
{
++found_num;
continue;
}
++not_found_num;
const auto id = id_found_pair.first;
if (update_error_count)
{
/// TODO: юзать старые значения.
/// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
std::rethrow_exception(last_update_exception);
}
2020-01-05 13:59:49 +00:00
// Set key
2020-01-10 19:19:03 +00:00
std::get<PaddedPODArray<UInt64>>(new_keys.values).push_back(id);
2020-01-05 13:59:49 +00:00
2020-01-08 19:41:05 +00:00
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
2020-01-08 12:40:29 +00:00
metadata.emplace_back();
metadata.back().setExpiresAt(now + std::chrono::seconds(distribution(rnd_engine)));
metadata.back().setDefault();
2020-01-02 19:33:19 +00:00
/// Set null_value for each attribute
2020-01-08 19:41:05 +00:00
for (size_t i = 0; i < attributes_structure.size(); ++i)
2020-01-02 19:33:19 +00:00
{
2020-01-08 19:41:05 +00:00
const auto & attribute = attributes_structure[i];
2020-01-04 15:04:16 +00:00
// append null
2020-01-08 19:41:05 +00:00
switch (attribute)
2020-01-04 15:04:16 +00:00
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
{ \
2020-01-10 19:19:03 +00:00
auto & to_values = std::get<PaddedPODArray<TYPE>>(new_attributes[i].values); \
2020-01-08 19:41:05 +00:00
auto & null_value = std::get<TYPE>(null_values[i]); \
2020-01-04 15:04:16 +00:00
to_values.push_back(null_value); \
} \
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Float32)
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
break;
}
2020-01-02 19:33:19 +00:00
}
/// inform caller that the cell has not been found
on_id_not_found(id);
}
2020-01-08 12:40:29 +00:00
2020-01-11 20:56:27 +00:00
{
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
if (not_found_num)
append_block(new_keys, new_attributes, metadata);
}
2020-01-02 19:33:19 +00:00
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}
2020-01-18 11:47:58 +00:00
PaddedPODArray<CachePartition::Key> CacheStorage::getCachedIds() const
{
PaddedPODArray<Key> array;
const auto now = std::chrono::system_clock::now();
std::shared_lock lock(rw_lock);
for (auto & partition : partitions)
{
const auto cached_in_partition = partition->getCachedIds(now);
array.insert(std::begin(cached_in_partition), std::end(cached_in_partition));
}
return array;
}
2020-01-18 17:46:00 +00:00
double CacheStorage::getLoadFactor() const
{
double result = 0;
std::shared_lock lock(rw_lock);
for (const auto & partition : partitions)
result += partition->getLoadFactor();
return result / partitions.size();
}
size_t CacheStorage::getElementCount() const
{
size_t result = 0;
std::shared_lock lock(rw_lock);
for (const auto & partition : partitions)
result += partition->getElementCount();
return result;
}
2020-01-12 11:32:43 +00:00
void CacheStorage::collectGarbage()
{
// add partitions to queue
2020-01-12 12:29:42 +00:00
while (partitions.size() > max_partitions_count)
2020-01-12 11:32:43 +00:00
{
2020-01-12 12:29:42 +00:00
partition_delete_queue.splice(std::end(partition_delete_queue), partitions, std::prev(std::end(partitions)));
2020-01-12 11:32:43 +00:00
}
// drop unused partitions
while (!partition_delete_queue.empty() && partition_delete_queue.front().use_count() == 1)
{
partition_delete_queue.front()->remove();
partition_delete_queue.pop_front();
}
}
2020-01-05 17:05:49 +00:00
CachePartition::Attributes CacheStorage::createAttributesFromBlock(
2020-01-06 20:38:32 +00:00
const Block & block, const size_t begin_column, const std::vector<AttributeUnderlyingType> & structure)
2020-01-03 19:52:07 +00:00
{
CachePartition::Attributes attributes;
const auto columns = block.getColumns();
2020-01-05 17:05:49 +00:00
for (size_t i = 0; i < structure.size(); ++i)
2020-01-03 19:52:07 +00:00
{
2020-01-06 20:38:32 +00:00
const auto & column = columns[i + begin_column];
2020-01-05 17:05:49 +00:00
switch (structure[i])
2020-01-03 19:52:07 +00:00
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
{ \
2020-01-10 19:19:03 +00:00
PaddedPODArray<TYPE> values(column->size()); \
2020-01-03 19:52:07 +00:00
const auto raw_data = column->getRawData(); \
2020-01-06 20:38:32 +00:00
memcpy(&values[0], raw_data.data, raw_data.size * sizeof(TYPE)); \
2020-01-04 15:04:16 +00:00
attributes.emplace_back(); \
2020-01-05 17:05:49 +00:00
attributes.back().type = structure[i]; \
2020-01-04 15:04:16 +00:00
attributes.back().values = std::move(values); \
2020-01-03 19:52:07 +00:00
} \
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Float32)
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
break;
}
}
return attributes;
}
2020-01-01 17:40:46 +00:00
SSDCacheDictionary::SSDCacheDictionary(
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
2020-01-03 19:52:07 +00:00
const std::string & path_,
2020-01-12 14:23:32 +00:00
const size_t max_partitions_count_,
const size_t partition_size_,
const size_t block_size_,
2020-01-19 08:49:40 +00:00
const size_t read_buffer_size_,
const size_t write_buffer_size_)
2020-01-01 17:40:46 +00:00
: name(name_)
, dict_struct(dict_struct_)
, source_ptr(std::move(source_ptr_))
, dict_lifetime(dict_lifetime_)
2020-01-03 19:52:07 +00:00
, path(path_)
2020-01-12 14:23:32 +00:00
, max_partitions_count(max_partitions_count_)
, partition_size(partition_size_)
, block_size(block_size_)
, read_buffer_size(read_buffer_size_)
2020-01-19 08:49:40 +00:00
, write_buffer_size(write_buffer_size_)
2020-01-08 19:41:05 +00:00
, storage(ext::map<std::vector>(dict_struct.attributes, [](const auto & attribute) { return attribute.underlying_type; }),
2020-01-19 08:49:40 +00:00
path, max_partitions_count, partition_size, block_size, read_buffer_size, write_buffer_size)
2020-01-03 19:52:07 +00:00
, log(&Poco::Logger::get("SSDCacheDictionary"))
2020-01-01 17:40:46 +00:00
{
if (!this->source_ptr->supportsSelectiveLoad())
throw Exception{name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
createAttributes();
}
#define DECLARE(TYPE) \
void SSDCacheDictionary::get##TYPE( \
const std::string & attribute_name, const PaddedPODArray<Key> & ids, ResultArrayType<TYPE> & out) const \
{ \
const auto index = getAttributeIndex(attribute_name); \
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \
2020-01-08 19:41:05 +00:00
const auto null_value = std::get<TYPE>(null_values[index]); \
2020-01-01 17:40:46 +00:00
getItemsNumberImpl<TYPE, TYPE>( \
2020-01-05 13:59:49 +00:00
index, \
2020-01-04 15:04:16 +00:00
ids, \
out, \
[&](const size_t) { return null_value; }); \
2020-01-01 17:40:46 +00:00
}
2020-01-05 13:59:49 +00:00
2020-01-01 17:40:46 +00:00
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
#define DECLARE(TYPE) \
void SSDCacheDictionary::get##TYPE( \
const std::string & attribute_name, \
const PaddedPODArray<Key> & ids, \
const PaddedPODArray<TYPE> & def, \
ResultArrayType<TYPE> & out) const \
{ \
const auto index = getAttributeIndex(attribute_name); \
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \
getItemsNumberImpl<TYPE, TYPE>( \
2020-01-05 13:59:49 +00:00
index, \
2020-01-01 17:40:46 +00:00
ids, \
out, \
[&](const size_t row) { return def[row]; }); \
}
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
#define DECLARE(TYPE) \
void SSDCacheDictionary::get##TYPE( \
const std::string & attribute_name, \
const PaddedPODArray<Key> & ids, \
const TYPE def, \
ResultArrayType<TYPE> & out) const \
{ \
const auto index = getAttributeIndex(attribute_name); \
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \
getItemsNumberImpl<TYPE, TYPE>( \
2020-01-05 13:59:49 +00:00
index, \
2020-01-01 17:40:46 +00:00
ids, \
out, \
[&](const size_t) { return def; }); \
}
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
template <typename AttributeType, typename OutputType, typename DefaultGetter>
void SSDCacheDictionary::getItemsNumberImpl(
2020-01-05 13:59:49 +00:00
const size_t attribute_index, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const
2020-01-01 17:40:46 +00:00
{
2020-01-08 12:40:29 +00:00
const auto now = std::chrono::system_clock::now();
2020-01-01 17:40:46 +00:00
std::unordered_map<Key, std::vector<size_t>> not_found_ids;
2020-01-08 12:40:29 +00:00
storage.getValue<OutputType>(attribute_index, ids, out, not_found_ids, now);
2020-01-01 17:40:46 +00:00
if (not_found_ids.empty())
return;
std::vector<Key> required_ids(not_found_ids.size());
2020-01-05 13:59:49 +00:00
std::transform(std::begin(not_found_ids), std::end(not_found_ids), std::begin(required_ids), [](const auto & pair) { return pair.first; });
2020-01-01 17:40:46 +00:00
2020-01-02 19:33:19 +00:00
storage.update(
source_ptr,
2020-01-01 17:40:46 +00:00
required_ids,
2020-01-04 15:04:16 +00:00
[&](const auto id, const auto row, const auto & new_attributes) {
2020-01-03 19:52:07 +00:00
for (const size_t out_row : not_found_ids[id])
2020-01-10 19:19:03 +00:00
out[out_row] = std::get<PaddedPODArray<OutputType>>(new_attributes[attribute_index].values)[row];
2020-01-01 17:40:46 +00:00
},
2020-01-05 13:59:49 +00:00
[&](const size_t id)
2020-01-01 17:40:46 +00:00
{
for (const size_t row : not_found_ids[id])
out[row] = get_default(row);
2020-01-08 19:41:05 +00:00
},
getLifetime(),
null_values);
2020-01-01 17:40:46 +00:00
}
void SSDCacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
{
2020-01-05 13:59:49 +00:00
const auto index = getAttributeIndex(attribute_name);
2020-01-08 19:41:05 +00:00
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::utString);
2020-01-01 17:40:46 +00:00
2020-01-08 19:41:05 +00:00
const auto null_value = StringRef{std::get<String>(null_values[index])};
2020-01-01 17:40:46 +00:00
2020-01-05 13:59:49 +00:00
getItemsString(index, ids, out, [&](const size_t) { return null_value; });
2020-01-01 17:40:46 +00:00
}
void SSDCacheDictionary::getString(
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out) const
{
2020-01-05 13:59:49 +00:00
const auto index = getAttributeIndex(attribute_name);
2020-01-08 19:41:05 +00:00
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::utString);
2020-01-01 17:40:46 +00:00
2020-01-05 13:59:49 +00:00
getItemsString(index, ids, out, [&](const size_t row) { return def->getDataAt(row); });
2020-01-01 17:40:46 +00:00
}
void SSDCacheDictionary::getString(
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const
{
2020-01-05 13:59:49 +00:00
const auto index = getAttributeIndex(attribute_name);
2020-01-08 19:41:05 +00:00
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::utString);
2020-01-01 17:40:46 +00:00
2020-01-05 13:59:49 +00:00
getItemsString(index, ids, out, [&](const size_t) { return StringRef{def}; });
2020-01-01 17:40:46 +00:00
}
template <typename DefaultGetter>
2020-01-05 13:59:49 +00:00
void SSDCacheDictionary::getItemsString(const size_t attribute_index, const PaddedPODArray<Key> & ids,
2020-01-01 17:40:46 +00:00
ColumnString * out, DefaultGetter && get_default) const
{
2020-01-05 13:59:49 +00:00
UNUSED(attribute_index);
2020-01-01 17:40:46 +00:00
UNUSED(ids);
UNUSED(out);
UNUSED(get_default);
}
2020-01-08 12:40:29 +00:00
void SSDCacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
{
const auto now = std::chrono::system_clock::now();
std::unordered_map<Key, std::vector<size_t>> not_found_ids;
storage.has(ids, out, not_found_ids, now);
if (not_found_ids.empty())
return;
std::vector<Key> required_ids(not_found_ids.size());
std::transform(std::begin(not_found_ids), std::end(not_found_ids), std::begin(required_ids), [](const auto & pair) { return pair.first; });
storage.update(
source_ptr,
required_ids,
[&](const auto id, const auto, const auto &) {
for (const size_t out_row : not_found_ids[id])
out[out_row] = true;
},
[&](const size_t id)
{
for (const size_t row : not_found_ids[id])
out[row] = false;
2020-01-08 19:41:05 +00:00
},
getLifetime(),
null_values);
2020-01-08 12:40:29 +00:00
}
2020-01-18 11:47:58 +00:00
BlockInputStreamPtr SSDCacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
using BlockInputStreamType = DictionaryBlockInputStream<SSDCacheDictionary, Key>;
return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, storage.getCachedIds(), column_names);
}
2020-01-01 17:40:46 +00:00
size_t SSDCacheDictionary::getAttributeIndex(const std::string & attr_name) const
{
auto it = attribute_index_by_name.find(attr_name);
if (it == std::end(attribute_index_by_name))
throw Exception{"Attribute `" + name + "` does not exist.", ErrorCodes::BAD_ARGUMENTS};
return it->second;
}
template <typename T>
2020-01-08 19:41:05 +00:00
AttributeValueVariant SSDCacheDictionary::createAttributeNullValueWithTypeImpl(const Field & null_value)
2020-01-01 17:40:46 +00:00
{
2020-01-08 19:41:05 +00:00
AttributeValueVariant var_null_value = static_cast<T>(null_value.get<NearestFieldType<T>>());
2020-01-01 17:40:46 +00:00
bytes_allocated += sizeof(T);
2020-01-08 19:41:05 +00:00
return var_null_value;
2020-01-01 17:40:46 +00:00
}
template <>
2020-01-08 19:41:05 +00:00
AttributeValueVariant SSDCacheDictionary::createAttributeNullValueWithTypeImpl<String>(const Field & null_value)
2020-01-01 17:40:46 +00:00
{
2020-01-08 19:41:05 +00:00
AttributeValueVariant var_null_value = null_value.get<String>();
2020-01-01 17:40:46 +00:00
bytes_allocated += sizeof(StringRef);
//if (!string_arena)
// string_arena = std::make_unique<ArenaWithFreeLists>();
2020-01-08 19:41:05 +00:00
return var_null_value;
2020-01-01 17:40:46 +00:00
}
2020-01-08 19:41:05 +00:00
AttributeValueVariant SSDCacheDictionary::createAttributeNullValueWithType(const AttributeUnderlyingType type, const Field & null_value)
2020-01-01 17:40:46 +00:00
{
switch (type)
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
2020-01-08 19:41:05 +00:00
return createAttributeNullValueWithTypeImpl<TYPE>(null_value);
2019-10-25 18:06:08 +00:00
2020-01-01 17:40:46 +00:00
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Float32)
DISPATCH(Float64)
DISPATCH(String)
#undef DISPATCH
}
2020-01-04 15:04:16 +00:00
throw Exception{"Unknown attribute type: " + std::to_string(static_cast<int>(type)), ErrorCodes::TYPE_MISMATCH};
2020-01-01 17:40:46 +00:00
}
void SSDCacheDictionary::createAttributes()
{
2020-01-08 19:41:05 +00:00
null_values.reserve(dict_struct.attributes.size());
2020-01-01 17:40:46 +00:00
for (size_t i = 0; i < dict_struct.attributes.size(); ++i)
{
const auto & attribute = dict_struct.attributes[i];
attribute_index_by_name.emplace(attribute.name, i);
2020-01-08 19:41:05 +00:00
null_values.push_back(createAttributeNullValueWithType(attribute.underlying_type, attribute.null_value));
2020-01-01 17:40:46 +00:00
if (attribute.hierarchical)
throw Exception{name + ": hierarchical attributes not supported for dictionary of type " + getTypeName(),
ErrorCodes::TYPE_MISMATCH};
}
}
2019-10-25 18:06:08 +00:00
2020-01-05 13:59:49 +00:00
void registerDictionarySSDCache(DictionaryFactory & factory)
{
auto create_layout = [=](const std::string & name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
if (dict_struct.key)
throw Exception{"'key' is not supported for dictionary of layout 'cache'", ErrorCodes::UNSUPPORTED_METHOD};
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{name
+ ": elements .structure.range_min and .structure.range_max should be defined only "
"for a dictionary of layout 'range_hashed'",
ErrorCodes::BAD_ARGUMENTS};
const auto & layout_prefix = config_prefix + ".layout";
2020-01-12 14:23:32 +00:00
const auto max_partitions_count = config.getInt(layout_prefix + ".ssd.max_partitions_count", DEFAULT_PARTITIONS_COUNT);
if (max_partitions_count <= 0)
throw Exception{name + ": dictionary of layout 'ssdcache' cannot have 0 (or less) max_partitions_count", ErrorCodes::BAD_ARGUMENTS};
const auto block_size = config.getInt(layout_prefix + ".ssd.block_size", DEFAULT_SSD_BLOCK_SIZE);
if (block_size <= 0)
throw Exception{name + ": dictionary of layout 'ssdcache' cannot have 0 (or less) block_size", ErrorCodes::BAD_ARGUMENTS};
const auto partition_size = config.getInt64(layout_prefix + ".ssd.partition_size", DEFAULT_FILE_SIZE);
if (partition_size <= 0)
throw Exception{name + ": dictionary of layout 'ssdcache' cannot have 0 (or less) partition_size", ErrorCodes::BAD_ARGUMENTS};
if (partition_size % block_size != 0)
throw Exception{name + ": partition_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};
const auto read_buffer_size = config.getInt64(layout_prefix + ".ssd.read_buffer_size", DEFAULT_READ_BUFFER_SIZE);
if (read_buffer_size <= 0)
throw Exception{name + ": dictionary of layout 'ssdcache' cannot have 0 (or less) read_buffer_size", ErrorCodes::BAD_ARGUMENTS};
if (read_buffer_size % block_size != 0)
throw Exception{name + ": read_buffer_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};
2020-01-05 13:59:49 +00:00
2020-01-19 08:49:40 +00:00
const auto write_buffer_size = config.getInt64(layout_prefix + ".ssd.write_buffer_size", DEFAULT_WRITE_BUFFER_SIZE);
if (write_buffer_size <= 0)
throw Exception{name + ": dictionary of layout 'ssdcache' cannot have 0 (or less) write_buffer_size", ErrorCodes::BAD_ARGUMENTS};
if (write_buffer_size % block_size != 0)
throw Exception{name + ": write_buffer_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};
2020-01-05 13:59:49 +00:00
const auto path = config.getString(layout_prefix + ".ssd.path");
if (path.empty())
2020-01-12 14:23:32 +00:00
throw Exception{name + ": dictionary of layout 'ssdcache' cannot have empty path",
2020-01-05 13:59:49 +00:00
ErrorCodes::BAD_ARGUMENTS};
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
2020-01-12 14:23:32 +00:00
return std::make_unique<SSDCacheDictionary>(
name, dict_struct, std::move(source_ptr), dict_lifetime, path,
2020-01-19 08:49:40 +00:00
max_partitions_count, partition_size / block_size, block_size,
read_buffer_size / block_size, write_buffer_size / block_size);
2020-01-05 13:59:49 +00:00
};
factory.registerLayout("ssd", create_layout, false);
}
2019-10-25 18:06:08 +00:00
}