From 5787e8b2574a9c817b8d00b5213f1b43a4b591f4 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Tue, 5 Sep 2017 22:03:51 +0300 Subject: [PATCH] Add state for MergeTree parts. [#CLICKHOUSE-3178] And Removed obsolete code. --- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 54 +++++++++++++++---- dbms/src/Storages/MergeTree/MergeTreeData.h | 3 -- .../Storages/MergeTree/MergeTreeDataPart.h | 25 +++++++++ 3 files changed, 69 insertions(+), 13 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 44ddcc93860..75e054fc8c5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1294,6 +1294,12 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( if (out_transaction && out_transaction->data) throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid", ErrorCodes::LOGICAL_ERROR); + if (part->state != MergeTreeDataPart::State::Temporary) + throw Exception("Unexpected state of part " + part->name, ErrorCodes::LOGICAL_ERROR); + + /// ReplicatedMergeTree engines (that use out_transaction) don't commit part immediately + auto res_state = (out_transaction) ? MergeTreeDataPart::State::Precommitted : MergeTreeDataPart::State::Committed; + DataPartsVector replaced; { std::lock_guard lock(data_parts_mutex); @@ -1334,6 +1340,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( clearOldPartsAndRemoveFromZK(); /// Rename the part. + /// TODO: What if it is obsolete? part->renameTo(new_name); part->is_temp = false; part->name = new_name; @@ -1356,6 +1363,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( } replaced.push_back(*it); (*it)->remove_time = time(nullptr); + (*it)->state = MergeTreeDataPart::State::Outdated; removePartContributionToColumnSizes(*it); data_parts.erase(it++); /// Yes, ++, not --. } @@ -1371,6 +1379,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( } replaced.push_back(*it); (*it)->remove_time = time(nullptr); + (*it)->state = MergeTreeDataPart::State::Outdated; removePartContributionToColumnSizes(*it); data_parts.erase(it++); } @@ -1378,10 +1387,14 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( if (obsolete) { LOG_WARNING(log, "Obsolete part " << part->name << " added"); + /// TODO: Why we can't delete it immediately? + /// TODO: Maybe Deleting or Temporary state is more appropriate. + part->state = MergeTreeDataPart::State::Outdated; part->remove_time = time(nullptr); } else { + part->state = res_state; data_parts.insert(part); addPartContributionToColumnSizes(part); } @@ -1396,7 +1409,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( { out_transaction->data = this; out_transaction->parts_to_add_on_rollback = replaced; - out_transaction->parts_to_remove_on_rollback = DataPartsVector(1, part); + out_transaction->parts_to_remove_on_rollback = {part}; } return replaced; @@ -1404,11 +1417,24 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout) { + for (auto & part : remove) + { + if (part->state != MergeTreeDataPart::State::Precommitted && part->state != MergeTreeDataPart::State::Committed) + throw Exception("Unexpected state of part " + part->name, ErrorCodes::LOGICAL_ERROR); + } + + for (auto & part : add) + { + if (part->state != MergeTreeDataPart::State::Temporary) + throw Exception("Unexpected state of part " + part->name, ErrorCodes::LOGICAL_ERROR); + } + std::lock_guard lock(data_parts_mutex); for (const DataPartPtr & part : remove) { part->remove_time = clear_without_timeout ? 0 : time(nullptr); + part->state = MergeTreeDataPart::State::Outdated; if (data_parts.erase(part)) removePartContributionToColumnSizes(part); @@ -1417,10 +1443,15 @@ void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataParts for (const DataPartPtr & part : add) { if (data_parts.insert(part).second) + { + part->state = MergeTreeDataPart::State::Precommitted; addPartContributionToColumnSizes(part); + } + /// TODO: Why there are no assertion in the else branch? } } + void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & prefix, bool restore_covered, bool move_to_detached) { LOG_INFO(log, "Renaming " << part->relative_path << " to " << prefix << part->name << " and detaching it."); @@ -1488,21 +1519,24 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & } } -void MergeTreeData::detachPartInPlace(const DataPartPtr & part) -{ - renameAndDetachPart(part, "", false, false); -} - MergeTreeData::DataParts MergeTreeData::getDataParts() const { - std::lock_guard lock(data_parts_mutex); - return data_parts; + MergeTreeData::DataParts res; + { + std::lock_guard lock(data_parts_mutex); + std::copy_if(data_parts.begin(), data_parts.end(), std::inserter(res, res.begin()), MergeTreeDataPart::isCommitedPart); + } + return res; } MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const { - std::lock_guard lock(data_parts_mutex); - return DataPartsVector(std::begin(data_parts), std::end(data_parts)); + MergeTreeData::DataPartsVector res; + { + std::lock_guard lock(data_parts_mutex); + std::copy_if(data_parts.begin(), data_parts.end(), std::inserter(res, res.begin()), MergeTreeDataPart::isCommitedPart); + } + return res; } size_t MergeTreeData::getTotalActiveSizeInBytes() const diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 4c2697d09d1..1e82a3e977f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -348,9 +348,6 @@ public: /// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part. void renameAndDetachPart(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false, bool move_to_detached = true); - /// Removes the part from the list of parts (including all_data_parts), but doesn't move the directory. - void detachPartInPlace(const DataPartPtr & part); - /// Returns old inactive parts that can be deleted. At the same time removes them from the list of parts /// but not from the disk. DataPartsVector grabOldParts(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h index 7633cf42ab4..882196ecc1f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h @@ -139,6 +139,31 @@ struct MergeTreeDataPart /// If true, the destructor will delete the directory with the part. bool is_temp = false; + /** + * Part state is a stage of its lifetime. States are ordered and state of a part could be increased only. + * Part state should be modified under data_parts mutex. + * + * Possible state transitions: + * Temporary -> Precommitted: we are trying to commit a fetched, inserted or merged part to active set + * Precommitted -> Outdated: we could not to add a part to active set and doing a rollback (for example it is duplicated part) + * Precommitted -> Commited: we successfully committed a part to active dataset + * Precommitted -> Outdated: a part was replaced by a covering part or DROP PARTITION + * Outdated -> Deleting: a cleaner selected this part for deletion + */ + enum class State + { + Temporary, /// the part is generating now, it is not in data_parts list + Precommitted, /// the part is in data_parts, but not used for SELECTs + Committed, /// active data part, used by current and upcoming SELECTs + Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes + Deleting /// not active data part with identity refcounter, it is deleting right now by a cleaner + }; + + State state{State::Temporary}; + + bool isCommited() { return state == State::Committed; } + static bool isCommitedPart(const std::shared_ptr & part) { return part->isCommited(); } + /// For resharding. size_t shard_no = 0;