This commit is contained in:
Nikita Vasilev 2020-02-01 21:13:43 +03:00
parent 98e54aaae0
commit 21577c3395
4 changed files with 110 additions and 145 deletions

View File

@ -415,9 +415,9 @@ void CachePartition::flush()
std::visit([](auto & attr) { attr.clear(); }, keys_buffer.values);
}
template <typename Out>
template <typename Out, typename GetDefault>
void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::vector<bool> & found,
ResultArrayType<Out> & out, std::vector<bool> & found, GetDefault & get_default,
std::chrono::system_clock::time_point now) const
{
auto set_value = [&](const size_t index, ReadBuffer & buf)
@ -426,11 +426,17 @@ void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray
readBinary(out[index], buf);
};
getImpl(ids, set_value, found, now);
auto set_default = [&](const size_t index)
{
out[index] = get_default(index);
};
getImpl(ids, set_value, set_default, found, now);
}
void CachePartition::getString(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
StringRefs & refs, ArenaWithFreeLists & arena, std::vector<bool> & found, std::chrono::system_clock::time_point now) const
StringRefs & refs, ArenaWithFreeLists & arena, std::vector<bool> & found, std::vector<size_t> & default_ids,
std::chrono::system_clock::time_point now) const
{
auto set_value = [&](const size_t index, ReadBuffer & buf)
{
@ -443,12 +449,17 @@ void CachePartition::getString(const size_t attribute_index, const PaddedPODArra
refs[index].size = size;
};
getImpl(ids, set_value, found, now);
auto set_default = [&](const size_t index)
{
default_ids.push_back(index);
};
getImpl(ids, set_value, set_default, found, now);
}
template <typename SetFunc>
void CachePartition::getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set, std::vector<bool> & found,
std::chrono::system_clock::time_point now) const
template <typename SetFunc, typename SetDefault>
void CachePartition::getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set, SetDefault & set_default,
std::vector<bool> & found, std::chrono::system_clock::time_point now) const
{
std::shared_lock lock(rw_lock);
PaddedPODArray<Index> indices(ids.size());
@ -461,7 +472,13 @@ void CachePartition::getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set,
else if (auto it = key_to_index_and_metadata.find(ids[i]);
it != std::end(key_to_index_and_metadata) && it->second.metadata.expiresAt() > now)
{
indices[i] = it->second.index;
if (unlikely(it->second.metadata.isDefault()))
{
indices[i].setNotExists();
set_default(i);
}
else
indices[i] = it->second.index;
found[i] = true;
}
else
@ -718,41 +735,43 @@ CacheStorage::~CacheStorage()
collectGarbage();
}
template <typename Out>
template <typename Out, typename GetDefault>
void CacheStorage::getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::chrono::system_clock::time_point now) const
GetDefault & get_default, std::chrono::system_clock::time_point now) const
{
std::vector<bool> found(ids.size(), false);
{
std::shared_lock lock(rw_lock);
for (auto & partition : partitions)
partition->getValue<Out>(attribute_index, ids, out, found, now);
for (size_t i = 0; i < ids.size(); ++i)
if (!found[i])
not_found[ids[i]].push_back(i);
partition->getValue<Out>(attribute_index, ids, out, found, get_default, now);
}
for (size_t i = 0; i < ids.size(); ++i)
if (!found[i])
not_found[ids[i]].push_back(i);
query_count.fetch_add(ids.size(), std::memory_order_relaxed);
hit_count.fetch_add(ids.size() - not_found.size(), std::memory_order_release);
}
void CacheStorage::getString(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
StringRefs & refs, ArenaWithFreeLists & arena, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::chrono::system_clock::time_point now) const
std::vector<size_t> & default_ids, std::chrono::system_clock::time_point now) const
{
std::vector<bool> found(ids.size(), false);
{
std::shared_lock lock(rw_lock);
for (auto & partition : partitions)
partition->getString(attribute_index, ids, refs, arena, found, now);
for (size_t i = 0; i < ids.size(); ++i)
if (!found[i])
not_found[ids[i]].push_back(i);
partition->getString(attribute_index, ids, refs, arena, found, default_ids, now);
}
for (size_t i = 0; i < ids.size(); ++i)
if (!found[i])
not_found[ids[i]].push_back(i);
query_count.fetch_add(ids.size(), std::memory_order_relaxed);
hit_count.fetch_add(ids.size() - not_found.size(), std::memory_order_release);
}
@ -776,7 +795,7 @@ void CacheStorage::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8
template <typename PresentIdHandler, typename AbsentIdHandler>
void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found,
const DictionaryLifetime lifetime, const std::vector<AttributeValueVariant> & null_values)
const DictionaryLifetime lifetime)
{
auto append_block = [this](const CachePartition::Attribute & new_keys,
const CachePartition::Attributes & new_attributes, const PaddedPODArray<CachePartition::Metadata> & metadata)
@ -866,52 +885,30 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
}
}
size_t not_found_num = 0, found_num = 0;
auto append_defaults = [this](const CachePartition::Attribute & new_keys, const PaddedPODArray<CachePartition::Metadata> & metadata)
{
size_t inserted = 0;
while (inserted < metadata.size())
{
if (!partitions.empty())
inserted += partitions.front()->appendDefaults(new_keys, metadata, inserted);
if (inserted < metadata.size())
{
partitions.emplace_front(std::make_unique<CachePartition>(
AttributeUnderlyingType::utUInt64, attributes_structure, path,
(partitions.empty() ? 0 : partitions.front()->getId() + 1),
partition_size, block_size, read_buffer_size, write_buffer_size));
}
}
collectGarbage();
};
size_t not_found_num = 0, found_num = 0;
/// Check which ids have not been found and require setting null_value
CachePartition::Attribute new_keys;
new_keys.type = AttributeUnderlyingType::utUInt64;
new_keys.values = CachePartition::Attribute::Container<UInt64>();
CachePartition::Attributes new_attributes;
{
/// TODO: create attributes from structure
for (const auto & attribute_type : attributes_structure)
{
switch (attribute_type)
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
new_attributes.emplace_back(); \
new_attributes.back().type = attribute_type; \
new_attributes.back().values = CachePartition::Attribute::Container<TYPE>(); \
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Float32)
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
{
new_attributes.emplace_back();
new_attributes.back().type = attribute_type;
new_attributes.back().values = CachePartition::Attribute::Container<String>();
}
break;
}
}
}
PaddedPODArray<CachePartition::Metadata> metadata;
@ -942,48 +939,6 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
metadata.back().setExpiresAt(now + std::chrono::seconds(distribution(rnd_engine)));
metadata.back().setDefault();
/// Set null_value for each attribute
for (size_t i = 0; i < attributes_structure.size(); ++i)
{
const auto & attribute = attributes_structure[i];
// append null
switch (attribute)
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
{ \
auto & to_values = std::get<CachePartition::Attribute::Container<TYPE>>(new_attributes[i].values); \
auto & null_value = std::get<TYPE>(null_values[i]); \
to_values.push_back(null_value); \
} \
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Float32)
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
{
auto & to_values = std::get<CachePartition::Attribute::Container<String>>(new_attributes[i].values);
auto & null_value = std::get<String>(null_values[i]);
to_values.push_back(null_value);
}
break;
}
}
/// inform caller that the cell has not been found
on_id_not_found(id);
}
@ -991,7 +946,7 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
{
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
if (not_found_num)
append_block(new_keys, new_attributes, metadata);
append_defaults(new_keys, metadata);
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
@ -1237,7 +1192,7 @@ void SSDCacheDictionary::getItemsNumberImpl(
const auto now = std::chrono::system_clock::now();
std::unordered_map<Key, std::vector<size_t>> not_found_ids;
storage.getValue<OutputType>(attribute_index, ids, out, not_found_ids, now);
storage.getValue<OutputType>(attribute_index, ids, out, not_found_ids, get_default, now);
if (not_found_ids.empty())
return;
@ -1256,8 +1211,7 @@ void SSDCacheDictionary::getItemsNumberImpl(
for (const size_t row : not_found_ids[id])
out[row] = get_default(row);
},
getLifetime(),
null_values);
getLifetime());
}
void SSDCacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
@ -1298,11 +1252,24 @@ void SSDCacheDictionary::getItemsStringImpl(const size_t attribute_index, const
StringRefs refs(ids.size());
ArenaWithFreeLists string_arena;
storage.getString(attribute_index, ids, refs, string_arena, not_found_ids, now);
std::vector<size_t> default_rows;
storage.getString(attribute_index, ids, refs, string_arena, not_found_ids, default_rows, now);
std::sort(std::begin(default_rows), std::end(default_rows));
if (not_found_ids.empty())
{
size_t default_index = 0;
for (size_t row = 0; row < ids.size(); ++row)
out->insertData(refs[row].data, refs[row].size);
{
if (unlikely(default_index != default_rows.size() && default_rows[default_index] == row))
{
auto to_insert = get_default(row);
out->insertData(to_insert.data, to_insert.size);
++default_index;
}
else
out->insertData(refs[row].data, refs[row].size);
}
return;
}
@ -1319,33 +1286,30 @@ void SSDCacheDictionary::getItemsStringImpl(const size_t attribute_index, const
update_result[id] = std::get<CachePartition::Attribute::Container<String>>(new_attributes[attribute_index].values)[row];
},
[&](const size_t) {},
getLifetime(),
null_values);
getLifetime());
LOG_DEBUG(&Poco::Logger::get("log"), "fill data");
size_t default_index = 0;
for (size_t row = 0; row < ids.size(); ++row)
{
const auto & id = ids[row];
auto it = not_found_ids.find(id);
if (it == std::end(not_found_ids))
if (unlikely(default_index != default_rows.size() && default_rows[default_index] == row))
{
auto to_insert = get_default(row);
out->insertData(to_insert.data, to_insert.size);
++default_index;
}
else if (auto it = not_found_ids.find(id); it == std::end(not_found_ids))
{
LOG_DEBUG(&Poco::Logger::get("log"), "fill found " << row << " " << id);
out->insertData(refs[row].data, refs[row].size);
}
else if (auto it_update = update_result.find(id); it_update != std::end(update_result))
{
out->insertData(it_update->second.data(), it_update->second.size());
}
else
{
auto it_update = update_result.find(id);
if (it_update != std::end(update_result))
{
LOG_DEBUG(&Poco::Logger::get("log"), "fill update " << row << " " << id);
out->insertData(it_update->second.data(), it_update->second.size());
}
else
{
LOG_DEBUG(&Poco::Logger::get("log"), "fill default " << row << " " << id);
auto to_insert = get_default(row);
out->insertData(to_insert.data, to_insert.size);
}
auto to_insert = get_default(row);
out->insertData(to_insert.data, to_insert.size);
}
}
}
@ -1374,8 +1338,7 @@ void SSDCacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UIn
for (const size_t row : not_found_ids[id])
out[row] = false;
},
getLifetime(),
null_values);
getLifetime());
}
BlockInputStreamPtr SSDCacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const

