fix build

This commit is contained in:
Alexander Tokmakov 2022-08-12 13:03:57 +02:00
parent d24b9874bc
commit e691888267
11 changed files with 77 additions and 72 deletions

View File

@ -3,7 +3,7 @@
#include <base/types.h>
#include <Core/NamesAndTypes.h>
#include <Interpreters/TransactionVersionMetadata.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartState.h>
#include <optional>
namespace DB

View File

@ -207,55 +207,55 @@ void IMergeTreeDataPart::MinMaxIndex::appendFiles(const MergeTreeData & data, St
}
static void incrementStateMetric(IMergeTreeDataPart::State state)
static void incrementStateMetric(MergeTreeDataPartState state)
{
switch (state)
{
case IMergeTreeDataPart::State::Temporary:
case MergeTreeDataPartState::Temporary:
CurrentMetrics::add(CurrentMetrics::PartsTemporary);
return;
case IMergeTreeDataPart::State::PreActive:
case MergeTreeDataPartState::PreActive:
CurrentMetrics::add(CurrentMetrics::PartsPreActive);
CurrentMetrics::add(CurrentMetrics::PartsPreCommitted);
return;
case IMergeTreeDataPart::State::Active:
case MergeTreeDataPartState::Active:
CurrentMetrics::add(CurrentMetrics::PartsActive);
CurrentMetrics::add(CurrentMetrics::PartsCommitted);
return;
case IMergeTreeDataPart::State::Outdated:
case MergeTreeDataPartState::Outdated:
CurrentMetrics::add(CurrentMetrics::PartsOutdated);
return;
case IMergeTreeDataPart::State::Deleting:
case MergeTreeDataPartState::Deleting:
CurrentMetrics::add(CurrentMetrics::PartsDeleting);
return;
case IMergeTreeDataPart::State::DeleteOnDestroy:
case MergeTreeDataPartState::DeleteOnDestroy:
CurrentMetrics::add(CurrentMetrics::PartsDeleteOnDestroy);
return;
}
}
static void decrementStateMetric(IMergeTreeDataPart::State state)
static void decrementStateMetric(MergeTreeDataPartState state)
{
switch (state)
{
case IMergeTreeDataPart::State::Temporary:
case MergeTreeDataPartState::Temporary:
CurrentMetrics::sub(CurrentMetrics::PartsTemporary);
return;
case IMergeTreeDataPart::State::PreActive:
case MergeTreeDataPartState::PreActive:
CurrentMetrics::sub(CurrentMetrics::PartsPreActive);
CurrentMetrics::sub(CurrentMetrics::PartsPreCommitted);
return;
case IMergeTreeDataPart::State::Active:
case MergeTreeDataPartState::Active:
CurrentMetrics::sub(CurrentMetrics::PartsActive);
CurrentMetrics::sub(CurrentMetrics::PartsCommitted);
return;
case IMergeTreeDataPart::State::Outdated:
case MergeTreeDataPartState::Outdated:
CurrentMetrics::sub(CurrentMetrics::PartsOutdated);
return;
case IMergeTreeDataPart::State::Deleting:
case MergeTreeDataPartState::Deleting:
CurrentMetrics::sub(CurrentMetrics::PartsDeleting);
return;
case IMergeTreeDataPart::State::DeleteOnDestroy:
case MergeTreeDataPartState::DeleteOnDestroy:
CurrentMetrics::sub(CurrentMetrics::PartsDeleteOnDestroy);
return;
}
@ -313,7 +313,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
, use_metadata_cache(storage.use_metadata_cache)
{
if (parent_part)
state = State::Active;
state = MergeTreeDataPartState::Active;
incrementStateMetric(state);
incrementTypeMetric(part_type);
@ -339,7 +339,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
, use_metadata_cache(storage.use_metadata_cache)
{
if (parent_part)
state = State::Active;
state = MergeTreeDataPartState::Active;
incrementStateMetric(state);
incrementTypeMetric(part_type);
@ -381,14 +381,14 @@ std::optional<size_t> IMergeTreeDataPart::getColumnPosition(const String & colum
}
void IMergeTreeDataPart::setState(IMergeTreeDataPart::State new_state) const
void IMergeTreeDataPart::setState(MergeTreeDataPartState new_state) const
{
decrementStateMetric(state);
state = new_state;
incrementStateMetric(state);
}
IMergeTreeDataPart::State IMergeTreeDataPart::getState() const
MergeTreeDataPartState IMergeTreeDataPart::getState() const
{
return state;
}
@ -496,7 +496,7 @@ SerializationPtr IMergeTreeDataPart::tryGetSerialization(const String & column_n
void IMergeTreeDataPart::removeIfNeeded()
{
assert(assertHasValidVersionMetadata());
if (!is_temp && state != State::DeleteOnDestroy)
if (!is_temp && state != MergeTreeDataPartState::DeleteOnDestroy)
return;
try
@ -526,7 +526,7 @@ void IMergeTreeDataPart::removeIfNeeded()
remove();
if (state == State::DeleteOnDestroy)
if (state == MergeTreeDataPartState::DeleteOnDestroy)
{
LOG_TRACE(storage.log, "Removed part from old location {}", path);
}
@ -539,8 +539,8 @@ void IMergeTreeDataPart::removeIfNeeded()
/// Seems like it's especially important for remote disks, because removal may fail due to network issues.
tryLogCurrentException(__PRETTY_FUNCTION__);
assert(!is_temp);
assert(state != State::DeleteOnDestroy);
assert(state != State::Temporary);
assert(state != MergeTreeDataPartState::DeleteOnDestroy);
assert(state != MergeTreeDataPartState::Temporary);
}
}
@ -561,7 +561,7 @@ UInt64 IMergeTreeDataPart::getIndexSizeInAllocatedBytes() const
return res;
}
void IMergeTreeDataPart::assertState(const std::initializer_list<IMergeTreeDataPart::State> & affordable_states) const
void IMergeTreeDataPart::assertState(const std::initializer_list<MergeTreeDataPartState> & affordable_states) const
{
if (!checkState(affordable_states))
{
@ -1295,7 +1295,7 @@ catch (Exception & e)
bool IMergeTreeDataPart::wasInvolvedInTransaction() const
{
assert(!version.creation_tid.isEmpty() || (state == State::Temporary /* && std::uncaught_exceptions() */));
assert(!version.creation_tid.isEmpty() || (state == MergeTreeDataPartState::Temporary /* && std::uncaught_exceptions() */));
bool created_by_transaction = !version.creation_tid.isPrehistoric();
bool removed_by_transaction = version.isRemovalTIDLocked() && version.removal_tid_lock != Tx::PrehistoricTID.getHash();
return created_by_transaction || removed_by_transaction;
@ -1319,7 +1319,7 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
if (part_is_probably_removed_from_disk)
return true;
if (state == State::Temporary)
if (state == MergeTreeDataPartState::Temporary)
return true;
if (!data_part_storage->exists())

View File

@ -5,6 +5,8 @@
#include <Core/NamesAndTypes.h>
#include <Storages/IStorage.h>
#include <Storages/LightweightDeleteDescription.h>
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/MergeTree/MergeTreeDataPartState.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
@ -41,8 +43,6 @@ class IMergeTreeDataPartWriter;
class MarkCache;
class UncompressedCache;
class MergeTreeTransaction;
class IDataPartStorage;
using DataPartStoragePtr = std::shared_ptr<IDataPartStorage>;
/// Description of the data part.
class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPart>
@ -224,45 +224,22 @@ public:
/// Flag for keep S3 data when zero-copy replication over S3 turned on.
mutable bool force_keep_shared_data = false;
/**
* Part state is a stage of its lifetime. States are ordered and state of a part could be increased only.
* Part state should be modified under data_parts mutex.
*
* Possible state transitions:
* Temporary -> PreActive: we are trying to add a fetched, inserted or merged part to active set
* PreActive -> Outdated: we could not add a part to active set and are doing a rollback (for example it is duplicated part)
* PreActive -> Active: we successfully added a part to active dataset
* PreActive -> Outdated: a part was replaced by a covering part or DROP PARTITION
* Outdated -> Deleting: a cleaner selected this part for deletion
* Deleting -> Outdated: if an ZooKeeper error occurred during the deletion, we will retry deletion
* Active -> DeleteOnDestroy: if part was moved to another disk
*/
enum class State
{
Temporary, /// the part is generating now, it is not in data_parts list
PreActive, /// the part is in data_parts, but not used for SELECTs
Active, /// active data part, used by current and upcoming SELECTs
Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
Deleting, /// not active data part with identity refcounter, it is deleting right now by a cleaner
DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
};
using TTLInfo = MergeTreeDataPartTTLInfo;
using TTLInfos = MergeTreeDataPartTTLInfos;
mutable TTLInfos ttl_infos;
/// Current state of the part. If the part is in working set already, it should be accessed via data_parts mutex
void setState(State new_state) const;
State getState() const;
void setState(MergeTreeDataPartState new_state) const;
MergeTreeDataPartState getState() const;
static constexpr std::string_view stateString(State state) { return magic_enum::enum_name(state); }
static constexpr std::string_view stateString(MergeTreeDataPartState state) { return magic_enum::enum_name(state); }
constexpr std::string_view stateString() const { return stateString(state); }
String getNameWithState() const { return fmt::format("{} (state {})", name, stateString()); }
/// Returns true if state of part is one of affordable_states
bool checkState(const std::initializer_list<State> & affordable_states) const
bool checkState(const std::initializer_list<MergeTreeDataPartState> & affordable_states) const
{
for (auto affordable_state : affordable_states)
{
@ -273,7 +250,7 @@ public:
}
/// Throws an exception if state of the part is not in affordable_states
void assertState(const std::initializer_list<State> & affordable_states) const;
void assertState(const std::initializer_list<MergeTreeDataPartState> & affordable_states) const;
/// Primary key (correspond to primary.idx file).
/// Always loaded in RAM. Contains each index_granularity-th value of primary key tuple.
@ -593,13 +570,12 @@ private:
/// for this column with default parameters.
CompressionCodecPtr detectDefaultCompressionCodec() const;
mutable State state{State::Temporary};
mutable MergeTreeDataPartState state{MergeTreeDataPartState::Temporary};
/// This ugly flag is needed for debug assertions only
mutable bool part_is_probably_removed_from_disk = false;
};
using MergeTreeDataPartState = IMergeTreeDataPart::State;
using MergeTreeDataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
using MergeTreeMutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;

View File

@ -2939,18 +2939,18 @@ void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const
if (part->version.creation_csn != Tx::RolledBackCSN)
MergeTreeTransaction::removeOldPart(shared_from_this(), part, txn);
if (part->getState() == IMergeTreeDataPart::State::Active)
if (part->getState() == MergeTreeDataPartState::Active)
{
removePartContributionToColumnAndSecondaryIndexSizes(part);
removePartContributionToDataVolume(part);
removed_active_part = true;
}
if (part->getState() == IMergeTreeDataPart::State::Active || clear_without_timeout)
if (part->getState() == MergeTreeDataPartState::Active || clear_without_timeout)
part->remove_time.store(remove_time, std::memory_order_relaxed);
if (part->getState() != IMergeTreeDataPart::State::Outdated)
modifyPartState(part, IMergeTreeDataPart::State::Outdated);
if (part->getState() != MergeTreeDataPartState::Outdated)
modifyPartState(part, MergeTreeDataPartState::Outdated);
if (isInMemoryPart(part) && getSettings()->in_memory_parts_enable_wal)
getWriteAheadLog()->dropPart(part->name);
@ -2970,9 +2970,9 @@ void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(con
if (it_part == data_parts_by_info.end())
throw Exception("Part " + part->getNameWithState() + " not found in data_parts", ErrorCodes::LOGICAL_ERROR);
assert(part->getState() == IMergeTreeDataPart::State::PreActive);
assert(part->getState() == MergeTreeDataPartState::PreActive);
modifyPartState(part, IMergeTreeDataPart::State::Temporary);
modifyPartState(part, MergeTreeDataPartState::Temporary);
/// Erase immediately
data_parts_indexes.erase(it_part);
}
@ -6097,7 +6097,7 @@ void MergeTreeData::reportBrokenPart(MergeTreeData::DataPartPtr & data_part) con
broken_part_callback(part->name);
}
}
else if (data_part && data_part->getState() == IMergeTreeDataPart::State::Active)
else if (data_part && data_part->getState() == MergeTreeDataPartState::Active)
broken_part_callback(data_part->name);
else
LOG_DEBUG(log, "Will not check potentially broken part {} because it's not active", data_part->getNameWithState());

View File

@ -142,7 +142,7 @@ public:
/// After the DataPart is added to the working set, it cannot be changed.
using DataPartPtr = std::shared_ptr<const DataPart>;
using DataPartState = IMergeTreeDataPart::State;
using DataPartState = MergeTreeDataPartState;
using DataPartStates = std::initializer_list<DataPartState>;
using DataPartStateVector = std::vector<DataPartState>;

View File

@ -0,0 +1,29 @@
#pragma once
namespace DB
{
/**
* Part state is a stage of its lifetime. States are ordered and state of a part could be increased only.
* Part state should be modified under data_parts mutex.
*
* Possible state transitions:
* Temporary -> PreActive: we are trying to add a fetched, inserted or merged part to active set
* PreActive -> Outdated: we could not add a part to active set and are doing a rollback (for example it is duplicated part)
* PreActive -> Active: we successfully added a part to active dataset
* PreActive -> Outdated: a part was replaced by a covering part or DROP PARTITION
* Outdated -> Deleting: a cleaner selected this part for deletion
* Deleting -> Outdated: if an ZooKeeper error occurred during the deletion, we will retry deletion
* Active -> DeleteOnDestroy: if part was moved to another disk
*/
enum class MergeTreeDataPartState
{
Temporary, /// the part is generating now, it is not in data_parts list
PreActive, /// the part is in data_parts, but not used for SELECTs
Active, /// active data part, used by current and upcoming SELECTs
Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
Deleting, /// not active data part with identity refcounter, it is deleting right now by a cleaner
DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
};
}

View File

@ -3841,7 +3841,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
if (!to_detached)
{
if (auto part = getPartIfExists(part_info, {IMergeTreeDataPart::State::Outdated, IMergeTreeDataPart::State::Deleting}))
if (auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Outdated, MergeTreeDataPartState::Deleting}))
{
LOG_DEBUG(log, "Part {} should be deleted after previous attempt before fetch", part->name);
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
@ -4077,7 +4077,7 @@ DataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
auto zookeeper = getZooKeeper();
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
if (auto part = getPartIfExists(part_info, {IMergeTreeDataPart::State::Outdated, IMergeTreeDataPart::State::Deleting}))
if (auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Outdated, MergeTreeDataPartState::Deleting}))
{
LOG_DEBUG(log, "Part {} should be deleted after previous attempt before fetch", part->name);
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.

View File

@ -98,7 +98,7 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
void StorageSystemParts::processNextStorage(
ContextPtr context, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
{
using State = IMergeTreeDataPart::State;
using State = MergeTreeDataPartState;
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::DataPartsVector all_parts;

View File

@ -117,7 +117,7 @@ void StorageSystemPartsColumns::processNextStorage(
auto index_size_in_bytes = part->getIndexSizeInBytes();
auto index_size_in_allocated_bytes = part->getIndexSizeInAllocatedBytes();
using State = IMergeTreeDataPart::State;
using State = MergeTreeDataPartState;
size_t column_position = 0;
for (const auto & column : part->getColumns())

View File

@ -92,7 +92,7 @@ StorageSystemProjectionParts::StorageSystemProjectionParts(const StorageID & tab
void StorageSystemProjectionParts::processNextStorage(
ContextPtr, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
{
using State = IMergeTreeDataPart::State;
using State = MergeTreeDataPartState;
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::ProjectionPartsVector all_parts = info.getProjectionParts(all_parts_state, has_state_column);
for (size_t part_number = 0; part_number < all_parts.projection_parts.size(); ++part_number)

View File

@ -122,7 +122,7 @@ void StorageSystemProjectionPartsColumns::processNextStorage(
auto index_size_in_bytes = part->getIndexSizeInBytes();
auto index_size_in_allocated_bytes = part->getIndexSizeInAllocatedBytes();
using State = IMergeTreeDataPart::State;
using State = MergeTreeDataPartState;
size_t column_position = 0;
auto & columns_info = projection_columns_info[part->name];