From 3dbde50daa3914c964f3eb94a214fd9ffe087d0c Mon Sep 17 00:00:00 2001 From: proller Date: Thu, 27 Jul 2017 22:05:55 +0300 Subject: [PATCH] Split ComplexKeyCacheDictionary to use less then 1.5G ram when compile (#1034) * Split FunctionsArithmetic.cpp to generated functions * Use ccache if found * Do not use ccache if ccache defined in CMAKE_CXX_COMPILER_LAUNCHER * check_include.sh: print memory usage * Try split CacheDictionary source * Split ok * wip * wip * wip * wip * wip * wip --- .../ComplexKeyCacheDictionary.cpp | 495 +----------------- .../Dictionaries/ComplexKeyCacheDictionary.h | 400 +++++++++++++- .../ComplexKeyCacheDictionary_generate1.cpp | 40 ++ .../ComplexKeyCacheDictionary_generate2.cpp | 38 ++ .../ComplexKeyCacheDictionary_generate3.cpp | 38 ++ .../ComplexKeyHashedDictionary.cpp | 1 + utils/check_include.sh | 3 +- 7 files changed, 516 insertions(+), 499 deletions(-) create mode 100644 dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate1.cpp create mode 100644 dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate2.cpp create mode 100644 dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate3.cpp diff --git a/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp b/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp index 59f4f2fb8d7..b8f5c8c1dfd 100644 --- a/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp +++ b/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp @@ -14,6 +14,7 @@ namespace ProfileEvents { + extern const Event DictCacheKeysRequested; extern const Event DictCacheKeysRequestedMiss; extern const Event DictCacheKeysRequestedFound; @@ -70,36 +71,6 @@ ComplexKeyCacheDictionary::ComplexKeyCacheDictionary(const ComplexKeyCacheDictio : ComplexKeyCacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size} {} - -#define DECLARE(TYPE)\ -void ComplexKeyCacheDictionary::get##TYPE(\ - const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,\ - PaddedPODArray & out) const\ -{\ - dict_struct.validateKeyTypes(key_types);\ - \ - auto & attribute = getAttribute(attribute_name);\ - if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE))\ - throw Exception{\ - name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ - ErrorCodes::TYPE_MISMATCH};\ - \ - const auto null_value = std::get(attribute.null_values);\ - \ - getItemsNumber(attribute, key_columns, out, [&] (const size_t) { return null_value; });\ -} -DECLARE(UInt8) -DECLARE(UInt16) -DECLARE(UInt32) -DECLARE(UInt64) -DECLARE(Int8) -DECLARE(Int16) -DECLARE(Int32) -DECLARE(Int64) -DECLARE(Float32) -DECLARE(Float64) -#undef DECLARE - void ComplexKeyCacheDictionary::getString( const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ColumnString * out) const @@ -117,33 +88,6 @@ void ComplexKeyCacheDictionary::getString( getItemsString(attribute, key_columns, out, [&] (const size_t) { return null_value; }); } -#define DECLARE(TYPE)\ -void ComplexKeyCacheDictionary::get##TYPE(\ - const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,\ - const PaddedPODArray & def, PaddedPODArray & out) const\ -{\ - dict_struct.validateKeyTypes(key_types);\ - \ - auto & attribute = getAttribute(attribute_name);\ - if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE))\ - throw Exception{\ - name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ - ErrorCodes::TYPE_MISMATCH};\ - \ - getItemsNumber(attribute, key_columns, out, [&] (const size_t row) { return def[row]; });\ -} -DECLARE(UInt8) -DECLARE(UInt16) -DECLARE(UInt32) -DECLARE(UInt64) -DECLARE(Int8) -DECLARE(Int16) -DECLARE(Int32) -DECLARE(Int64) -DECLARE(Float32) -DECLARE(Float64) -#undef DECLARE - void ComplexKeyCacheDictionary::getString( const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, const ColumnString * const def, ColumnString * const out) const @@ -159,33 +103,6 @@ void ComplexKeyCacheDictionary::getString( getItemsString(attribute, key_columns, out, [&] (const size_t row) { return def->getDataAt(row); }); } -#define DECLARE(TYPE)\ -void ComplexKeyCacheDictionary::get##TYPE(\ - const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,\ - const TYPE def, PaddedPODArray & out) const\ -{\ - dict_struct.validateKeyTypes(key_types);\ - \ - auto & attribute = getAttribute(attribute_name);\ - if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE))\ - throw Exception{\ - name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ - ErrorCodes::TYPE_MISMATCH};\ - \ - getItemsNumber(attribute, key_columns, out, [&] (const size_t) { return def; });\ -} -DECLARE(UInt8) -DECLARE(UInt16) -DECLARE(UInt32) -DECLARE(UInt64) -DECLARE(Int8) -DECLARE(Int16) -DECLARE(Int32) -DECLARE(Int64) -DECLARE(Float32) -DECLARE(Float64) -#undef DECLARE - void ComplexKeyCacheDictionary::getString( const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, const String & def, ColumnString * const out) const @@ -201,8 +118,6 @@ void ComplexKeyCacheDictionary::getString( getItemsString(attribute, key_columns, out, [&] (const size_t) { return StringRef{def}; }); } - - /// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag, /// true false found and valid /// false true not found (something outdated, maybe our cell) @@ -246,7 +161,6 @@ ComplexKeyCacheDictionary::FindResult ComplexKeyCacheDictionary::findCellIdx(con return {oldest_id, false, false}; } - void ComplexKeyCacheDictionary::has(const Columns & key_columns, const DataTypes & key_types, PaddedPODArray & out) const { dict_struct.validateKeyTypes(key_types); @@ -321,7 +235,6 @@ void ComplexKeyCacheDictionary::has(const Columns & key_columns, const DataTypes }); } - void ComplexKeyCacheDictionary::createAttributes() { const auto attributes_size = dict_struct.attributes.size(); @@ -410,412 +323,6 @@ ComplexKeyCacheDictionary::Attribute ComplexKeyCacheDictionary::createAttributeW return attr; } -template -void ComplexKeyCacheDictionary::getItemsNumber( - Attribute & attribute, - const Columns & key_columns, - PaddedPODArray & out, - DefaultGetter && get_default) const -{ - if (false) {} -#define DISPATCH(TYPE) \ - else if (attribute.type == AttributeUnderlyingType::TYPE) \ - getItemsNumberImpl(attribute, key_columns, out, std::forward(get_default)); - DISPATCH(UInt8) - DISPATCH(UInt16) - DISPATCH(UInt32) - DISPATCH(UInt64) - DISPATCH(Int8) - DISPATCH(Int16) - DISPATCH(Int32) - DISPATCH(Int64) - DISPATCH(Float32) - DISPATCH(Float64) -#undef DISPATCH - else - throw Exception("Unexpected type of attribute: " + toString(attribute.type), ErrorCodes::LOGICAL_ERROR); -} - -template -void ComplexKeyCacheDictionary::getItemsNumberImpl( - Attribute & attribute, - const Columns & key_columns, - PaddedPODArray & out, - DefaultGetter && get_default) const -{ - /// Mapping: -> { all indices `i` of `key_columns` such that `key_columns[i]` = } - MapType> outdated_keys; - auto & attribute_array = std::get>(attribute.arrays); - - const auto rows_num = key_columns.front()->size(); - const auto keys_size = dict_struct.key.value().size(); - StringRefs keys(keys_size); - Arena temporary_keys_pool; - PODArray keys_array(rows_num); - - size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; - { - const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; - - const auto now = std::chrono::system_clock::now(); - /// fetch up-to-date values, decide which ones require update - for (const auto row : ext::range(0, rows_num)) - { - const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); - keys_array[row] = key; - const auto find_result = findCellIdx(key, now); - - /** cell should be updated if either: - * 1. keys (or hash) do not match, - * 2. cell has expired, - * 3. explicit defaults were specified and cell was set default. */ - - if (!find_result.valid) - { - outdated_keys[key].push_back(row); - if (find_result.outdated) - ++cache_expired; - else - ++cache_not_found; - } - else - { - ++cache_hit; - const auto & cell_idx = find_result.cell_idx; - const auto & cell = cells[cell_idx]; - out[row] = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; - } - } - } - ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired); - ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); - ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); - query_count.fetch_add(rows_num, std::memory_order_relaxed); - hit_count.fetch_add(rows_num - outdated_keys.size(), std::memory_order_release); - - if (outdated_keys.empty()) - return; - - std::vector required_rows(outdated_keys.size()); - std::transform(std::begin(outdated_keys), std::end(outdated_keys), std::begin(required_rows), - [] (auto & pair) { return pair.second.front(); }); - - /// request new values - update(key_columns, keys_array, required_rows, - [&] (const StringRef key, const size_t cell_idx) - { - for (const auto row : outdated_keys[key]) - out[row] = attribute_array[cell_idx]; - }, - [&] (const StringRef key, const size_t cell_idx) - { - for (const auto row : outdated_keys[key]) - out[row] = get_default(row); - }); -} - - -template -void ComplexKeyCacheDictionary::getItemsString( - Attribute & attribute, const Columns & key_columns, ColumnString * out, - DefaultGetter && get_default) const -{ - const auto rows_num = key_columns.front()->size(); - /// save on some allocations - out->getOffsets().reserve(rows_num); - - const auto keys_size = dict_struct.key.value().size(); - StringRefs keys(keys_size); - Arena temporary_keys_pool; - - auto & attribute_array = std::get>(attribute.arrays); - - auto found_outdated_values = false; - - /// perform optimistic version, fallback to pessimistic if failed - { - const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; - - const auto now = std::chrono::system_clock::now(); - /// fetch up-to-date values, discard on fail - for (const auto row : ext::range(0, rows_num)) - { - const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); - SCOPE_EXIT(temporary_keys_pool.rollback(key.size)); - const auto find_result = findCellIdx(key, now); - - if (!find_result.valid) - { - found_outdated_values = true; - break; - } - else - { - const auto & cell_idx = find_result.cell_idx; - const auto & cell = cells[cell_idx]; - const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; - out->insertData(string_ref.data, string_ref.size); - } - } - } - - /// optimistic code completed successfully - if (!found_outdated_values) - { - query_count.fetch_add(rows_num, std::memory_order_relaxed); - hit_count.fetch_add(rows_num, std::memory_order_release); - return; - } - - /// now onto the pessimistic one, discard possible partial results from the optimistic path - out->getChars().resize_assume_reserved(0); - out->getOffsets().resize_assume_reserved(0); - - /// Mapping: -> { all indices `i` of `key_columns` such that `key_columns[i]` = } - MapType> outdated_keys; - /// we are going to store every string separately - MapType map; - PODArray keys_array(rows_num); - - size_t total_length = 0; - size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; - { - const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; - - const auto now = std::chrono::system_clock::now(); - for (const auto row : ext::range(0, rows_num)) - { - const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); - keys_array[row] = key; - const auto find_result = findCellIdx(key, now); - - if (!find_result.valid) - { - outdated_keys[key].push_back(row); - if (find_result.outdated) - ++cache_expired; - else - ++cache_not_found; - } - else - { - ++cache_hit; - const auto & cell_idx = find_result.cell_idx; - const auto & cell = cells[cell_idx]; - const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; - - if (!cell.isDefault()) - map[key] = copyIntoArena(string_ref, temporary_keys_pool); - - total_length += string_ref.size + 1; - } - } - } - ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired); - ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); - ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); - - query_count.fetch_add(rows_num, std::memory_order_relaxed); - hit_count.fetch_add(rows_num - outdated_keys.size(), std::memory_order_release); - - /// request new values - if (!outdated_keys.empty()) - { - std::vector required_rows(outdated_keys.size()); - std::transform(std::begin(outdated_keys), std::end(outdated_keys), std::begin(required_rows), - [] (auto & pair) { return pair.second.front(); }); - - update(key_columns, keys_array, required_rows, - [&] (const StringRef key, const size_t cell_idx) - { - const StringRef attribute_value = attribute_array[cell_idx]; - - /// We must copy key and value to own memory, because it may be replaced with another - /// in next iterations of inner loop of update. - const StringRef copied_key = copyIntoArena(key, temporary_keys_pool); - const StringRef copied_value = copyIntoArena(attribute_value, temporary_keys_pool); - - map[copied_key] = copied_value; - total_length += (attribute_value.size + 1) * outdated_keys[key].size(); - }, - [&] (const StringRef key, const size_t cell_idx) - { - for (const auto row : outdated_keys[key]) - total_length += get_default(row).size + 1; - }); - } - - out->getChars().reserve(total_length); - - for (const auto row : ext::range(0, ext::size(keys_array))) - { - const StringRef key = keys_array[row]; - const auto it = map.find(key); - const auto string_ref = it != std::end(map) ? it->second : get_default(row); - out->insertData(string_ref.data, string_ref.size); - } -} - -template -void ComplexKeyCacheDictionary::update( - const Columns & in_key_columns, const PODArray & in_keys, - const std::vector & in_requested_rows, - PresentKeyHandler && on_cell_updated, - AbsentKeyHandler && on_key_not_found) const -{ - MapType remaining_keys{in_requested_rows.size()}; - for (const auto row : in_requested_rows) - remaining_keys.insert({ in_keys[row], false }); - - std::uniform_int_distribution distribution(dict_lifetime.min_sec, dict_lifetime.max_sec); - - const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs}; - { - Stopwatch watch; - auto stream = source_ptr->loadKeys(in_key_columns, in_requested_rows); - stream->readPrefix(); - - const auto keys_size = dict_struct.key.value().size(); - StringRefs keys(keys_size); - - const auto attributes_size = attributes.size(); - const auto now = std::chrono::system_clock::now(); - - while (const auto block = stream->read()) - { - /// cache column pointers - const auto key_columns = ext::map( - ext::range(0, keys_size), - [&] (const size_t attribute_idx) - { - return block.safeGetByPosition(attribute_idx).column; - }); - - const auto attribute_columns = ext::map( - ext::range(0, attributes_size), - [&] (const size_t attribute_idx) - { - return block.safeGetByPosition(keys_size + attribute_idx).column; - }); - - const auto rows_num = block.rows(); - - for (const auto row : ext::range(0, rows_num)) - { - auto key = allocKey(row, key_columns, keys); - const auto hash = StringRefHash{}(key); - const auto find_result = findCellIdx(key, now, hash); - const auto & cell_idx = find_result.cell_idx; - auto & cell = cells[cell_idx]; - - for (const auto attribute_idx : ext::range(0, attributes.size())) - { - const auto & attribute_column = *attribute_columns[attribute_idx]; - auto & attribute = attributes[attribute_idx]; - - setAttributeValue(attribute, cell_idx, attribute_column[row]); - } - - /// if cell id is zero and zero does not map to this cell, then the cell is unused - if (cell.key == StringRef{} && cell_idx != zero_cell_idx) - element_count.fetch_add(1, std::memory_order_relaxed); - - /// handle memory allocated for old key - if (key == cell.key) - { - freeKey(key); - key = cell.key; - } - else - { - /// new key is different from the old one - if (cell.key.data) - freeKey(cell.key); - - cell.key = key; - } - - cell.hash = hash; - - if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) - cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}); - else - cell.setExpiresAt(std::chrono::time_point::max()); - - /// inform caller - on_cell_updated(key, cell_idx); - /// mark corresponding id as found - remaining_keys[key] = true; - } - } - - stream->readSuffix(); - - ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, in_requested_rows.size()); - ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed()); - } - - size_t found_num = 0; - size_t not_found_num = 0; - - const auto now = std::chrono::system_clock::now(); - - /// Check which ids have not been found and require setting null_value - for (const auto key_found_pair : remaining_keys) - { - if (key_found_pair.second) - { - ++found_num; - continue; - } - - ++not_found_num; - - auto key = key_found_pair.first; - const auto hash = StringRefHash{}(key); - const auto find_result = findCellIdx(key, now, hash); - const auto & cell_idx = find_result.cell_idx; - auto & cell = cells[cell_idx]; - - /// Set null_value for each attribute - for (auto & attribute : attributes) - setDefaultAttributeValue(attribute, cell_idx); - - /// Check if cell had not been occupied before and increment element counter if it hadn't - if (cell.key == StringRef{} && cell_idx != zero_cell_idx) - element_count.fetch_add(1, std::memory_order_relaxed); - - if (key == cell.key) - key = cell.key; - else - { - if (cell.key.data) - freeKey(cell.key); - - /// copy key from temporary pool - key = copyKey(key); - cell.key = key; - } - - cell.hash = hash; - - if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) - cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}); - else - cell.setExpiresAt(std::chrono::time_point::max()); - - cell.setDefault(); - - /// inform caller that the cell has not been found - on_key_not_found(key, cell_idx); - } - - ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, found_num); - ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num); - -} - - void ComplexKeyCacheDictionary::setDefaultAttributeValue(Attribute & attribute, const size_t idx) const { switch (attribute.type) diff --git a/dbms/src/Dictionaries/ComplexKeyCacheDictionary.h b/dbms/src/Dictionaries/ComplexKeyCacheDictionary.h index f4ed8c88a3b..784e6f012e1 100644 --- a/dbms/src/Dictionaries/ComplexKeyCacheDictionary.h +++ b/dbms/src/Dictionaries/ComplexKeyCacheDictionary.h @@ -6,9 +6,12 @@ #include #include #include +#include #include #include #include +#include +#include #include #include #include @@ -18,6 +21,20 @@ #include +namespace ProfileEvents +{ + + extern const Event DictCacheKeysRequested; + extern const Event DictCacheKeysRequestedMiss; + extern const Event DictCacheKeysRequestedFound; + extern const Event DictCacheKeysExpired; + extern const Event DictCacheKeysNotFound; + extern const Event DictCacheKeysHit; + extern const Event DictCacheRequestTimeNs; + extern const Event DictCacheLockWriteNs; + extern const Event DictCacheLockReadNs; +} + namespace DB { @@ -197,26 +214,400 @@ private: Attribute & attribute, const Columns & key_columns, PaddedPODArray & out, - DefaultGetter && get_default) const; + DefaultGetter && get_default) const{ + if (false) {} +#define DISPATCH(TYPE) \ + else if (attribute.type == AttributeUnderlyingType::TYPE) \ + getItemsNumberImpl(attribute, key_columns, out, std::forward(get_default)); + DISPATCH(UInt8) + DISPATCH(UInt16) + DISPATCH(UInt32) + DISPATCH(UInt64) + DISPATCH(Int8) + DISPATCH(Int16) + DISPATCH(Int32) + DISPATCH(Int64) + DISPATCH(Float32) + DISPATCH(Float64) +#undef DISPATCH + else + throw Exception("Unexpected type of attribute: " + toString(attribute.type), ErrorCodes::LOGICAL_ERROR); +}; template void getItemsNumberImpl( Attribute & attribute, const Columns & key_columns, PaddedPODArray & out, - DefaultGetter && get_default) const; + DefaultGetter && get_default) const { + /// Mapping: -> { all indices `i` of `key_columns` such that `key_columns[i]` = } + MapType> outdated_keys; + auto & attribute_array = std::get>(attribute.arrays); + + const auto rows_num = key_columns.front()->size(); + const auto keys_size = dict_struct.key.value().size(); + StringRefs keys(keys_size); + Arena temporary_keys_pool; + PODArray keys_array(rows_num); + + size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; + { + const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; + + const auto now = std::chrono::system_clock::now(); + /// fetch up-to-date values, decide which ones require update + for (const auto row : ext::range(0, rows_num)) + { + const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); + keys_array[row] = key; + const auto find_result = findCellIdx(key, now); + + /** cell should be updated if either: + * 1. keys (or hash) do not match, + * 2. cell has expired, + * 3. explicit defaults were specified and cell was set default. */ + + if (!find_result.valid) + { + outdated_keys[key].push_back(row); + if (find_result.outdated) + ++cache_expired; + else + ++cache_not_found; + } + else + { + ++cache_hit; + const auto & cell_idx = find_result.cell_idx; + const auto & cell = cells[cell_idx]; + out[row] = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; + } + } + } + ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired); + ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); + ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); + query_count.fetch_add(rows_num, std::memory_order_relaxed); + hit_count.fetch_add(rows_num - outdated_keys.size(), std::memory_order_release); + + if (outdated_keys.empty()) + return; + + std::vector required_rows(outdated_keys.size()); + std::transform(std::begin(outdated_keys), std::end(outdated_keys), std::begin(required_rows), + [] (auto & pair) { return pair.second.front(); }); + + /// request new values + update(key_columns, keys_array, required_rows, + [&] (const StringRef key, const size_t cell_idx) + { + for (const auto row : outdated_keys[key]) + out[row] = attribute_array[cell_idx]; + }, + [&] (const StringRef key, const size_t cell_idx) + { + for (const auto row : outdated_keys[key]) + out[row] = get_default(row); + }); +}; template void getItemsString( Attribute & attribute, const Columns & key_columns, ColumnString * out, - DefaultGetter && get_default) const; + DefaultGetter && get_default) const { + const auto rows_num = key_columns.front()->size(); + /// save on some allocations + out->getOffsets().reserve(rows_num); + + const auto keys_size = dict_struct.key.value().size(); + StringRefs keys(keys_size); + Arena temporary_keys_pool; + + auto & attribute_array = std::get>(attribute.arrays); + + auto found_outdated_values = false; + + /// perform optimistic version, fallback to pessimistic if failed + { + const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; + + const auto now = std::chrono::system_clock::now(); + /// fetch up-to-date values, discard on fail + for (const auto row : ext::range(0, rows_num)) + { + const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); + SCOPE_EXIT(temporary_keys_pool.rollback(key.size)); + const auto find_result = findCellIdx(key, now); + + if (!find_result.valid) + { + found_outdated_values = true; + break; + } + else + { + const auto & cell_idx = find_result.cell_idx; + const auto & cell = cells[cell_idx]; + const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; + out->insertData(string_ref.data, string_ref.size); + } + } + } + + /// optimistic code completed successfully + if (!found_outdated_values) + { + query_count.fetch_add(rows_num, std::memory_order_relaxed); + hit_count.fetch_add(rows_num, std::memory_order_release); + return; + } + + /// now onto the pessimistic one, discard possible partial results from the optimistic path + out->getChars().resize_assume_reserved(0); + out->getOffsets().resize_assume_reserved(0); + + /// Mapping: -> { all indices `i` of `key_columns` such that `key_columns[i]` = } + MapType> outdated_keys; + /// we are going to store every string separately + MapType map; + PODArray keys_array(rows_num); + + size_t total_length = 0; + size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; + { + const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; + + const auto now = std::chrono::system_clock::now(); + for (const auto row : ext::range(0, rows_num)) + { + const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); + keys_array[row] = key; + const auto find_result = findCellIdx(key, now); + + if (!find_result.valid) + { + outdated_keys[key].push_back(row); + if (find_result.outdated) + ++cache_expired; + else + ++cache_not_found; + } + else + { + ++cache_hit; + const auto & cell_idx = find_result.cell_idx; + const auto & cell = cells[cell_idx]; + const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; + + if (!cell.isDefault()) + map[key] = copyIntoArena(string_ref, temporary_keys_pool); + + total_length += string_ref.size + 1; + } + } + } + ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired); + ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); + ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); + + query_count.fetch_add(rows_num, std::memory_order_relaxed); + hit_count.fetch_add(rows_num - outdated_keys.size(), std::memory_order_release); + + /// request new values + if (!outdated_keys.empty()) + { + std::vector required_rows(outdated_keys.size()); + std::transform(std::begin(outdated_keys), std::end(outdated_keys), std::begin(required_rows), + [] (auto & pair) { return pair.second.front(); }); + + update(key_columns, keys_array, required_rows, + [&] (const StringRef key, const size_t cell_idx) + { + const StringRef attribute_value = attribute_array[cell_idx]; + + /// We must copy key and value to own memory, because it may be replaced with another + /// in next iterations of inner loop of update. + const StringRef copied_key = copyIntoArena(key, temporary_keys_pool); + const StringRef copied_value = copyIntoArena(attribute_value, temporary_keys_pool); + + map[copied_key] = copied_value; + total_length += (attribute_value.size + 1) * outdated_keys[key].size(); + }, + [&] (const StringRef key, const size_t cell_idx) + { + for (const auto row : outdated_keys[key]) + total_length += get_default(row).size + 1; + }); + } + + out->getChars().reserve(total_length); + + for (const auto row : ext::range(0, ext::size(keys_array))) + { + const StringRef key = keys_array[row]; + const auto it = map.find(key); + const auto string_ref = it != std::end(map) ? it->second : get_default(row); + out->insertData(string_ref.data, string_ref.size); + } +}; template void update( const Columns & in_key_columns, const PODArray & in_keys, const std::vector & in_requested_rows, PresentKeyHandler && on_cell_updated, - AbsentKeyHandler && on_key_not_found) const; + AbsentKeyHandler && on_key_not_found) const{ + MapType remaining_keys{in_requested_rows.size()}; + for (const auto row : in_requested_rows) + remaining_keys.insert({ in_keys[row], false }); + + std::uniform_int_distribution distribution(dict_lifetime.min_sec, dict_lifetime.max_sec); + + const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs}; + { + Stopwatch watch; + auto stream = source_ptr->loadKeys(in_key_columns, in_requested_rows); + stream->readPrefix(); + + const auto keys_size = dict_struct.key.value().size(); + StringRefs keys(keys_size); + + const auto attributes_size = attributes.size(); + const auto now = std::chrono::system_clock::now(); + + while (const auto block = stream->read()) + { + /// cache column pointers + const auto key_columns = ext::map( + ext::range(0, keys_size), + [&] (const size_t attribute_idx) + { + return block.safeGetByPosition(attribute_idx).column; + }); + + const auto attribute_columns = ext::map( + ext::range(0, attributes_size), + [&] (const size_t attribute_idx) + { + return block.safeGetByPosition(keys_size + attribute_idx).column; + }); + + const auto rows_num = block.rows(); + + for (const auto row : ext::range(0, rows_num)) + { + auto key = allocKey(row, key_columns, keys); + const auto hash = StringRefHash{}(key); + const auto find_result = findCellIdx(key, now, hash); + const auto & cell_idx = find_result.cell_idx; + auto & cell = cells[cell_idx]; + + for (const auto attribute_idx : ext::range(0, attributes.size())) + { + const auto & attribute_column = *attribute_columns[attribute_idx]; + auto & attribute = attributes[attribute_idx]; + + setAttributeValue(attribute, cell_idx, attribute_column[row]); + } + + /// if cell id is zero and zero does not map to this cell, then the cell is unused + if (cell.key == StringRef{} && cell_idx != zero_cell_idx) + element_count.fetch_add(1, std::memory_order_relaxed); + + /// handle memory allocated for old key + if (key == cell.key) + { + freeKey(key); + key = cell.key; + } + else + { + /// new key is different from the old one + if (cell.key.data) + freeKey(cell.key); + + cell.key = key; + } + + cell.hash = hash; + + if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) + cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}); + else + cell.setExpiresAt(std::chrono::time_point::max()); + + /// inform caller + on_cell_updated(key, cell_idx); + /// mark corresponding id as found + remaining_keys[key] = true; + } + } + + stream->readSuffix(); + + ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, in_requested_rows.size()); + ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed()); + } + + size_t found_num = 0; + size_t not_found_num = 0; + + const auto now = std::chrono::system_clock::now(); + + /// Check which ids have not been found and require setting null_value + for (const auto key_found_pair : remaining_keys) + { + if (key_found_pair.second) + { + ++found_num; + continue; + } + + ++not_found_num; + + auto key = key_found_pair.first; + const auto hash = StringRefHash{}(key); + const auto find_result = findCellIdx(key, now, hash); + const auto & cell_idx = find_result.cell_idx; + auto & cell = cells[cell_idx]; + + /// Set null_value for each attribute + for (auto & attribute : attributes) + setDefaultAttributeValue(attribute, cell_idx); + + /// Check if cell had not been occupied before and increment element counter if it hadn't + if (cell.key == StringRef{} && cell_idx != zero_cell_idx) + element_count.fetch_add(1, std::memory_order_relaxed); + + if (key == cell.key) + key = cell.key; + else + { + if (cell.key.data) + freeKey(cell.key); + + /// copy key from temporary pool + key = copyKey(key); + cell.key = key; + } + + cell.hash = hash; + + if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) + cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}); + else + cell.setExpiresAt(std::chrono::time_point::max()); + + cell.setDefault(); + + /// inform caller that the cell has not been found + on_key_not_found(key, cell_idx); + } + + ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, found_num); + ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num); + +}; UInt64 getCellIdx(const StringRef key) const; @@ -296,4 +687,5 @@ private: const std::chrono::time_point creation_time = std::chrono::system_clock::now(); }; + } diff --git a/dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate1.cpp b/dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate1.cpp new file mode 100644 index 00000000000..085c6c8bf0d --- /dev/null +++ b/dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate1.cpp @@ -0,0 +1,40 @@ +#include "ComplexKeyCacheDictionary.h" + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int TYPE_MISMATCH; +} + +#define DECLARE(TYPE)\ +void ComplexKeyCacheDictionary::get##TYPE(\ + const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,\ + PaddedPODArray & out) const\ +{\ + dict_struct.validateKeyTypes(key_types);\ + \ + auto & attribute = getAttribute(attribute_name);\ + if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE))\ + throw Exception{\ + name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ + ErrorCodes::TYPE_MISMATCH};\ + \ + const auto null_value = std::get(attribute.null_values);\ + \ + getItemsNumber(attribute, key_columns, out, [&] (const size_t) { return null_value; });\ +} +DECLARE(UInt8) +DECLARE(UInt16) +DECLARE(UInt32) +DECLARE(UInt64) +DECLARE(Int8) +DECLARE(Int16) +DECLARE(Int32) +DECLARE(Int64) +DECLARE(Float32) +DECLARE(Float64) +#undef DECLARE + +} diff --git a/dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate2.cpp b/dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate2.cpp new file mode 100644 index 00000000000..cdac3d5c93d --- /dev/null +++ b/dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate2.cpp @@ -0,0 +1,38 @@ +#include "ComplexKeyCacheDictionary.h" + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int TYPE_MISMATCH; +} + +#define DECLARE(TYPE)\ +void ComplexKeyCacheDictionary::get##TYPE(\ + const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,\ + const PaddedPODArray & def, PaddedPODArray & out) const\ +{\ + dict_struct.validateKeyTypes(key_types);\ + \ + auto & attribute = getAttribute(attribute_name);\ + if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE))\ + throw Exception{\ + name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ + ErrorCodes::TYPE_MISMATCH};\ + \ + getItemsNumber(attribute, key_columns, out, [&] (const size_t row) { return def[row]; });\ +} +DECLARE(UInt8) +DECLARE(UInt16) +DECLARE(UInt32) +DECLARE(UInt64) +DECLARE(Int8) +DECLARE(Int16) +DECLARE(Int32) +DECLARE(Int64) +DECLARE(Float32) +DECLARE(Float64) +#undef DECLARE + +} diff --git a/dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate3.cpp b/dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate3.cpp new file mode 100644 index 00000000000..b10702a9fea --- /dev/null +++ b/dbms/src/Dictionaries/ComplexKeyCacheDictionary_generate3.cpp @@ -0,0 +1,38 @@ +#include "ComplexKeyCacheDictionary.h" + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int TYPE_MISMATCH; +} + +#define DECLARE(TYPE)\ +void ComplexKeyCacheDictionary::get##TYPE(\ + const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,\ + const TYPE def, PaddedPODArray & out) const\ +{\ + dict_struct.validateKeyTypes(key_types);\ + \ + auto & attribute = getAttribute(attribute_name);\ + if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::TYPE))\ + throw Exception{\ + name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ + ErrorCodes::TYPE_MISMATCH};\ + \ + getItemsNumber(attribute, key_columns, out, [&] (const size_t) { return def; });\ +} +DECLARE(UInt8) +DECLARE(UInt16) +DECLARE(UInt32) +DECLARE(UInt64) +DECLARE(Int8) +DECLARE(Int16) +DECLARE(Int32) +DECLARE(Int64) +DECLARE(Float32) +DECLARE(Float64) +#undef DECLARE + +} diff --git a/dbms/src/Dictionaries/ComplexKeyHashedDictionary.cpp b/dbms/src/Dictionaries/ComplexKeyHashedDictionary.cpp index c5291603341..62895bad44a 100644 --- a/dbms/src/Dictionaries/ComplexKeyHashedDictionary.cpp +++ b/dbms/src/Dictionaries/ComplexKeyHashedDictionary.cpp @@ -21,6 +21,7 @@ ComplexKeyHashedDictionary::ComplexKeyHashedDictionary( : name{name}, dict_struct(dict_struct), source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime), require_nonempty(require_nonempty) { + createAttributes(); try diff --git a/utils/check_include.sh b/utils/check_include.sh index a7a77726423..48ad539f349 100755 --- a/utils/check_include.sh +++ b/utils/check_include.sh @@ -1,4 +1,5 @@ #!/bin/sh +# sudo apt install time # Small .h isolated compile checker # Finds missing #include <...> # prints compile time, number of includes, use with sort: ./check_include.sh 2>&1 | sort -rk3 @@ -40,5 +41,5 @@ if [ -z $1 ]; then else echo -n "$1 " echo -n `grep "#include" $1| wc -l` " " - echo -e "#include <$1> \n int main() {return 0;}" | bash -c "TIMEFORMAT='%3R'; time g++-6 -c -std=gnu++1z $inc -x c++ -" + echo -e "#include <$1> \n int main() {return 0;}" | time --format "%e %M" g++-6 -c -std=gnu++1z $inc -x c++ - fi