From e69188826746d3946027169150d192356b4e39f9 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 12 Aug 2022 13:03:57 +0200 Subject: [PATCH] fix build --- src/Storages/MergeTree/IDataPartStorage.h | 2 +- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 50 +++++++++---------- src/Storages/MergeTree/IMergeTreeDataPart.h | 40 +++------------ src/Storages/MergeTree/MergeTreeData.cpp | 14 +++--- src/Storages/MergeTree/MergeTreeData.h | 2 +- .../MergeTree/MergeTreeDataPartState.h | 29 +++++++++++ src/Storages/StorageReplicatedMergeTree.cpp | 4 +- src/Storages/System/StorageSystemParts.cpp | 2 +- .../System/StorageSystemPartsColumns.cpp | 2 +- .../System/StorageSystemProjectionParts.cpp | 2 +- .../StorageSystemProjectionPartsColumns.cpp | 2 +- 11 files changed, 77 insertions(+), 72 deletions(-) create mode 100644 src/Storages/MergeTree/MergeTreeDataPartState.h diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h index 54b3927d077..946c1c5fd47 100644 --- a/src/Storages/MergeTree/IDataPartStorage.h +++ b/src/Storages/MergeTree/IDataPartStorage.h @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include namespace DB diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index d9fba3ef6bd..00ecbacdcc2 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -207,55 +207,55 @@ void IMergeTreeDataPart::MinMaxIndex::appendFiles(const MergeTreeData & data, St } -static void incrementStateMetric(IMergeTreeDataPart::State state) +static void incrementStateMetric(MergeTreeDataPartState state) { switch (state) { - case IMergeTreeDataPart::State::Temporary: + case MergeTreeDataPartState::Temporary: CurrentMetrics::add(CurrentMetrics::PartsTemporary); return; - case IMergeTreeDataPart::State::PreActive: + case MergeTreeDataPartState::PreActive: CurrentMetrics::add(CurrentMetrics::PartsPreActive); 
CurrentMetrics::add(CurrentMetrics::PartsPreCommitted); return; - case IMergeTreeDataPart::State::Active: + case MergeTreeDataPartState::Active: CurrentMetrics::add(CurrentMetrics::PartsActive); CurrentMetrics::add(CurrentMetrics::PartsCommitted); return; - case IMergeTreeDataPart::State::Outdated: + case MergeTreeDataPartState::Outdated: CurrentMetrics::add(CurrentMetrics::PartsOutdated); return; - case IMergeTreeDataPart::State::Deleting: + case MergeTreeDataPartState::Deleting: CurrentMetrics::add(CurrentMetrics::PartsDeleting); return; - case IMergeTreeDataPart::State::DeleteOnDestroy: + case MergeTreeDataPartState::DeleteOnDestroy: CurrentMetrics::add(CurrentMetrics::PartsDeleteOnDestroy); return; } } -static void decrementStateMetric(IMergeTreeDataPart::State state) +static void decrementStateMetric(MergeTreeDataPartState state) { switch (state) { - case IMergeTreeDataPart::State::Temporary: + case MergeTreeDataPartState::Temporary: CurrentMetrics::sub(CurrentMetrics::PartsTemporary); return; - case IMergeTreeDataPart::State::PreActive: + case MergeTreeDataPartState::PreActive: CurrentMetrics::sub(CurrentMetrics::PartsPreActive); CurrentMetrics::sub(CurrentMetrics::PartsPreCommitted); return; - case IMergeTreeDataPart::State::Active: + case MergeTreeDataPartState::Active: CurrentMetrics::sub(CurrentMetrics::PartsActive); CurrentMetrics::sub(CurrentMetrics::PartsCommitted); return; - case IMergeTreeDataPart::State::Outdated: + case MergeTreeDataPartState::Outdated: CurrentMetrics::sub(CurrentMetrics::PartsOutdated); return; - case IMergeTreeDataPart::State::Deleting: + case MergeTreeDataPartState::Deleting: CurrentMetrics::sub(CurrentMetrics::PartsDeleting); return; - case IMergeTreeDataPart::State::DeleteOnDestroy: + case MergeTreeDataPartState::DeleteOnDestroy: CurrentMetrics::sub(CurrentMetrics::PartsDeleteOnDestroy); return; } @@ -313,7 +313,7 @@ IMergeTreeDataPart::IMergeTreeDataPart( , use_metadata_cache(storage.use_metadata_cache) { if (parent_part) - 
state = State::Active; + state = MergeTreeDataPartState::Active; incrementStateMetric(state); incrementTypeMetric(part_type); @@ -339,7 +339,7 @@ IMergeTreeDataPart::IMergeTreeDataPart( , use_metadata_cache(storage.use_metadata_cache) { if (parent_part) - state = State::Active; + state = MergeTreeDataPartState::Active; incrementStateMetric(state); incrementTypeMetric(part_type); @@ -381,14 +381,14 @@ std::optional IMergeTreeDataPart::getColumnPosition(const String & colum } -void IMergeTreeDataPart::setState(IMergeTreeDataPart::State new_state) const +void IMergeTreeDataPart::setState(MergeTreeDataPartState new_state) const { decrementStateMetric(state); state = new_state; incrementStateMetric(state); } -IMergeTreeDataPart::State IMergeTreeDataPart::getState() const +MergeTreeDataPartState IMergeTreeDataPart::getState() const { return state; } @@ -496,7 +496,7 @@ SerializationPtr IMergeTreeDataPart::tryGetSerialization(const String & column_n void IMergeTreeDataPart::removeIfNeeded() { assert(assertHasValidVersionMetadata()); - if (!is_temp && state != State::DeleteOnDestroy) + if (!is_temp && state != MergeTreeDataPartState::DeleteOnDestroy) return; try @@ -526,7 +526,7 @@ void IMergeTreeDataPart::removeIfNeeded() remove(); - if (state == State::DeleteOnDestroy) + if (state == MergeTreeDataPartState::DeleteOnDestroy) { LOG_TRACE(storage.log, "Removed part from old location {}", path); } @@ -539,8 +539,8 @@ void IMergeTreeDataPart::removeIfNeeded() /// Seems like it's especially important for remote disks, because removal may fail due to network issues. 
tryLogCurrentException(__PRETTY_FUNCTION__); assert(!is_temp); - assert(state != State::DeleteOnDestroy); - assert(state != State::Temporary); + assert(state != MergeTreeDataPartState::DeleteOnDestroy); + assert(state != MergeTreeDataPartState::Temporary); } } @@ -561,7 +561,7 @@ UInt64 IMergeTreeDataPart::getIndexSizeInAllocatedBytes() const return res; } -void IMergeTreeDataPart::assertState(const std::initializer_list & affordable_states) const +void IMergeTreeDataPart::assertState(const std::initializer_list & affordable_states) const { if (!checkState(affordable_states)) { @@ -1295,7 +1295,7 @@ catch (Exception & e) bool IMergeTreeDataPart::wasInvolvedInTransaction() const { - assert(!version.creation_tid.isEmpty() || (state == State::Temporary /* && std::uncaught_exceptions() */)); + assert(!version.creation_tid.isEmpty() || (state == MergeTreeDataPartState::Temporary /* && std::uncaught_exceptions() */)); bool created_by_transaction = !version.creation_tid.isPrehistoric(); bool removed_by_transaction = version.isRemovalTIDLocked() && version.removal_tid_lock != Tx::PrehistoricTID.getHash(); return created_by_transaction || removed_by_transaction; @@ -1319,7 +1319,7 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const if (part_is_probably_removed_from_disk) return true; - if (state == State::Temporary) + if (state == MergeTreeDataPartState::Temporary) return true; if (!data_part_storage->exists()) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 86ba34a0744..bdf42223d2f 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include #include @@ -41,8 +43,6 @@ class IMergeTreeDataPartWriter; class MarkCache; class UncompressedCache; class MergeTreeTransaction; -class IDataPartStorage; -using DataPartStoragePtr = std::shared_ptr; /// Description of the data part. 
class IMergeTreeDataPart : public std::enable_shared_from_this @@ -224,45 +224,22 @@ public: /// Flag for keep S3 data when zero-copy replication over S3 turned on. mutable bool force_keep_shared_data = false; - /** - * Part state is a stage of its lifetime. States are ordered and state of a part could be increased only. - * Part state should be modified under data_parts mutex. - * - * Possible state transitions: - * Temporary -> PreActive: we are trying to add a fetched, inserted or merged part to active set - * PreActive -> Outdated: we could not add a part to active set and are doing a rollback (for example it is duplicated part) - * PreActive -> Active: we successfully added a part to active dataset - * PreActive -> Outdated: a part was replaced by a covering part or DROP PARTITION - * Outdated -> Deleting: a cleaner selected this part for deletion - * Deleting -> Outdated: if an ZooKeeper error occurred during the deletion, we will retry deletion - * Active -> DeleteOnDestroy: if part was moved to another disk - */ - enum class State - { - Temporary, /// the part is generating now, it is not in data_parts list - PreActive, /// the part is in data_parts, but not used for SELECTs - Active, /// active data part, used by current and upcoming SELECTs - Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes - Deleting, /// not active data part with identity refcounter, it is deleting right now by a cleaner - DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor - }; - using TTLInfo = MergeTreeDataPartTTLInfo; using TTLInfos = MergeTreeDataPartTTLInfos; mutable TTLInfos ttl_infos; /// Current state of the part. 
If the part is in working set already, it should be accessed via data_parts mutex - void setState(State new_state) const; - State getState() const; + void setState(MergeTreeDataPartState new_state) const; + MergeTreeDataPartState getState() const; - static constexpr std::string_view stateString(State state) { return magic_enum::enum_name(state); } + static constexpr std::string_view stateString(MergeTreeDataPartState state) { return magic_enum::enum_name(state); } constexpr std::string_view stateString() const { return stateString(state); } String getNameWithState() const { return fmt::format("{} (state {})", name, stateString()); } /// Returns true if state of part is one of affordable_states - bool checkState(const std::initializer_list & affordable_states) const + bool checkState(const std::initializer_list & affordable_states) const { for (auto affordable_state : affordable_states) { @@ -273,7 +250,7 @@ public: } /// Throws an exception if state of the part is not in affordable_states - void assertState(const std::initializer_list & affordable_states) const; + void assertState(const std::initializer_list & affordable_states) const; /// Primary key (correspond to primary.idx file). /// Always loaded in RAM. Contains each index_granularity-th value of primary key tuple. @@ -593,13 +570,12 @@ private: /// for this column with default parameters. 
CompressionCodecPtr detectDefaultCompressionCodec() const; - mutable State state{State::Temporary}; + mutable MergeTreeDataPartState state{MergeTreeDataPartState::Temporary}; /// This ugly flag is needed for debug assertions only mutable bool part_is_probably_removed_from_disk = false; }; -using MergeTreeDataPartState = IMergeTreeDataPart::State; using MergeTreeDataPartPtr = std::shared_ptr; using MergeTreeMutableDataPartPtr = std::shared_ptr; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 07f73759014..88673dbb341 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -2939,18 +2939,18 @@ void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const if (part->version.creation_csn != Tx::RolledBackCSN) MergeTreeTransaction::removeOldPart(shared_from_this(), part, txn); - if (part->getState() == IMergeTreeDataPart::State::Active) + if (part->getState() == MergeTreeDataPartState::Active) { removePartContributionToColumnAndSecondaryIndexSizes(part); removePartContributionToDataVolume(part); removed_active_part = true; } - if (part->getState() == IMergeTreeDataPart::State::Active || clear_without_timeout) + if (part->getState() == MergeTreeDataPartState::Active || clear_without_timeout) part->remove_time.store(remove_time, std::memory_order_relaxed); - if (part->getState() != IMergeTreeDataPart::State::Outdated) - modifyPartState(part, IMergeTreeDataPart::State::Outdated); + if (part->getState() != MergeTreeDataPartState::Outdated) + modifyPartState(part, MergeTreeDataPartState::Outdated); if (isInMemoryPart(part) && getSettings()->in_memory_parts_enable_wal) getWriteAheadLog()->dropPart(part->name); @@ -2970,9 +2970,9 @@ void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(con if (it_part == data_parts_by_info.end()) throw Exception("Part " + part->getNameWithState() + " not found in data_parts", ErrorCodes::LOGICAL_ERROR); 
- assert(part->getState() == IMergeTreeDataPart::State::PreActive); + assert(part->getState() == MergeTreeDataPartState::PreActive); - modifyPartState(part, IMergeTreeDataPart::State::Temporary); + modifyPartState(part, MergeTreeDataPartState::Temporary); /// Erase immediately data_parts_indexes.erase(it_part); } @@ -6097,7 +6097,7 @@ void MergeTreeData::reportBrokenPart(MergeTreeData::DataPartPtr & data_part) con broken_part_callback(part->name); } } - else if (data_part && data_part->getState() == IMergeTreeDataPart::State::Active) + else if (data_part && data_part->getState() == MergeTreeDataPartState::Active) broken_part_callback(data_part->name); else LOG_DEBUG(log, "Will not check potentially broken part {} because it's not active", data_part->getNameWithState()); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 024311f4494..cdfd5d4e053 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -142,7 +142,7 @@ public: /// After the DataPart is added to the working set, it cannot be changed. using DataPartPtr = std::shared_ptr; - using DataPartState = IMergeTreeDataPart::State; + using DataPartState = MergeTreeDataPartState; using DataPartStates = std::initializer_list; using DataPartStateVector = std::vector; diff --git a/src/Storages/MergeTree/MergeTreeDataPartState.h b/src/Storages/MergeTree/MergeTreeDataPartState.h new file mode 100644 index 00000000000..a52f7559375 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeDataPartState.h @@ -0,0 +1,29 @@ +#pragma once + +namespace DB +{ + +/** + * Part state is a stage of its lifetime. States are ordered and state of a part could be increased only. + * Part state should be modified under data_parts mutex. 
+ * + * Possible state transitions: + * Temporary -> PreActive: we are trying to add a fetched, inserted or merged part to active set + * PreActive -> Outdated: we could not add a part to active set and are doing a rollback (for example it is a duplicated part) + * PreActive -> Active: we successfully added a part to active dataset + * PreActive -> Outdated: a part was replaced by a covering part or DROP PARTITION + * Outdated -> Deleting: a cleaner selected this part for deletion + * Deleting -> Outdated: if a ZooKeeper error occurred during the deletion, we will retry deletion + * Active -> DeleteOnDestroy: if part was moved to another disk + */ +enum class MergeTreeDataPartState +{ + Temporary, /// the part is being generated now, it is not in data_parts list + PreActive, /// the part is in data_parts, but not used for SELECTs + Active, /// active data part, used by current and upcoming SELECTs + Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finish + Deleting, /// not active data part with identity refcounter, it is being deleted right now by a cleaner + DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor +}; + +} diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index b4fc6b34c9e..7a5b5fb7880 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3841,7 +3841,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora if (!to_detached) { - if (auto part = getPartIfExists(part_info, {IMergeTreeDataPart::State::Outdated, IMergeTreeDataPart::State::Deleting})) + if (auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Outdated, MergeTreeDataPartState::Deleting})) { LOG_DEBUG(log, "Part {} should be deleted after previous attempt before fetch", part->name); /// Force immediate parts cleanup to delete the part that was left from 
the previous fetch attempt. @@ -4077,7 +4077,7 @@ DataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart( auto zookeeper = getZooKeeper(); const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); - if (auto part = getPartIfExists(part_info, {IMergeTreeDataPart::State::Outdated, IMergeTreeDataPart::State::Deleting})) + if (auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Outdated, MergeTreeDataPartState::Deleting})) { LOG_DEBUG(log, "Part {} should be deleted after previous attempt before fetch", part->name); /// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt. diff --git a/src/Storages/System/StorageSystemParts.cpp b/src/Storages/System/StorageSystemParts.cpp index 1b207d1d165..07f3cd0ccc5 100644 --- a/src/Storages/System/StorageSystemParts.cpp +++ b/src/Storages/System/StorageSystemParts.cpp @@ -98,7 +98,7 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_) void StorageSystemParts::processNextStorage( ContextPtr context, MutableColumns & columns, std::vector & columns_mask, const StoragesInfo & info, bool has_state_column) { - using State = IMergeTreeDataPart::State; + using State = MergeTreeDataPartState; MergeTreeData::DataPartStateVector all_parts_state; MergeTreeData::DataPartsVector all_parts; diff --git a/src/Storages/System/StorageSystemPartsColumns.cpp b/src/Storages/System/StorageSystemPartsColumns.cpp index 292d8087c10..87a5afe2439 100644 --- a/src/Storages/System/StorageSystemPartsColumns.cpp +++ b/src/Storages/System/StorageSystemPartsColumns.cpp @@ -117,7 +117,7 @@ void StorageSystemPartsColumns::processNextStorage( auto index_size_in_bytes = part->getIndexSizeInBytes(); auto index_size_in_allocated_bytes = part->getIndexSizeInAllocatedBytes(); - using State = IMergeTreeDataPart::State; + using State = MergeTreeDataPartState; size_t column_position = 0; for (const auto & column : part->getColumns()) diff --git 
a/src/Storages/System/StorageSystemProjectionParts.cpp b/src/Storages/System/StorageSystemProjectionParts.cpp index 7314c1e5012..3934e7c9623 100644 --- a/src/Storages/System/StorageSystemProjectionParts.cpp +++ b/src/Storages/System/StorageSystemProjectionParts.cpp @@ -92,7 +92,7 @@ StorageSystemProjectionParts::StorageSystemProjectionParts(const StorageID & tab void StorageSystemProjectionParts::processNextStorage( ContextPtr, MutableColumns & columns, std::vector & columns_mask, const StoragesInfo & info, bool has_state_column) { - using State = IMergeTreeDataPart::State; + using State = MergeTreeDataPartState; MergeTreeData::DataPartStateVector all_parts_state; MergeTreeData::ProjectionPartsVector all_parts = info.getProjectionParts(all_parts_state, has_state_column); for (size_t part_number = 0; part_number < all_parts.projection_parts.size(); ++part_number) diff --git a/src/Storages/System/StorageSystemProjectionPartsColumns.cpp b/src/Storages/System/StorageSystemProjectionPartsColumns.cpp index 78a6df58761..0847010faaa 100644 --- a/src/Storages/System/StorageSystemProjectionPartsColumns.cpp +++ b/src/Storages/System/StorageSystemProjectionPartsColumns.cpp @@ -122,7 +122,7 @@ void StorageSystemProjectionPartsColumns::processNextStorage( auto index_size_in_bytes = part->getIndexSizeInBytes(); auto index_size_in_allocated_bytes = part->getIndexSizeInAllocatedBytes(); - using State = IMergeTreeDataPart::State; + using State = MergeTreeDataPartState; size_t column_position = 0; auto & columns_info = projection_columns_info[part->name];