mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-10-04 15:40:49 +00:00
fix review
This commit is contained in:
parent
207de9ca9c
commit
8fef9f451b
@ -10,7 +10,7 @@ namespace DB
|
|||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
inline size_t nearestPowTwo(size_t x)
|
inline size_t roundUpToPowerOfTwoOrZero(size_t x)
|
||||||
{
|
{
|
||||||
size_t r = 8;
|
size_t r = 8;
|
||||||
while (x > r)
|
while (x > r)
|
||||||
@ -29,6 +29,13 @@ struct Int64Hasher
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Class for storing cache index.
|
||||||
|
It consists of two arrays.
|
||||||
|
The first one is splitted into buckets (each stores 8 elements (cells)) determined by hash of the element key.
|
||||||
|
The second one is splitted into 4bit numbers, which are positions in bucket for next element write (So cache uses FIFO eviction algorithm inside each bucket).
|
||||||
|
*/
|
||||||
template <typename K, typename V, typename Hasher, typename Deleter = EmptyDeleter>
|
template <typename K, typename V, typename Hasher, typename Deleter = EmptyDeleter>
|
||||||
class BucketCacheIndex
|
class BucketCacheIndex
|
||||||
{
|
{
|
||||||
@ -41,7 +48,7 @@ class BucketCacheIndex
|
|||||||
public:
|
public:
|
||||||
template <typename = std::enable_if<std::is_same_v<EmptyDeleter, Deleter>>>
|
template <typename = std::enable_if<std::is_same_v<EmptyDeleter, Deleter>>>
|
||||||
BucketCacheIndex(size_t cells_)
|
BucketCacheIndex(size_t cells_)
|
||||||
: buckets(nearestPowTwo(cells_) / bucket_size)
|
: buckets(roundUpToPowerOfTwoOrZero(cells_) / bucket_size)
|
||||||
, bucket_mask(buckets - 1)
|
, bucket_mask(buckets - 1)
|
||||||
, cells(buckets * bucket_size)
|
, cells(buckets * bucket_size)
|
||||||
, positions((buckets / 2) + 1)
|
, positions((buckets / 2) + 1)
|
||||||
@ -55,7 +62,7 @@ public:
|
|||||||
template <typename = std::enable_if<!std::is_same_v<EmptyDeleter, Deleter>>>
|
template <typename = std::enable_if<!std::is_same_v<EmptyDeleter, Deleter>>>
|
||||||
BucketCacheIndex(size_t cells_, Deleter deleter_)
|
BucketCacheIndex(size_t cells_, Deleter deleter_)
|
||||||
: deleter(deleter_)
|
: deleter(deleter_)
|
||||||
, buckets(nearestPowTwo(cells_) / bucket_size)
|
, buckets(roundUpToPowerOfTwoOrZero(cells_) / bucket_size)
|
||||||
, bucket_mask(buckets - 1)
|
, bucket_mask(buckets - 1)
|
||||||
, cells(buckets * bucket_size)
|
, cells(buckets * bucket_size)
|
||||||
, positions((buckets / 2) + 1)
|
, positions((buckets / 2) + 1)
|
||||||
@ -159,6 +166,8 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
/// Searches for the key in the bucket.
|
||||||
|
/// Returns index of cell with provided key.
|
||||||
size_t getCellIndex(const K key, const size_t bucket) const
|
size_t getCellIndex(const K key, const size_t bucket) const
|
||||||
{
|
{
|
||||||
const size_t pos = getPosition(bucket);
|
const size_t pos = getPosition(bucket);
|
||||||
@ -175,6 +184,7 @@ private:
|
|||||||
return bucket * bucket_size + pos;
|
return bucket * bucket_size + pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns current position for write in the bucket.
|
||||||
size_t getPosition(const size_t bucket) const
|
size_t getPosition(const size_t bucket) const
|
||||||
{
|
{
|
||||||
const size_t idx = (bucket >> 1);
|
const size_t idx = (bucket >> 1);
|
||||||
@ -183,6 +193,7 @@ private:
|
|||||||
return (positions[idx] & pos_mask);
|
return (positions[idx] & pos_mask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sets current posiotion in the bucket.
|
||||||
void setPosition(const size_t bucket, const size_t pos)
|
void setPosition(const size_t bucket, const size_t pos)
|
||||||
{
|
{
|
||||||
const size_t idx = bucket >> 1;
|
const size_t idx = bucket >> 1;
|
||||||
|
@ -55,6 +55,7 @@ namespace ErrorCodes
|
|||||||
extern const int AIO_WRITE_ERROR;
|
extern const int AIO_WRITE_ERROR;
|
||||||
extern const int BAD_ARGUMENTS;
|
extern const int BAD_ARGUMENTS;
|
||||||
extern const int CANNOT_ALLOCATE_MEMORY;
|
extern const int CANNOT_ALLOCATE_MEMORY;
|
||||||
|
extern const int CANNOT_CREATE_DIRECTORY;
|
||||||
extern const int CANNOT_FSYNC;
|
extern const int CANNOT_FSYNC;
|
||||||
extern const int CANNOT_IO_GETEVENTS;
|
extern const int CANNOT_IO_GETEVENTS;
|
||||||
extern const int CANNOT_IO_SUBMIT;
|
extern const int CANNOT_IO_SUBMIT;
|
||||||
@ -68,17 +69,17 @@ namespace ErrorCodes
|
|||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
constexpr size_t DEFAULT_SSD_BLOCK_SIZE = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
constexpr size_t DEFAULT_SSD_BLOCK_SIZE_BYTES = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||||
constexpr size_t DEFAULT_FILE_SIZE = 4 * 1024 * 1024 * 1024ULL;
|
constexpr size_t DEFAULT_FILE_SIZE_BYTES = 4 * 1024 * 1024 * 1024ULL;
|
||||||
constexpr size_t DEFAULT_PARTITIONS_COUNT = 16;
|
constexpr size_t DEFAULT_PARTITIONS_COUNT = 16;
|
||||||
constexpr size_t DEFAULT_READ_BUFFER_SIZE = 16 * DEFAULT_SSD_BLOCK_SIZE;
|
constexpr size_t DEFAULT_READ_BUFFER_SIZE_BYTES = 16 * DEFAULT_SSD_BLOCK_SIZE_BYTES;
|
||||||
constexpr size_t DEFAULT_WRITE_BUFFER_SIZE = DEFAULT_SSD_BLOCK_SIZE;
|
constexpr size_t DEFAULT_WRITE_BUFFER_SIZE_BYTES = DEFAULT_SSD_BLOCK_SIZE_BYTES;
|
||||||
|
|
||||||
constexpr size_t DEFAULT_MAX_STORED_KEYS = 100000;
|
constexpr size_t DEFAULT_MAX_STORED_KEYS = 100000;
|
||||||
|
|
||||||
constexpr size_t BUFFER_ALIGNMENT = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
constexpr size_t BUFFER_ALIGNMENT = DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||||
constexpr size_t BLOCK_CHECKSUM_SIZE = 8;
|
constexpr size_t BLOCK_CHECKSUM_SIZE_BYTES = 8;
|
||||||
constexpr size_t BLOCK_SPECIAL_FIELDS_SIZE = 4;
|
constexpr size_t BLOCK_SPECIAL_FIELDS_SIZE_BYTES = 4;
|
||||||
|
|
||||||
constexpr UInt64 KEY_METADATA_EXPIRES_AT_MASK = std::numeric_limits<std::chrono::system_clock::time_point::rep>::max();
|
constexpr UInt64 KEY_METADATA_EXPIRES_AT_MASK = std::numeric_limits<std::chrono::system_clock::time_point::rep>::max();
|
||||||
constexpr UInt64 KEY_METADATA_IS_DEFAULT_MASK = ~KEY_METADATA_EXPIRES_AT_MASK;
|
constexpr UInt64 KEY_METADATA_IS_DEFAULT_MASK = ~KEY_METADATA_EXPIRES_AT_MASK;
|
||||||
@ -189,7 +190,8 @@ SSDCachePartition::SSDCachePartition(
|
|||||||
keys_buffer.type = AttributeUnderlyingType::utUInt64;
|
keys_buffer.type = AttributeUnderlyingType::utUInt64;
|
||||||
keys_buffer.values = SSDCachePartition::Attribute::Container<UInt64>();
|
keys_buffer.values = SSDCachePartition::Attribute::Container<UInt64>();
|
||||||
|
|
||||||
std::filesystem::create_directories(std::filesystem::path{dir_path});
|
if (!std::filesystem::create_directories(std::filesystem::path{dir_path}))
|
||||||
|
throw Exception{"Failed to create directories.", ErrorCodes::CANNOT_CREATE_DIRECTORY};
|
||||||
|
|
||||||
{
|
{
|
||||||
ProfileEvents::increment(ProfileEvents::FileOpen);
|
ProfileEvents::increment(ProfileEvents::FileOpen);
|
||||||
@ -238,22 +240,20 @@ size_t SSDCachePartition::appendBlock(
|
|||||||
{
|
{
|
||||||
write_buffer.emplace(memory->data() + current_memory_block_id * block_size, block_size);
|
write_buffer.emplace(memory->data() + current_memory_block_id * block_size, block_size);
|
||||||
uint64_t tmp = 0;
|
uint64_t tmp = 0;
|
||||||
write_buffer->write(reinterpret_cast<char*>(&tmp), BLOCK_CHECKSUM_SIZE);
|
write_buffer->write(reinterpret_cast<char*>(&tmp), BLOCK_CHECKSUM_SIZE_BYTES);
|
||||||
write_buffer->write(reinterpret_cast<char*>(&tmp), BLOCK_SPECIAL_FIELDS_SIZE);
|
write_buffer->write(reinterpret_cast<char*>(&tmp), BLOCK_SPECIAL_FIELDS_SIZE_BYTES);
|
||||||
keys_in_block = 0;
|
keys_in_block = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
if (!write_buffer)
|
if (!write_buffer)
|
||||||
{
|
|
||||||
init_write_buffer();
|
init_write_buffer();
|
||||||
}
|
|
||||||
|
|
||||||
bool flushed = false;
|
bool flushed = false;
|
||||||
auto finish_block = [&]()
|
auto finish_block = [&]()
|
||||||
{
|
{
|
||||||
write_buffer.reset();
|
write_buffer.reset();
|
||||||
std::memcpy(memory->data() + block_size * current_memory_block_id + BLOCK_CHECKSUM_SIZE, &keys_in_block, sizeof(keys_in_block)); // set count
|
std::memcpy(memory->data() + block_size * current_memory_block_id + BLOCK_CHECKSUM_SIZE_BYTES, &keys_in_block, sizeof(keys_in_block)); // set count
|
||||||
uint64_t checksum = CityHash_v1_0_2::CityHash64(memory->data() + block_size * current_memory_block_id + BLOCK_CHECKSUM_SIZE, block_size - BLOCK_CHECKSUM_SIZE); // checksum
|
uint64_t checksum = CityHash_v1_0_2::CityHash64(memory->data() + block_size * current_memory_block_id + BLOCK_CHECKSUM_SIZE_BYTES, block_size - BLOCK_CHECKSUM_SIZE_BYTES); // checksum
|
||||||
std::memcpy(memory->data() + block_size * current_memory_block_id, &checksum, sizeof(checksum));
|
std::memcpy(memory->data() + block_size * current_memory_block_id, &checksum, sizeof(checksum));
|
||||||
if (++current_memory_block_id == write_buffer_size)
|
if (++current_memory_block_id == write_buffer_size)
|
||||||
flush();
|
flush();
|
||||||
@ -662,7 +662,7 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray<Index> & indice
|
|||||||
uint64_t checksum = 0;
|
uint64_t checksum = 0;
|
||||||
ReadBufferFromMemory buf_special(buf_ptr, block_size);
|
ReadBufferFromMemory buf_special(buf_ptr, block_size);
|
||||||
readBinary(checksum, buf_special);
|
readBinary(checksum, buf_special);
|
||||||
uint64_t calculated_checksum = CityHash_v1_0_2::CityHash64(buf_ptr + BLOCK_CHECKSUM_SIZE, block_size - BLOCK_CHECKSUM_SIZE);
|
uint64_t calculated_checksum = CityHash_v1_0_2::CityHash64(buf_ptr + BLOCK_CHECKSUM_SIZE_BYTES, block_size - BLOCK_CHECKSUM_SIZE_BYTES);
|
||||||
if (checksum != calculated_checksum)
|
if (checksum != calculated_checksum)
|
||||||
{
|
{
|
||||||
throw Exception("Cache data corrupted. From block = " + std::to_string(checksum) + " calculated = " + std::to_string(calculated_checksum) + ".", ErrorCodes::CORRUPTED_DATA);
|
throw Exception("Cache data corrupted. From block = " + std::to_string(checksum) + " calculated = " + std::to_string(calculated_checksum) + ".", ErrorCodes::CORRUPTED_DATA);
|
||||||
@ -756,7 +756,7 @@ void SSDCachePartition::clearOldestBlocks()
|
|||||||
|
|
||||||
uint64_t checksum = 0;
|
uint64_t checksum = 0;
|
||||||
readBinary(checksum, read_buffer);
|
readBinary(checksum, read_buffer);
|
||||||
uint64_t calculated_checksum = CityHash_v1_0_2::CityHash64(read_buffer_memory.data() + i * block_size + BLOCK_CHECKSUM_SIZE, block_size - BLOCK_CHECKSUM_SIZE);
|
uint64_t calculated_checksum = CityHash_v1_0_2::CityHash64(read_buffer_memory.data() + i * block_size + BLOCK_CHECKSUM_SIZE_BYTES, block_size - BLOCK_CHECKSUM_SIZE_BYTES);
|
||||||
if (checksum != calculated_checksum)
|
if (checksum != calculated_checksum)
|
||||||
{
|
{
|
||||||
throw Exception("Cache data corrupted. From block = " + std::to_string(checksum) + " calculated = " + std::to_string(calculated_checksum) + ".", ErrorCodes::CORRUPTED_DATA);
|
throw Exception("Cache data corrupted. From block = " + std::to_string(checksum) + " calculated = " + std::to_string(calculated_checksum) + ".", ErrorCodes::CORRUPTED_DATA);
|
||||||
@ -814,7 +814,7 @@ void SSDCachePartition::clearOldestBlocks()
|
|||||||
|
|
||||||
const size_t start_block = current_file_block_id % max_size;
|
const size_t start_block = current_file_block_id % max_size;
|
||||||
const size_t finish_block = start_block + write_buffer_size;
|
const size_t finish_block = start_block + write_buffer_size;
|
||||||
for (const auto& key : keys)
|
for (const auto & key : keys)
|
||||||
{
|
{
|
||||||
Index index;
|
Index index;
|
||||||
if (key_to_index.get(key, index))
|
if (key_to_index.get(key, index))
|
||||||
@ -1262,9 +1262,7 @@ void SSDCacheStorage::collectGarbage()
|
|||||||
{
|
{
|
||||||
// add partitions to queue
|
// add partitions to queue
|
||||||
while (partitions.size() > max_partitions_count)
|
while (partitions.size() > max_partitions_count)
|
||||||
{
|
|
||||||
partition_delete_queue.splice(std::end(partition_delete_queue), partitions, std::prev(std::end(partitions)));
|
partition_delete_queue.splice(std::end(partition_delete_queue), partitions, std::prev(std::end(partitions)));
|
||||||
}
|
|
||||||
|
|
||||||
// drop unused partitions
|
// drop unused partitions
|
||||||
while (!partition_delete_queue.empty() && partition_delete_queue.front().use_count() == 1)
|
while (!partition_delete_queue.empty() && partition_delete_queue.front().use_count() == 1)
|
||||||
@ -1650,23 +1648,23 @@ void registerDictionarySSDCache(DictionaryFactory & factory)
|
|||||||
if (max_partitions_count <= 0)
|
if (max_partitions_count <= 0)
|
||||||
throw Exception{name + ": dictionary of layout 'ssd_cache' cannot have 0 (or less) max_partitions_count", ErrorCodes::BAD_ARGUMENTS};
|
throw Exception{name + ": dictionary of layout 'ssd_cache' cannot have 0 (or less) max_partitions_count", ErrorCodes::BAD_ARGUMENTS};
|
||||||
|
|
||||||
const auto block_size = config.getInt(layout_prefix + ".ssd_cache.block_size", DEFAULT_SSD_BLOCK_SIZE);
|
const auto block_size = config.getInt(layout_prefix + ".ssd_cache.block_size", DEFAULT_SSD_BLOCK_SIZE_BYTES);
|
||||||
if (block_size <= 0)
|
if (block_size <= 0)
|
||||||
throw Exception{name + ": dictionary of layout 'ssd_cache' cannot have 0 (or less) block_size", ErrorCodes::BAD_ARGUMENTS};
|
throw Exception{name + ": dictionary of layout 'ssd_cache' cannot have 0 (or less) block_size", ErrorCodes::BAD_ARGUMENTS};
|
||||||
|
|
||||||
const auto file_size = config.getInt64(layout_prefix + ".ssd_cache.file_size", DEFAULT_FILE_SIZE);
|
const auto file_size = config.getInt64(layout_prefix + ".ssd_cache.file_size", DEFAULT_FILE_SIZE_BYTES);
|
||||||
if (file_size <= 0)
|
if (file_size <= 0)
|
||||||
throw Exception{name + ": dictionary of layout 'ssd_cache' cannot have 0 (or less) file_size", ErrorCodes::BAD_ARGUMENTS};
|
throw Exception{name + ": dictionary of layout 'ssd_cache' cannot have 0 (or less) file_size", ErrorCodes::BAD_ARGUMENTS};
|
||||||
if (file_size % block_size != 0)
|
if (file_size % block_size != 0)
|
||||||
throw Exception{name + ": file_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};
|
throw Exception{name + ": file_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};
|
||||||
|
|
||||||
const auto read_buffer_size = config.getInt64(layout_prefix + ".ssd_cache.read_buffer_size", DEFAULT_READ_BUFFER_SIZE);
|
const auto read_buffer_size = config.getInt64(layout_prefix + ".ssd_cache.read_buffer_size", DEFAULT_READ_BUFFER_SIZE_BYTES);
|
||||||
if (read_buffer_size <= 0)
|
if (read_buffer_size <= 0)
|
||||||
throw Exception{name + ": dictionary of layout 'ssd_cache' cannot have 0 (or less) read_buffer_size", ErrorCodes::BAD_ARGUMENTS};
|
throw Exception{name + ": dictionary of layout 'ssd_cache' cannot have 0 (or less) read_buffer_size", ErrorCodes::BAD_ARGUMENTS};
|
||||||
if (read_buffer_size % block_size != 0)
|
if (read_buffer_size % block_size != 0)
|
||||||
throw Exception{name + ": read_buffer_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};
|
throw Exception{name + ": read_buffer_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};
|
||||||
|
|
||||||
const auto write_buffer_size = config.getInt64(layout_prefix + ".ssd_cache.write_buffer_size", DEFAULT_WRITE_BUFFER_SIZE);
|
const auto write_buffer_size = config.getInt64(layout_prefix + ".ssd_cache.write_buffer_size", DEFAULT_WRITE_BUFFER_SIZE_BYTES);
|
||||||
if (write_buffer_size <= 0)
|
if (write_buffer_size <= 0)
|
||||||
throw Exception{name + ": dictionary of layout 'ssd_cache' cannot have 0 (or less) write_buffer_size", ErrorCodes::BAD_ARGUMENTS};
|
throw Exception{name + ": dictionary of layout 'ssd_cache' cannot have 0 (or less) write_buffer_size", ErrorCodes::BAD_ARGUMENTS};
|
||||||
if (write_buffer_size % block_size != 0)
|
if (write_buffer_size % block_size != 0)
|
||||||
|
@ -43,6 +43,11 @@ using AttributeValueVariant = std::variant<
|
|||||||
Float64,
|
Float64,
|
||||||
String>;
|
String>;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Class for operations with cache file and index.
|
||||||
|
Supports GET/SET operations.
|
||||||
|
*/
|
||||||
class SSDCachePartition
|
class SSDCachePartition
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -144,8 +149,6 @@ public:
|
|||||||
|
|
||||||
size_t appendDefaults(const Attribute & new_keys, const PaddedPODArray<Metadata> & metadata, const size_t begin);
|
size_t appendDefaults(const Attribute & new_keys, const PaddedPODArray<Metadata> & metadata, const size_t begin);
|
||||||
|
|
||||||
void clearOldestBlocks();
|
|
||||||
|
|
||||||
void flush();
|
void flush();
|
||||||
|
|
||||||
void remove();
|
void remove();
|
||||||
@ -161,6 +164,8 @@ public:
|
|||||||
size_t getBytesAllocated() const;
|
size_t getBytesAllocated() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
void clearOldestBlocks();
|
||||||
|
|
||||||
template <typename SetFunc>
|
template <typename SetFunc>
|
||||||
void getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set, std::vector<bool> & found) const;
|
void getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set, std::vector<bool> & found) const;
|
||||||
|
|
||||||
@ -192,7 +197,6 @@ private:
|
|||||||
std::optional<Memory<>> memory;
|
std::optional<Memory<>> memory;
|
||||||
std::optional<WriteBuffer> write_buffer;
|
std::optional<WriteBuffer> write_buffer;
|
||||||
uint32_t keys_in_block = 0;
|
uint32_t keys_in_block = 0;
|
||||||
//CompressionCodecPtr codec;
|
|
||||||
|
|
||||||
size_t current_memory_block_id = 0;
|
size_t current_memory_block_id = 0;
|
||||||
size_t current_file_block_id = 0;
|
size_t current_file_block_id = 0;
|
||||||
@ -201,6 +205,9 @@ private:
|
|||||||
using SSDCachePartitionPtr = std::shared_ptr<SSDCachePartition>;
|
using SSDCachePartitionPtr = std::shared_ptr<SSDCachePartition>;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Class for managing SSDCachePartition and getting data from source.
|
||||||
|
*/
|
||||||
class SSDCacheStorage
|
class SSDCacheStorage
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -285,6 +292,9 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Dictionary interface
|
||||||
|
*/
|
||||||
class SSDCacheDictionary final : public IDictionary
|
class SSDCacheDictionary final : public IDictionary
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
@ -55,6 +55,7 @@ namespace ErrorCodes
|
|||||||
extern const int AIO_WRITE_ERROR;
|
extern const int AIO_WRITE_ERROR;
|
||||||
extern const int BAD_ARGUMENTS;
|
extern const int BAD_ARGUMENTS;
|
||||||
extern const int CANNOT_ALLOCATE_MEMORY;
|
extern const int CANNOT_ALLOCATE_MEMORY;
|
||||||
|
extern const int CANNOT_CREATE_DIRECTORY;
|
||||||
extern const int CANNOT_FSYNC;
|
extern const int CANNOT_FSYNC;
|
||||||
extern const int CANNOT_IO_GETEVENTS;
|
extern const int CANNOT_IO_GETEVENTS;
|
||||||
extern const int CANNOT_IO_SUBMIT;
|
extern const int CANNOT_IO_SUBMIT;
|
||||||
@ -185,7 +186,8 @@ SSDComplexKeyCachePartition::SSDComplexKeyCachePartition(
|
|||||||
, key_to_index(max_stored_keys, KeyDeleter(keys_pool))
|
, key_to_index(max_stored_keys, KeyDeleter(keys_pool))
|
||||||
, attributes_structure(attributes_structure_)
|
, attributes_structure(attributes_structure_)
|
||||||
{
|
{
|
||||||
std::filesystem::create_directories(std::filesystem::path{dir_path});
|
if (!std::filesystem::create_directories(std::filesystem::path{dir_path}))
|
||||||
|
throw Exception{"Failed to create directories.", ErrorCodes::CANNOT_CREATE_DIRECTORY};
|
||||||
|
|
||||||
{
|
{
|
||||||
ProfileEvents::increment(ProfileEvents::FileOpen);
|
ProfileEvents::increment(ProfileEvents::FileOpen);
|
||||||
@ -199,10 +201,8 @@ SSDComplexKeyCachePartition::SSDComplexKeyCachePartition(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (preallocateDiskSpace(fd, max_size * block_size) < 0)
|
if (preallocateDiskSpace(fd, max_size * block_size) < 0)
|
||||||
{
|
|
||||||
throwFromErrnoWithPath("Cannot preallocate space for the file " + filename, filename, ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
throwFromErrnoWithPath("Cannot preallocate space for the file " + filename, filename, ErrorCodes::CANNOT_ALLOCATE_MEMORY);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDComplexKeyCachePartition::~SSDComplexKeyCachePartition()
|
SSDComplexKeyCachePartition::~SSDComplexKeyCachePartition()
|
||||||
@ -219,9 +219,8 @@ size_t SSDComplexKeyCachePartition::appendDefaults(
|
|||||||
std::unique_lock lock(rw_lock);
|
std::unique_lock lock(rw_lock);
|
||||||
KeyRefs keys(keys_in.size());
|
KeyRefs keys(keys_in.size());
|
||||||
for (size_t i = 0; i < keys_in.size(); ++i)
|
for (size_t i = 0; i < keys_in.size(); ++i)
|
||||||
{
|
|
||||||
keys[i] = keys_pool.copyKeyFrom(keys_in[i]);
|
keys[i] = keys_pool.copyKeyFrom(keys_in[i]);
|
||||||
}
|
|
||||||
return append(keys, Attributes{}, metadata, begin);
|
return append(keys, Attributes{}, metadata, begin);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -238,10 +237,8 @@ size_t SSDComplexKeyCachePartition::appendBlock(
|
|||||||
{
|
{
|
||||||
StringRefs tmp_keys_refs(keys_size);
|
StringRefs tmp_keys_refs(keys_size);
|
||||||
for (size_t i = 0; i < key_columns.front()->size(); ++i)
|
for (size_t i = 0; i < key_columns.front()->size(); ++i)
|
||||||
{
|
|
||||||
keys[i] = keys_pool.allocKey(i, key_columns, tmp_keys_refs);
|
keys[i] = keys_pool.allocKey(i, key_columns, tmp_keys_refs);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return append(keys, new_attributes, metadata, begin);
|
return append(keys, new_attributes, metadata, begin);
|
||||||
}
|
}
|
||||||
@ -265,13 +262,9 @@ size_t SSDComplexKeyCachePartition::append(
|
|||||||
};
|
};
|
||||||
|
|
||||||
if (!write_buffer)
|
if (!write_buffer)
|
||||||
{
|
|
||||||
init_write_buffer();
|
init_write_buffer();
|
||||||
}
|
|
||||||
if (!keys_buffer_pool)
|
if (!keys_buffer_pool)
|
||||||
{
|
|
||||||
keys_buffer_pool.emplace();
|
keys_buffer_pool.emplace();
|
||||||
}
|
|
||||||
|
|
||||||
bool flushed = false;
|
bool flushed = false;
|
||||||
auto finish_block = [&]()
|
auto finish_block = [&]()
|
||||||
@ -381,8 +374,6 @@ void SSDComplexKeyCachePartition::flush()
|
|||||||
if (keys_buffer.empty())
|
if (keys_buffer.empty())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
//Poco::Logger::get("paritiiton").information("@@@@@@@@@@@@@@@@@@@@ FLUSH!!! " + std::to_string(file_id) + " block: " + std::to_string(current_file_block_id));
|
|
||||||
|
|
||||||
AIOContext aio_context{1};
|
AIOContext aio_context{1};
|
||||||
|
|
||||||
iocb write_request{};
|
iocb write_request{};
|
||||||
@ -760,16 +751,12 @@ void SSDComplexKeyCachePartition::clearOldestBlocks()
|
|||||||
AIOContext aio_context(1);
|
AIOContext aio_context(1);
|
||||||
|
|
||||||
while (io_submit(aio_context.ctx, 1, &request_ptr) != 1)
|
while (io_submit(aio_context.ctx, 1, &request_ptr) != 1)
|
||||||
{
|
|
||||||
if (errno != EINTR)
|
if (errno != EINTR)
|
||||||
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
|
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
|
||||||
}
|
|
||||||
|
|
||||||
while (io_getevents(aio_context.ctx, 1, 1, &event, nullptr) != 1)
|
while (io_getevents(aio_context.ctx, 1, 1, &event, nullptr) != 1)
|
||||||
{
|
|
||||||
if (errno != EINTR)
|
if (errno != EINTR)
|
||||||
throwFromErrno("io_getevents: Failed to get an event for asynchronous IO", ErrorCodes::CANNOT_IO_GETEVENTS);
|
throwFromErrno("io_getevents: Failed to get an event for asynchronous IO", ErrorCodes::CANNOT_IO_GETEVENTS);
|
||||||
}
|
|
||||||
|
|
||||||
#if defined(__FreeBSD__)
|
#if defined(__FreeBSD__)
|
||||||
if (aio_return(reinterpret_cast<struct aiocb *>(event.udata)) != static_cast<ssize_t>(request.aio.aio_nbytes))
|
if (aio_return(reinterpret_cast<struct aiocb *>(event.udata)) != static_cast<ssize_t>(request.aio.aio_nbytes))
|
||||||
|
@ -130,6 +130,9 @@ using AttributeValueVariant = std::variant<
|
|||||||
Float64,
|
Float64,
|
||||||
String>;
|
String>;
|
||||||
|
|
||||||
|
/*
|
||||||
|
The pool for storing complex keys.
|
||||||
|
*/
|
||||||
template <typename A>
|
template <typename A>
|
||||||
class ComplexKeysPoolImpl
|
class ComplexKeysPoolImpl
|
||||||
{
|
{
|
||||||
@ -243,6 +246,11 @@ struct KeyDeleter
|
|||||||
ComplexKeysPool & keys_pool;
|
ComplexKeysPool & keys_pool;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Class for operations with cache file and index.
|
||||||
|
Supports GET/SET operations.
|
||||||
|
*/
|
||||||
class SSDComplexKeyCachePartition
|
class SSDComplexKeyCachePartition
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -418,6 +426,9 @@ private:
|
|||||||
using SSDComplexKeyCachePartitionPtr = std::shared_ptr<SSDComplexKeyCachePartition>;
|
using SSDComplexKeyCachePartitionPtr = std::shared_ptr<SSDComplexKeyCachePartition>;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Class for managing SSDCachePartition and getting data from source.
|
||||||
|
*/
|
||||||
class SSDComplexKeyCacheStorage
|
class SSDComplexKeyCacheStorage
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -503,6 +514,9 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Dictionary interface
|
||||||
|
*/
|
||||||
class SSDComplexKeyCacheDictionary final : public IDictionaryBase
|
class SSDComplexKeyCacheDictionary final : public IDictionaryBase
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
Loading…
Reference in New Issue
Block a user