View File

@ -97,14 +97,14 @@ public:
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
template <typename Out>
template <typename Out, typename GetDefault>
void getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::vector<bool> & found,
ResultArrayType<Out> & out, std::vector<bool> & found, GetDefault & get_default,
std::chrono::system_clock::time_point now) const;
void getString(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
StringRefs & refs, ArenaWithFreeLists & arena, std::vector<bool> & found,
std::chrono::system_clock::time_point now) const;
std::vector<size_t> & default_ids, std::chrono::system_clock::time_point now) const;
void has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out, std::chrono::system_clock::time_point now) const;
@ -151,9 +151,9 @@ public:
size_t getElementCount() const;
private:
template <typename SetFunc>
void getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set, std::vector<bool> & found,
std::chrono::system_clock::time_point now) const;
template <typename SetFunc, typename SetDefault>
void getImpl(const PaddedPODArray<UInt64> & ids, SetFunc & set, SetDefault & set_default,
std::vector<bool> & found, std::chrono::system_clock::time_point now) const;
template <typename SetFunc>
void getValueFromMemory(const PaddedPODArray<Index> & indices, SetFunc & set) const;
@ -218,14 +218,14 @@ public:
template <typename T>
using ResultArrayType = CachePartition::ResultArrayType<T>;
template <typename Out>
template <typename Out, typename GetDefault>
void getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::chrono::system_clock::time_point now) const;
GetDefault & get_default, std::chrono::system_clock::time_point now) const;
void getString(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
StringRefs & refs, ArenaWithFreeLists & arena, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::chrono::system_clock::time_point now) const;
std::vector<size_t> & default_ids, std::chrono::system_clock::time_point now) const;
void has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
std::unordered_map<Key, std::vector<size_t>> & not_found, std::chrono::system_clock::time_point now) const;
@ -233,7 +233,7 @@ public:
template <typename PresentIdHandler, typename AbsentIdHandler>
void update(DictionarySourcePtr & source_ptr, const std::vector<Key> & requested_ids,
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found,
const DictionaryLifetime lifetime, const std::vector<AttributeValueVariant> & null_values);
const DictionaryLifetime lifetime);
PaddedPODArray<Key> getCachedIds() const;

View File

@ -24,7 +24,8 @@ HAS
5
10
VALUES NOT FROM TABLE
0 -1
0 -1 none
0 -1 none
DUPLICATE KEYS
1 -100
2 4

View File

@ -100,8 +100,9 @@ SELECT 'HAS';
SELECT id FROM database_for_dict.keys_table WHERE dictHas('database_for_dict.ssd_dict', toUInt64(id)) ORDER BY id;
SELECT 'VALUES NOT FROM TABLE';
-- 0 -1
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(1000000)), dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(1000000));
-- 0 -1 none
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(1000000)), dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(1000000)), dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(1000000));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(1000000)), dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(1000000)), dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(1000000));
SELECT 'DUPLICATE KEYS';
SELECT arrayJoin([1, 2, 3, 3, 2, 1]) AS id, dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(id));