Add state for MergeTree parts. [#CLICKHOUSE-3178]

And Removed obsolete code.
This commit is contained in:
Vitaliy Lyudvichenko 2017-09-05 22:03:51 +03:00
parent 996b18c816
commit 5787e8b257
3 changed files with 69 additions and 13 deletions

View File

@ -1294,6 +1294,12 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
if (out_transaction && out_transaction->data)
throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid", ErrorCodes::LOGICAL_ERROR);
if (part->state != MergeTreeDataPart::State::Temporary)
throw Exception("Unexpected state of part " + part->name, ErrorCodes::LOGICAL_ERROR);
/// ReplicatedMergeTree engines (that use out_transaction) don't commit part immediately
auto res_state = (out_transaction) ? MergeTreeDataPart::State::Precommitted : MergeTreeDataPart::State::Committed;
DataPartsVector replaced;
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
@ -1334,6 +1340,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
clearOldPartsAndRemoveFromZK();
/// Rename the part.
/// TODO: What if it is obsolete?
part->renameTo(new_name);
part->is_temp = false;
part->name = new_name;
@ -1356,6 +1363,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
}
replaced.push_back(*it);
(*it)->remove_time = time(nullptr);
(*it)->state = MergeTreeDataPart::State::Outdated;
removePartContributionToColumnSizes(*it);
data_parts.erase(it++); /// Yes, ++, not --.
}
@ -1371,6 +1379,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
}
replaced.push_back(*it);
(*it)->remove_time = time(nullptr);
(*it)->state = MergeTreeDataPart::State::Outdated;
removePartContributionToColumnSizes(*it);
data_parts.erase(it++);
}
@ -1378,10 +1387,14 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
if (obsolete)
{
LOG_WARNING(log, "Obsolete part " << part->name << " added");
/// TODO: Why we can't delete it immediately?
/// TODO: Maybe Deleting or Temporary state is more appropriate.
part->state = MergeTreeDataPart::State::Outdated;
part->remove_time = time(nullptr);
}
else
{
part->state = res_state;
data_parts.insert(part);
addPartContributionToColumnSizes(part);
}
@ -1396,7 +1409,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
{
out_transaction->data = this;
out_transaction->parts_to_add_on_rollback = replaced;
out_transaction->parts_to_remove_on_rollback = DataPartsVector(1, part);
out_transaction->parts_to_remove_on_rollback = {part};
}
return replaced;
@ -1404,11 +1417,24 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout)
{
for (auto & part : remove)
{
if (part->state != MergeTreeDataPart::State::Precommitted && part->state != MergeTreeDataPart::State::Committed)
throw Exception("Unexpected state of part " + part->name, ErrorCodes::LOGICAL_ERROR);
}
for (auto & part : add)
{
if (part->state != MergeTreeDataPart::State::Temporary)
throw Exception("Unexpected state of part " + part->name, ErrorCodes::LOGICAL_ERROR);
}
std::lock_guard<std::mutex> lock(data_parts_mutex);
for (const DataPartPtr & part : remove)
{
part->remove_time = clear_without_timeout ? 0 : time(nullptr);
part->state = MergeTreeDataPart::State::Outdated;
if (data_parts.erase(part))
removePartContributionToColumnSizes(part);
@ -1417,10 +1443,15 @@ void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataParts
for (const DataPartPtr & part : add)
{
if (data_parts.insert(part).second)
{
part->state = MergeTreeDataPart::State::Precommitted;
addPartContributionToColumnSizes(part);
}
/// TODO: Why there are no assertion in the else branch?
}
}
void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & prefix, bool restore_covered, bool move_to_detached)
{
LOG_INFO(log, "Renaming " << part->relative_path << " to " << prefix << part->name << " and detaching it.");
@ -1488,21 +1519,24 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String &
}
}
void MergeTreeData::detachPartInPlace(const DataPartPtr & part)
{
renameAndDetachPart(part, "", false, false);
}
MergeTreeData::DataParts MergeTreeData::getDataParts() const
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
return data_parts;
MergeTreeData::DataParts res;
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::copy_if(data_parts.begin(), data_parts.end(), std::inserter(res, res.begin()), MergeTreeDataPart::isCommitedPart);
}
return res;
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
return DataPartsVector(std::begin(data_parts), std::end(data_parts));
MergeTreeData::DataPartsVector res;
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::copy_if(data_parts.begin(), data_parts.end(), std::inserter(res, res.begin()), MergeTreeDataPart::isCommitedPart);
}
return res;
}
size_t MergeTreeData::getTotalActiveSizeInBytes() const

View File

@ -348,9 +348,6 @@ public:
/// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
void renameAndDetachPart(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false, bool move_to_detached = true);
/// Removes the part from the list of parts (including all_data_parts), but doesn't move the directory.
void detachPartInPlace(const DataPartPtr & part);
/// Returns old inactive parts that can be deleted. At the same time removes them from the list of parts
/// but not from the disk.
DataPartsVector grabOldParts();

View File

@ -139,6 +139,31 @@ struct MergeTreeDataPart
/// If true, the destructor will delete the directory with the part.
bool is_temp = false;
/**
* Part state is a stage of its lifetime. States are ordered and state of a part could be increased only.
* Part state should be modified under data_parts mutex.
*
* Possible state transitions:
* Temporary -> Precommitted: we are trying to commit a fetched, inserted or merged part to active set
* Precommitted -> Outdated: we could not to add a part to active set and doing a rollback (for example it is duplicated part)
* Precommitted -> Commited: we successfully committed a part to active dataset
* Precommitted -> Outdated: a part was replaced by a covering part or DROP PARTITION
* Outdated -> Deleting: a cleaner selected this part for deletion
*/
enum class State
{
Temporary, /// the part is generating now, it is not in data_parts list
Precommitted, /// the part is in data_parts, but not used for SELECTs
Committed, /// active data part, used by current and upcoming SELECTs
Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
Deleting /// not active data part with identity refcounter, it is deleting right now by a cleaner
};
State state{State::Temporary};
bool isCommited() { return state == State::Committed; }
static bool isCommitedPart(const std::shared_ptr<MergeTreeDataPart> & part) { return part->isCommited(); }
/// For resharding.
size_t shard_no = 0;