Simplify more

This commit is contained in:
alesapin 2022-06-24 17:19:59 +02:00
parent 612c4571d5
commit 011d58d7a0
7 changed files with 108 additions and 89 deletions

View File

@ -96,7 +96,6 @@ namespace ProfileEvents
extern const Event RejectedInserts;
extern const Event DelayedInserts;
extern const Event DelayedInsertsMilliseconds;
extern const Event DuplicatedInsertedBlocks;
extern const Event InsertedWideParts;
extern const Event InsertedCompactParts;
extern const Event InsertedInMemoryParts;
@ -2787,42 +2786,13 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
bool MergeTreeData::renameTempPartAndAdd(
MutableDataPartPtr & part,
Transaction & out_transaction,
SimpleIncrement * increment,
MergeTreeDeduplicationLog * deduplication_log,
std::string_view deduplication_token)
DataPartsLock & lock)
{
DataPartsVector covered_parts;
{
auto lock = lockParts();
/** It is important that obtaining new block number and adding that block to parts set is done atomically.
* Otherwise there is race condition - merge of blocks could happen in interval that doesn't yet contain new part.
*/
if (increment)
{
part->info.min_block = part->info.max_block = increment->get();
part->info.mutation = 0;
part->name = part->getNewName(part->info);
}
/// Deduplication log used only from non-replicated MergeTree. Replicated
/// tables have their own mechanism. We try to deduplicate at such deep
/// level, because only here we know real part name which is required for
/// deduplication.
if (deduplication_log)
{
const String block_id = part->getZeroLevelPartBlockID(deduplication_token);
auto res = deduplication_log->addPart(block_id, part->info);
if (!res.second)
{
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
LOG_INFO(log, "Block with ID {} already exists as part {}; ignoring it", block_id, res.first.getPartName());
return false;
}
}
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts))
return false;
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts))
return false;
}
if (!covered_parts.empty())
throw Exception("Added part " + part->name + " covers " + toString(covered_parts.size())
+ " existing part(s) (including " + covered_parts[0]->name + ")", ErrorCodes::LOGICAL_ERROR);
@ -2904,34 +2874,23 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
return true;
}
/// Variant of renameTempPartAndReplace for callers that already hold the parts lock.
/// Forwards to renameTempPartAndReplaceImpl and hands back the parts it reported as covered
/// by the newly added part. The boolean result of the Impl call is not inspected here.
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplaceUnlocked(
    MutableDataPartPtr & part,
    Transaction & out_transaction,
    DataPartsLock & lock)
{
    /// Filled in by the Impl call below.
    DataPartsVector replaced_parts;

    renameTempPartAndReplaceImpl(part, out_transaction, lock, &replaced_parts);

    return replaced_parts;
}
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part,
Transaction & out_transaction)
{
auto part_lock = lockParts();
DataPartsVector covered_parts;
renameTempPartAndReplaceImpl(part, out_transaction, part_lock, &covered_parts);
return covered_parts;
}
void MergeTreeData::renameTempPartsAndReplace(
MutableDataPartsVector & parts,
Transaction & out_transaction,
DataPartsLock & lock,
SimpleIncrement * increment)
{
for (auto & part : parts)
{
if (increment)
{
part->info.min_block = part->info.max_block = increment->get();
part->info.mutation = 0;
part->name = part->getNewName(part->info);
}
renameTempPartAndReplaceImpl(part, out_transaction, lock, nullptr);
}
return renameTempPartAndReplaceUnlocked(part, out_transaction, part_lock);
}
void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const MergeTreeData::DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock)

View File

