Merge pull request #23825 from kitaisreal/update-field-include-bytes-allocated-fix

Flat, Hashed dictionary include update field bytes into bytes_allocated
This commit is contained in:
Maksim Kita 2021-05-01 15:25:07 +03:00 committed by GitHub
commit 9f369a0a27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 31 additions and 25 deletions

View File

@ -30,14 +30,14 @@ FlatDictionary::FlatDictionary(
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
Configuration configuration_,
BlockPtr previously_loaded_block_)
BlockPtr update_field_loaded_block_)
: IDictionary(dict_id_)
, dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)}
, dict_lifetime(dict_lifetime_)
, configuration(configuration_)
, loaded_keys(configuration.initial_array_size, false)
, previously_loaded_block(std::move(previously_loaded_block_))
, update_field_loaded_block(std::move(update_field_loaded_block_))
{
createAttributes();
loadData();
@ -273,7 +273,7 @@ void FlatDictionary::blockToAttributes(const Block & block)
void FlatDictionary::updateData()
{
if (!previously_loaded_block || previously_loaded_block->rows() == 0)
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
@ -281,13 +281,13 @@ void FlatDictionary::updateData()
while (const auto block = stream->read())
{
/// We are using this to keep saved data if input stream consists of multiple blocks
if (!previously_loaded_block)
previously_loaded_block = std::make_shared<DB::Block>(block.cloneEmpty());
if (!update_field_loaded_block)
update_field_loaded_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (size_t column_index = 0; column_index < block.columns(); ++column_index)
{
const IColumn & update_column = *block.getByPosition(column_index).column.get();
MutableColumnPtr saved_column = previously_loaded_block->getByPosition(column_index).column->assumeMutable();
MutableColumnPtr saved_column = update_field_loaded_block->getByPosition(column_index).column->assumeMutable();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
@ -298,12 +298,12 @@ void FlatDictionary::updateData()
auto stream = source_ptr->loadUpdatedAll();
mergeBlockWithStream<DictionaryKeyType::simple>(
dict_struct.getKeysSize(),
*previously_loaded_block,
*update_field_loaded_block,
stream);
}
if (previously_loaded_block)
blockToAttributes(*previously_loaded_block.get());
if (update_field_loaded_block)
blockToAttributes(*update_field_loaded_block.get());
}
void FlatDictionary::loadData()
@ -347,6 +347,9 @@ void FlatDictionary::calculateBytesAllocated()
callOnDictionaryAttributeType(attribute.type, type_call);
}
if (update_field_loaded_block)
bytes_allocated += update_field_loaded_block->allocatedBytes();
}
FlatDictionary::Attribute FlatDictionary::createAttribute(const DictionaryAttribute & dictionary_attribute, const Field & null_value)

View File

@ -39,7 +39,7 @@ public:
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
Configuration configuration_,
BlockPtr previously_loaded_block_ = nullptr);
BlockPtr update_field_loaded_block_ = nullptr);
std::string getTypeName() const override { return "Flat"; }
@ -55,7 +55,7 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<FlatDictionary>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, configuration, previously_loaded_block);
return std::make_shared<FlatDictionary>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, configuration, update_field_loaded_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
@ -184,7 +184,7 @@ private:
size_t bucket_count = 0;
mutable std::atomic<size_t> query_count{0};
BlockPtr previously_loaded_block;
BlockPtr update_field_loaded_block;
};
}

View File

@ -42,13 +42,13 @@ HashedDictionary<dictionary_key_type, sparse>::HashedDictionary(
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
bool require_nonempty_,
BlockPtr previously_loaded_block_)
BlockPtr update_field_loaded_block_)
: IDictionary(dict_id_)
, dict_struct(dict_struct_)
, source_ptr(std::move(source_ptr_))
, dict_lifetime(dict_lifetime_)
, require_nonempty(require_nonempty_)
, previously_loaded_block(std::move(previously_loaded_block_))
, update_field_loaded_block(std::move(update_field_loaded_block_))
{
createAttributes();
loadData();
@ -343,7 +343,7 @@ void HashedDictionary<dictionary_key_type, sparse>::createAttributes()
template <DictionaryKeyType dictionary_key_type, bool sparse>
void HashedDictionary<dictionary_key_type, sparse>::updateData()
{
if (!previously_loaded_block || previously_loaded_block->rows() == 0)
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
@ -351,13 +351,13 @@ void HashedDictionary<dictionary_key_type, sparse>::updateData()
while (const auto block = stream->read())
{
/// We are using this to keep saved data if input stream consists of multiple blocks
if (!previously_loaded_block)
previously_loaded_block = std::make_shared<DB::Block>(block.cloneEmpty());
if (!update_field_loaded_block)
update_field_loaded_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
{
const IColumn & update_column = *block.getByPosition(attribute_idx).column.get();
MutableColumnPtr saved_column = previously_loaded_block->getByPosition(attribute_idx).column->assumeMutable();
MutableColumnPtr saved_column = update_field_loaded_block->getByPosition(attribute_idx).column->assumeMutable();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
@ -368,14 +368,14 @@ void HashedDictionary<dictionary_key_type, sparse>::updateData()
auto stream = source_ptr->loadUpdatedAll();
mergeBlockWithStream<dictionary_key_type>(
dict_struct.getKeysSize(),
*previously_loaded_block,
*update_field_loaded_block,
stream);
}
if (previously_loaded_block)
if (update_field_loaded_block)
{
resize(previously_loaded_block->rows());
blockToAttributes(*previously_loaded_block.get());
resize(update_field_loaded_block->rows());
blockToAttributes(*update_field_loaded_block.get());
}
}
@ -586,6 +586,9 @@ void HashedDictionary<dictionary_key_type, sparse>::calculateBytesAllocated()
}
bytes_allocated += complex_key_arena.size();
if (update_field_loaded_block)
bytes_allocated += update_field_loaded_block->allocatedBytes();
}
template <DictionaryKeyType dictionary_key_type, bool sparse>

View File

@ -41,7 +41,7 @@ public:
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
bool require_nonempty_,
BlockPtr previously_loaded_block_ = nullptr);
BlockPtr update_field_loaded_block_ = nullptr);
std::string getTypeName() const override
{
@ -67,7 +67,7 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<HashedDictionary<dictionary_key_type, sparse>>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, previously_loaded_block);
return std::make_shared<HashedDictionary<dictionary_key_type, sparse>>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, update_field_loaded_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
@ -220,7 +220,7 @@ private:
size_t bucket_count = 0;
mutable std::atomic<size_t> query_count{0};
BlockPtr previously_loaded_block;
BlockPtr update_field_loaded_block;
Arena complex_key_arena;
};