Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-12-11 08:52:06 +00:00)

Commit 3bbb73e37c ("update")
Parent: 443a5ca9c1
@@ -1,14 +1,34 @@
#include "SSDCacheDictionary.h"

#include <Columns/ColumnsNumber.h>
#include <Common/ProfilingScopedRWLock.h>
#include <Common/typeid_cast.h>
#include <Common/ProfileEvents.h>
#include <Common/ProfilingScopedRWLock.h>
#include <DataStreams/IBlockInputStream.h>
#include <ext/chrono_io.h>
#include <ext/map.h>
#include <ext/range.h>
#include <ext/size.h>

namespace ProfileEvents
{
    extern const Event DictCacheKeysRequested;
    extern const Event DictCacheKeysRequestedMiss;
    extern const Event DictCacheKeysRequestedFound;
    extern const Event DictCacheKeysExpired;
    extern const Event DictCacheKeysNotFound;
    extern const Event DictCacheKeysHit;
    extern const Event DictCacheRequestTimeNs;
    extern const Event DictCacheRequests;
    extern const Event DictCacheLockWriteNs;
    extern const Event DictCacheLockReadNs;
}

namespace CurrentMetrics
{
    extern const Metric DictCacheRequests;
}

namespace DB
{

@@ -21,8 +41,8 @@ namespace ErrorCodes
    extern const int TOO_SMALL_BUFFER_SIZE;
}

CachePartition::CachePartition(const std::string & file_name, const Block & header, size_t buffer_size)
    : file_name(file_name), buffer_size(buffer_size), out_file(file_name, buffer_size), header(header), buffer(header.cloneEmptyColumns())
CachePartition::CachePartition(CacheStorage & storage_, const size_t file_id_, const size_t max_size_, const size_t buffer_size_)
    : storage(storage_), file_id(file_id_), max_size(max_size_), buffer_size(buffer_size_)
{
}

@@ -91,6 +111,124 @@ void CachePartition::flush()
    buffer = header.cloneEmptyColumns();
}

template <typename PresentIdHandler, typename AbsentIdHandler>
std::exception_ptr CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
    PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found)
{
    CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());

    std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
    for (const auto id : requested_ids)
        remaining_ids.insert({id, 0});

    const auto now = std::chrono::system_clock::now();

    const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

    if (now > backoff_end_time)
    {
        try
        {
            if (update_error_count)
            {
                /// Recover after error: we have to clone the source here because
                /// it could keep connections which should be reset after error.
                source_ptr = source_ptr->clone();
            }

            Stopwatch watch;
            auto stream = source_ptr->loadIds(requested_ids);
            stream->readPrefix();

            while (const auto block = stream->read())
            {
                const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
                if (!id_column)
                    throw Exception{"Id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};

                const auto & ids = id_column->getData();

                /// cache column pointers
                const auto column_ptrs = ext::map<std::vector>(
                    ext::range(0, dictionary.getAttributes().size()),
                    [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });

                for (const auto i : ext::range(0, ids.size()))
                {
                    const auto id = ids[i];

                    on_updated(id, i, column_ptrs);
                    /// mark corresponding id as found
                    remaining_ids[id] = 1;
                }

                /// TODO: Add TTL to block
                partitions[0]->appendBlock(block);
            }

            stream->readSuffix();

            update_error_count = 0;
            last_update_exception = std::exception_ptr{};
            backoff_end_time = std::chrono::system_clock::time_point{};

            ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
        }
        catch (...)
        {
            ++update_error_count;
            last_update_exception = std::current_exception();
            backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, update_error_count));

            tryLogException(last_update_exception, log, "Could not update cache dictionary '" + dictionary.getName() +
                "', next update is scheduled at " + ext::to_string(backoff_end_time));
        }
    }

    size_t not_found_num = 0, found_num = 0;

    /// Check which ids have not been found and require setting null_value
    auto mutable_columns = header.cloneEmptyColumns();
    for (const auto & id_found_pair : remaining_ids)
    {
        if (id_found_pair.second)
        {
            ++found_num;
            continue;
        }
        ++not_found_num;

        const auto id = id_found_pair.first;

        if (update_error_count)
        {
            /// TODO: use the old values.

            /// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
            std::rethrow_exception(last_update_exception);
        }

        /// TODO: Add TTL

        /// Set null_value for each attribute
        const auto & attributes = dictionary.getAttributes();
        for (size_t i = 0; i < attributes.size(); ++i)
        {
            const auto & attribute = attributes[i];
            mutable_columns[i].insert(attribute.null_value);
        }

        /// inform caller that the cell has not been found
        on_id_not_found(id);
    }
    partitions[0]->appendBlock(header.cloneWithColumns(std::move(mutable_columns)));

    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
    ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}

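The two handlers passed to CacheStorage::update() above form a small callback contract: on_updated(id, row_in_block, column_ptrs) fires once for every id the source returned, while on_id_not_found(id) fires for ids the source did not return (those rows fall back to null_value). A minimal standalone model of that contract, using plain std types instead of the ClickHouse column API (names and types here are illustrative, and the column pointers are omitted):

// Minimal sketch of the CacheStorage::update() callback contract, using plain
// std types instead of ClickHouse columns. Hypothetical example, for illustration only.
#include <cstdint>
#include <functional>
#include <iostream>
#include <unordered_set>
#include <vector>

using Key = uint64_t;

// Stand-in for "the source returned these ids" (the real code reads blocks
// obtained from DictionarySourcePtr::loadIds()).
void updateModel(
    const std::vector<Key> & requested_ids,
    const std::unordered_set<Key> & ids_known_to_source,
    const std::function<void(Key id, size_t row_in_block)> & on_updated,
    const std::function<void(Key id)> & on_id_not_found)
{
    size_t row = 0;
    for (const auto id : requested_ids)
    {
        if (ids_known_to_source.count(id))
            on_updated(id, row++);   // value arrived from the source
        else
            on_id_not_found(id);     // caller decides how to fill the default
    }
}

int main()
{
    updateModel(
        {1, 2, 3},
        {1, 3},
        [](Key id, size_t row) { std::cout << "id " << id << " filled from row " << row << "\n"; },
        [](Key id) { std::cout << "id " << id << " not found, using null_value\n"; });
}
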
SSDCacheDictionary::SSDCacheDictionary(
    const std::string & name_,
    const DictionaryStructure & dict_struct_,
@@ -102,7 +240,7 @@ SSDCacheDictionary::SSDCacheDictionary(
    , dict_struct(dict_struct_)
    , source_ptr(std::move(source_ptr_))
    , dict_lifetime(dict_lifetime_)
    , storage(path, partition_max_size)
    , storage(*this, path, 1, partition_max_size)
{
    if (!this->source_ptr->supportsSelectiveLoad())
        throw Exception{name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
@@ -209,6 +347,8 @@ template <typename AttributeType, typename OutputType, typename DefaultGetter>
void SSDCacheDictionary::getItemsNumberImpl(
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const
{
    const auto attribute_index = getAttributeIndex(attribute_name);

    std::unordered_map<Key, std::vector<size_t>> not_found_ids;
    storage.getValue(attribute_name, ids, out, not_found_ids);
    if (not_found_ids.empty())
@@ -217,12 +357,13 @@ void SSDCacheDictionary::getItemsNumberImpl(
    std::vector<Key> required_ids(not_found_ids.size());
    std::transform(std::begin(not_found_ids), std::end(not_found_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });

    update(
    storage.update(
        source_ptr,
        required_ids,
        [&](const auto id, const auto & attribute_value)
        [&](const auto id, const auto row, const auto & attributes)
        {
            for (const size_t row : not_found_ids[id])
                out[row] = static_cast<OutputType>(attribute_value);
                out[row] = static_cast<OutputType>(attributes[attribute_index][row]);
        },
        [&](const auto id)
        {
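The not_found_ids map used above associates each missing key with every row position that asked for it, which is why the present-id handler loops over not_found_ids[id]. A small standalone sketch of that bookkeeping (illustrative, not the dictionary's actual code path):

// Sketch of an id -> "rows that requested it" map, like not_found_ids above.
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

int main()
{
    using Key = uint64_t;
    const std::vector<Key> ids = {7, 3, 7, 9};   // the same id may be requested by several rows

    std::unordered_map<Key, std::vector<size_t>> not_found_ids;
    for (size_t row = 0; row < ids.size(); ++row)
        not_found_ids[ids[row]].push_back(row);

    // When a value for `id` finally arrives, every requesting row receives it.
    for (const auto & [id, rows] : not_found_ids)
        for (const auto row : rows)
            std::cout << "fill out[" << row << "] for id " << id << "\n";
}
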
@@ -287,6 +428,11 @@ const SSDCacheDictionary::Attribute & SSDCacheDictionary::getAttribute(const std
    return attributes[getAttributeIndex(attr_name)];
}

const SSDCacheDictionary::Attributes & SSDCacheDictionary::getAttributes() const
{
    return attributes;
}

template <typename T>
SSDCacheDictionary::Attribute SSDCacheDictionary::createAttributeWithTypeImpl(const AttributeUnderlyingType type, const Field & null_value)
{
@@ -22,12 +22,8 @@
namespace DB
{

constexpr size_t OFFSET_MASK = ~0xffff000000000000;
constexpr size_t FILE_ID_SIZE = 16;
constexpr size_t FILE_OFFSET_SIZE = sizeof(size_t) * 8 - FILE_ID_SIZE;

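The constants removed by this hunk (FILE_ID_SIZE, FILE_OFFSET_SIZE, OFFSET_MASK) describe a packed 64-bit value: a 16-bit file id in the high bits and a 48-bit offset in the low bits, presumably for entries such as key_to_file_offset. That interpretation is an assumption; the following standalone sketch only illustrates the packing the constants imply:

// Sketch of the 16-bit file id / 48-bit offset packing implied by the removed
// constants. Whether the dictionary actually uses this layout is an assumption;
// the constant values themselves come from the old header (64-bit size_t assumed).
#include <cstddef>
#include <cstdint>
#include <iostream>

constexpr size_t FILE_ID_SIZE = 16;
constexpr size_t FILE_OFFSET_SIZE = sizeof(size_t) * 8 - FILE_ID_SIZE;   // 48 on 64-bit
constexpr size_t OFFSET_MASK = ~0xffff000000000000;                      // low 48 bits

constexpr size_t pack(size_t file_id, size_t offset)
{
    return (file_id << FILE_OFFSET_SIZE) | (offset & OFFSET_MASK);
}

constexpr size_t unpackFileId(size_t packed) { return packed >> FILE_OFFSET_SIZE; }
constexpr size_t unpackOffset(size_t packed) { return packed & OFFSET_MASK; }

int main()
{
    const size_t packed = pack(/*file_id=*/3, /*offset=*/4096);
    std::cout << unpackFileId(packed) << " " << unpackOffset(packed) << "\n";   // prints: 3 4096
}
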
class SSDCacheDictionary;
class CacheStorage;

class CachePartition
{
@@ -35,7 +31,7 @@ public:
    using Offset = size_t;
    using Offsets = std::vector<Offset>;

    CachePartition(const std::string & file_name, const Block & header = {}, size_t buffer_size = 4 * 1024 * 1024);
    CachePartition(CacheStorage & storage, const size_t file_id, const size_t max_size, const size_t buffer_size = 4 * 1024 * 1024);

    void appendBlock(const Block & block);

@@ -48,34 +44,46 @@ public:

    // TODO:: getString

    /// 0 -- not found
    /// 1 -- good
    /// 2 -- expired
    void has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out) const;

private:
    void flush();

    std::string file_name;

    CacheStorage & storage;

    size_t file_id;
    size_t max_size;
    size_t buffer_size;

    WriteBufferFromFile out_file; // 4MB

    /// Block structure: Key, (Default + TTL), Attr1, Attr2, ...
    Block header;
    //mutable std::shared_mutex rw_lock;
    int index_fd;
    int data_fd;

    std::unordered_map<UInt64, size_t> key_to_file_offset;
    MutableColumns buffer;

    mutable std::atomic<size_t> element_count{0};
};

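The comment above CachePartition::has() documents a per-key result code: 0 for not found, 1 for good, 2 for expired. A tiny illustrative mapping of those codes (the enum and helper names are invented; only the numeric values come from the comment):

// Illustrative mapping for the 0/1/2 codes documented above CachePartition::has().
#include <cstdint>
#include <iostream>

enum class CacheLookupState : uint8_t
{
    NotFound = 0,   /// 0 -- not found
    Good     = 1,   /// 1 -- good
    Expired  = 2,   /// 2 -- expired
};

inline bool needsUpdate(CacheLookupState state)
{
    // Both "not found" and "expired" keys have to be (re)requested from the source.
    return state != CacheLookupState::Good;
}

int main()
{
    std::cout << needsUpdate(CacheLookupState::Expired) << "\n";   // prints: 1
}
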
using CachePartitionPtr = std::unique_ptr<CachePartition>;


class CacheStorage
{
    CacheStorage(const std::string & path_, size_t partition_max_size_)
        : path(path_)
        , partition_max_size(partition_max_size_)
    {
        partition = std::make_unique<CachePartition>(path);
    }
    using Key = IDictionary::Key;

    void appendBlock(const Block& block)
    CacheStorage(SSDCacheDictionary & dictionary_, const std::string & path_, const size_t partitions_count_, const size_t partition_max_size_)
        : dictionary(dictionary_)
        , path(path_)
        , partition_max_size(partition_max_size_)
        , log(&Poco::Logger::get("CacheStorage"))
    {
        partition->appendBlock(block);
        for (size_t partition_id = 0; partition_id < partitions_count_; ++partition_id)
            partitions.emplace_back(std::make_unique<CachePartition>(partition_id, partition_max_size));
    }

    template <typename T>
@@ -85,17 +93,42 @@ class CacheStorage
    void getValue(const std::string & attribute_name, const PaddedPODArray<UInt64> & ids,
        ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found) const
    {
        partition->getValue(attribute_name, ids, out, not_found);
        partitions[0]->getValue(attribute_name, ids, out, not_found);
    }

    // getString();

    template <typename PresentIdHandler, typename AbsentIdHandler>
    std::exception_ptr update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
        PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found);

    //BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const;

    std::exception_ptr getLastException() const { return last_update_exception; }

private:
    SSDCacheDictionary & dictionary;

    /// Block structure: Key, (Default + TTL), Attr1, Attr2, ...
    const Block header;
    const std::string path;
    const size_t partition_max_size;
    std::unique_ptr<CachePartition> partition;
    std::vector<CachePartitionPtr> partitions;

    Logger * const log;

    mutable pcg64 rnd_engine;

    mutable std::shared_mutex rw_lock;

    mutable std::exception_ptr last_update_exception;
    mutable size_t update_error_count = 0;
    mutable std::chrono::system_clock::time_point backoff_end_time;

    // stats
    mutable std::atomic<size_t> element_count{0};
    mutable std::atomic<size_t> hit_count{0};
    mutable std::atomic<size_t> query_count{0};
};

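The last_update_exception / update_error_count / backoff_end_time members back the error back-off seen in CacheStorage::update() earlier in this diff: while now <= backoff_end_time the source is not queried again, and a still-missing key rethrows the stored exception. A standalone model of that gating; the growth formula below is a placeholder for calculateDurationWithBackoff(), whose behaviour is not shown in this diff:

// Standalone model of the backoff_end_time gating used by CacheStorage::update().
// The back-off growth is a placeholder, not ClickHouse's calculateDurationWithBackoff().
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <exception>
#include <iostream>
#include <stdexcept>

struct BackoffState
{
    std::exception_ptr last_update_exception;
    std::size_t update_error_count = 0;
    std::chrono::system_clock::time_point backoff_end_time;
};

bool tryUpdate(BackoffState & state, bool source_ok)
{
    const auto now = std::chrono::system_clock::now();
    if (now <= state.backoff_end_time)
        return false;   // still backing off, do not hit the source

    if (source_ok)
    {
        state.update_error_count = 0;
        state.last_update_exception = nullptr;
        state.backoff_end_time = {};
        return true;
    }

    ++state.update_error_count;
    state.last_update_exception = std::make_exception_ptr(std::runtime_error("source failed"));
    // Placeholder for calculateDurationWithBackoff(rnd_engine, update_error_count).
    state.backoff_end_time = now + std::chrono::seconds(1 << std::min<std::size_t>(state.update_error_count, 6));
    return false;
}

int main()
{
    BackoffState state;
    std::cout << tryUpdate(state, /*source_ok=*/false) << tryUpdate(state, true) << "\n";   // prints: 00
}
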
@@ -125,13 +158,13 @@ public:

    size_t getElementCount() const override { return element_count.load(std::memory_order_relaxed); }

    double getLoadFactor() const override { return static_cast<double>(element_count.load(std::memory_order_relaxed)) / max_size; } // TODO: fix
    double getLoadFactor() const override { return static_cast<double>(element_count.load(std::memory_order_relaxed)) / partition_max_size; } // TODO: fix

    bool supportUpdates() const override { return true; }

    std::shared_ptr<const IExternalLoadable> clone() const override
    {
        return std::make_shared<SSDCacheDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, max_size);
        return std::make_shared<SSDCacheDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, partition_max_size);
    }

    const IDictionarySource * getSource() const override { return source_ptr.get(); }
@@ -149,7 +182,7 @@ public:

    void toParent(const PaddedPODArray<Key> & /* ids */, PaddedPODArray<Key> & /* out */ ) const override {}

    std::exception_ptr getLastException() const override { return last_exception; }
    std::exception_ptr getLastException() const override { return storage.getLastException(); }

    template <typename T>
    using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
@@ -222,9 +255,16 @@ public:

    void has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const override;

    BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
    BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override // TODO
    {
        UNUSED(column_names);
        UNUSED(max_block_size);
        return nullptr;
    }

private:
    friend class CacheStorage;

    struct Attribute
    {
        AttributeUnderlyingType type;
@@ -250,6 +290,7 @@ private:
    size_t getAttributeIndex(const std::string & attr_name) const;
    Attribute & getAttribute(const std::string & attr_name);
    const Attribute & getAttribute(const std::string & attr_name) const;
    const Attributes & getAttributes() const;

    template <typename T>
    Attribute createAttributeWithTypeImpl(const AttributeUnderlyingType type, const Field & null_value);
@@ -262,28 +303,20 @@ private:
    template <typename DefaultGetter>
    void getItemsString(const std::string & attribute_name, const PaddedPODArray<Key> & ids,
        ColumnString * out, DefaultGetter && get_default) const;

    template <typename PresentIdHandler, typename AbsentIdHandler>
    void update(const std::vector<Key> & requested_ids, PresentIdHandler && on_updated,
        AbsentIdHandler && on_id_not_found) const;

    const std::string name;
    const DictionaryStructure dict_struct;
    mutable DictionarySourcePtr source_ptr;
    const DictionaryLifetime dict_lifetime;

    CacheStorage storage;
    const std::string path;
    const size_t partition_max_size;
    mutable CacheStorage storage;
    Logger * const log;

    mutable std::shared_mutex rw_lock;

    std::map<std::string, size_t> attribute_index_by_name;
    Attributes attributes;

    mutable std::exception_ptr last_exception;
    mutable size_t error_count = 0;
    mutable std::chrono::system_clock::time_point backoff_end_time;

    mutable size_t bytes_allocated = 0;
    mutable std::atomic<size_t> element_count{0};
    mutable std::atomic<size_t> hit_count{0};