@ -557,23 +557,20 @@ public:
bool renameTempPartAndAdd(
MutableDataPartPtr & part,
Transaction & transaction,
SimpleIncrement * increment = nullptr,
MergeTreeDeduplicationLog * deduplication_log = nullptr,
std::string_view deduplication_token = std::string_view());
DataPartsLock & lock);
/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
/// Returns all parts covered by the added part (in ascending order).
/// If out_transaction == nullptr, marks covered parts as Outdated.
DataPartsVector renameTempPartAndReplaceUnlocked(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock);
DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part,
Transaction & out_transaction);
void renameTempPartsAndReplace(
MutableDataPartsVector & parts,
Transaction & out_transaction,
DataPartsLock & lock,
SimpleIncrement * increment = nullptr);
/// Remove parts from working set immediately (without wait for background
/// process). Transfer part state to temporary. Have very limited usage only
/// for new parts which aren't already present in table.
@ -1245,11 +1242,14 @@ protected:
private:
/// Checking that candidate part doesn't break invariants: correct partition and doesn't exist already
void checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPartsLock & lock) const;
/// Prepares the part to be committed in memory: fills some fields inside the part, adds it to data_parts_indexes
/// in precommitted state and to the transaction
void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename);
/// Low-level version of previous one, doesn't lock mutex
/// Low-level method for preparing parts for commit.
/// FIXME Merge MergeTreeTransaction and Transaction
bool renameTempPartAndReplaceImpl(
MutableDataPartPtr & part,

View File

@ -3,6 +3,10 @@
#include <Storages/StorageMergeTree.h>
#include <Interpreters/PartLog.h>
namespace ProfileEvents
{
extern const Event DuplicatedInsertedBlocks;
}
namespace DB
{
@ -133,11 +137,42 @@ void MergeTreeSink::finishDelayedChunk()
auto & part = partition.temp_part.part;
MergeTreeData::Transaction transaction(storage, context->getCurrentTransaction().get());
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (storage.renameTempPartAndAdd(part, transaction, &storage.increment, storage.getDeduplicationLog(), partition.block_dedup_token))
bool added = false;
{
auto lock = storage.lockParts();
storage.fillNewPartName(part, lock);
auto * deduplication_log = storage.getDeduplicationLog();
if (deduplication_log)
{
const String block_id = part->getZeroLevelPartBlockID(partition.block_dedup_token);
auto res = deduplication_log->addPart(block_id, part->info);
if (!res.second)
{
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
LOG_INFO(storage.log, "Block with ID {} already exists as part {}; ignoring it", block_id, res.first.getPartName());
}
else
{
MergeTreeData::Transaction transaction(storage, context->getCurrentTransaction().get());
added = storage.renameTempPartAndAdd(part, transaction, lock);
transaction.commit(&lock);
}
}
else
{
MergeTreeData::Transaction transaction(storage, context->getCurrentTransaction().get());
added = storage.renameTempPartAndAdd(part, transaction, lock);
transaction.commit(&lock);
}
}
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (added)
{
transaction.commit();
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns);
storage.incrementInsertedPartsProfileEvent(part->getType());

View File

@ -476,7 +476,8 @@ void ReplicatedMergeTreeSink::commitPart(
try
{
renamed = storage.renameTempPartAndAdd(part, transaction);
auto lock = storage.lockParts();
renamed = storage.renameTempPartAndAdd(part, transaction, lock);
}
catch (const Exception & e)
{

View File

@ -1542,9 +1542,13 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition(
loaded_parts[i]->storeVersionMetadata();
String old_name = renamed_parts.old_and_new_names[i].old_name;
MergeTreeData::Transaction transaction(*this, local_context->getCurrentTransaction().get());
renameTempPartAndAdd(loaded_parts[i], transaction, &increment);
transaction.commit();
{
auto lock = lockParts();
MergeTreeData::Transaction transaction(*this, local_context->getCurrentTransaction().get());
fillNewPartName(loaded_parts[i], lock);
renameTempPartAndAdd(loaded_parts[i], transaction, lock);
transaction.commit(&lock);
}
renamed_parts.old_and_new_names[i].old_name.clear();
@ -1616,8 +1620,16 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
Transaction transaction(*this, local_context->getCurrentTransaction().get());
auto data_parts_lock = lockParts();
/** It is important that obtaining new block number and adding that block to parts set is done atomically.
* Otherwise there is race condition - merge of blocks could happen in interval that doesn't yet contain new part.
*/
for (auto part : dst_parts)
{
fillNewPartName(part, data_parts_lock);
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock);
}
/// Populate transaction
renameTempPartsAndReplace(dst_parts, transaction, data_parts_lock, &increment);
transaction.commit(&data_parts_lock);
/// If it is REPLACE (not ATTACH), remove all parts whose max_block_number is less than the min_block_number of the first new block
@ -1690,13 +1702,15 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
auto src_data_parts_lock = lockParts();
auto dest_data_parts_lock = dest_table_storage->lockParts();
std::mutex mutex;
DataPartsLock lock(mutex);
for (auto & part : dst_parts)
{
dest_table_storage->fillNewPartName(part, dest_data_parts_lock);
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock);
}
dest_table_storage->renameTempPartsAndReplace(dst_parts, transaction, lock, &dest_table_storage->increment);
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), src_parts, true, lock);
transaction.commit(&lock);
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), src_parts, true, src_data_parts_lock);
transaction.commit(&src_data_parts_lock);
}
clearOldPartsFromFilesystem();
@ -1787,9 +1801,11 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
{
for (auto part : parts)
{
auto lock = lockParts();
MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
renameTempPartAndAdd(part, transaction, &increment);
transaction.commit();
fillNewPartName(part, lock);
renameTempPartAndAdd(part, transaction, lock);
transaction.commit(&lock);
}
}
@ -1815,4 +1831,11 @@ std::unique_ptr<MergeTreeSettings> StorageMergeTree::getDefaultSettings() const
return std::make_unique<MergeTreeSettings>(getContext()->getMergeTreeSettings());
}
/// Gives the part a fresh single-block identity: takes the next value from the storage's
/// `increment`, uses it as both min_block and max_block, resets the mutation number to zero
/// and regenerates the part name from the updated info.
/// The DataPartsLock argument is not used in the body.
/// NOTE(review): presumably it exists to make callers hold the parts lock, so that obtaining
/// the block number and adding the part happen atomically - confirm against callers.
void StorageMergeTree::fillNewPartName(MutableDataPartPtr & part, DataPartsLock &)
{
    auto & part_info = part->info;

    /// Chained assignment: max_block receives the new number first, min_block copies it.
    part_info.max_block = increment.get();
    part_info.min_block = part_info.max_block;
    part_info.mutation = 0;

    /// The name is derived from the info, so it must be rebuilt after the fields above change.
    part->name = part->getNewName(part_info);
}
}

View File

@ -251,6 +251,8 @@ private:
/// return any ids.
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatus(Int64 mutation_version, std::set<String> * mutation_ids = nullptr) const;
void fillNewPartName(MutableDataPartPtr & part, DataPartsLock & lock);
void startBackgroundMovesIfNeeded() override;
/// Attaches restored parts to the storage.

View File

@ -6602,7 +6602,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
Transaction transaction(*this, NO_TRANSACTION_RAW);
{
auto data_parts_lock = lockParts();
renameTempPartsAndReplace(dst_parts, transaction, data_parts_lock);
for (auto & part : dst_parts)
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock);
}
for (size_t i = 0; i < dst_parts.size(); ++i)
@ -6835,10 +6836,8 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
auto src_data_parts_lock = lockParts();
auto dest_data_parts_lock = dest_table_storage->lockParts();
std::mutex mutex;
DataPartsLock lock(mutex);
renameTempPartsAndReplace(dst_parts, transaction, lock);
for (auto & part : dst_parts)
renameTempPartAndReplaceUnlocked(part, transaction, src_data_parts_lock);
for (size_t i = 0; i < dst_parts.size(); ++i)
dest_table_storage->lockSharedData(*dst_parts[i], false, hardlinked_files_for_parts[i]);
@ -6849,8 +6848,8 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
else
zkutil::KeeperMultiException::check(code, ops, op_results);
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range, lock);
transaction.commit(&lock);
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range, src_data_parts_lock);
transaction.commit(&src_data_parts_lock);
}
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());