Less strange lambdas

This commit is contained in:
alesapin 2020-10-23 11:54:00 +03:00
parent 6df68d6c80
commit 39e47c5338
8 changed files with 106 additions and 86 deletions

View File

@ -3630,7 +3630,7 @@ std::optional<JobAndPool> MergeTreeData::getDataMovingJob()
if (moving_tagger->parts_to_move.empty())
return {};
return JobAndPool{[this, moving_tagger{std::move(moving_tagger)}] () mutable
return JobAndPool{[this, moving_tagger] () mutable
{
moveParts(moving_tagger);
}, PoolType::MOVE};

View File

@ -16,7 +16,7 @@ namespace DB
struct MergeTreeMoveEntry
{
std::shared_ptr<const IMergeTreeDataPart> part;
std::shared_ptr<IReservation> reserved_space;
ReservationPtr reserved_space;
MergeTreeMoveEntry(const std::shared_ptr<const IMergeTreeDataPart> & part_, ReservationPtr reservation_)
: part(part_), reserved_space(std::move(reservation_))

View File

@ -1259,7 +1259,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
}
ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data)
ReplicatedMergeTreeQueue::SelectedEntryPtr ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data)
{
LogEntryPtr entry;
@ -1286,7 +1286,7 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP
}
if (entry)
return { entry, std::shared_ptr<CurrentlyExecuting>{ new CurrentlyExecuting(entry, *this) } };
return std::make_shared<SelectedEntry>(entry, std::unique_ptr<CurrentlyExecuting>{ new CurrentlyExecuting(entry, *this) });
else
return {};
}

View File

@ -259,6 +259,8 @@ private:
~CurrentlyExecuting();
};
using CurrentlyExecutingPtr = std::unique_ptr<CurrentlyExecuting>;
public:
ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeQueue();
@ -319,8 +321,19 @@ public:
/** Select the next action to process.
* merger_mutator is used only to check if the merges are not suspended.
*/
using SelectedEntry = std::pair<ReplicatedMergeTreeQueue::LogEntryPtr, std::shared_ptr<CurrentlyExecuting>>;
SelectedEntry selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data);
struct SelectedEntry
{
ReplicatedMergeTreeQueue::LogEntryPtr log_entry;
CurrentlyExecutingPtr currently_executing_holder;
SelectedEntry(const ReplicatedMergeTreeQueue::LogEntryPtr & log_entry_, CurrentlyExecutingPtr && currently_executing_holder_)
: log_entry(log_entry_)
, currently_executing_holder(std::move(currently_executing_holder_))
{}
};
using SelectedEntryPtr = std::shared_ptr<SelectedEntry>;
SelectedEntryPtr selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data);
/** Execute `func` function to handle the action.
* In this case, at runtime, mark the queue element as running

View File

@ -309,15 +309,7 @@ void StorageMergeTree::alter(
/// While exists, marks parts as 'currently_merging_mutating_parts' and reserves free space on filesystem.
struct CurrentlyMergingPartsTagger
{
FutureMergedMutatedPart future_part;
ReservationPtr reserved_space;
StorageMergeTree & storage;
public:
CurrentlyMergingPartsTagger(
StorageMergeTree::CurrentlyMergingPartsTagger::CurrentlyMergingPartsTagger(
FutureMergedMutatedPart & future_part_,
size_t total_size,
StorageMergeTree & storage_,
@ -360,7 +352,7 @@ public:
storage.currently_merging_mutating_parts.insert(future_part.parts.begin(), future_part.parts.end());
}
~CurrentlyMergingPartsTagger()
StorageMergeTree::CurrentlyMergingPartsTagger::~CurrentlyMergingPartsTagger()
{
std::lock_guard lock(storage.currently_processing_in_background_mutex);
@ -373,8 +365,6 @@ public:
storage.currently_processing_in_background_condition.notify_all();
}
};
Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String & mutation_file_name)
{
@ -643,7 +633,7 @@ void StorageMergeTree::loadMutations()
increment.value = std::max(Int64(increment.value.load()), current_mutations_by_version.rbegin()->first);
}
std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(
std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(
const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String * out_disable_reason, TableLockHolder & /* table_lock_holder */)
{
std::unique_lock lock(currently_processing_in_background_mutex);
@ -733,8 +723,8 @@ std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::sele
return {};
}
merging_tagger = std::make_shared<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false);
return MergeMutateSelectedEntry{future_part, std::move(merging_tagger), {}};
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(merging_tagger), MutationCommands{});
}
bool StorageMergeTree::merge(
@ -799,7 +789,7 @@ bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & p
return currently_merging_mutating_parts.count(part);
}
std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */, TableLockHolder & /* table_lock_holder */)
std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */, TableLockHolder & /* table_lock_holder */)
{
std::lock_guard lock(currently_processing_in_background_mutex);
size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements;
@ -873,8 +863,8 @@ std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::sele
future_part.name = part->getNewName(new_part_info);
future_part.type = part->getType();
tagger = std::make_shared<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
return MergeMutateSelectedEntry{future_part, std::move(tagger), commands};
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands);
}
return {};
}
@ -930,7 +920,7 @@ std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob()
return {};
auto metadata_snapshot = getInMemoryMetadataPtr();
std::optional<MergeMutateSelectedEntry> merge_entry, mutate_entry;
std::shared_ptr<MergeMutateSelectedEntry> merge_entry, mutate_entry;
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock);
@ -939,7 +929,7 @@ std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob()
if (merge_entry || mutate_entry)
{
return JobAndPool{[this, metadata_snapshot, merge_entry{std::move(merge_entry)}, mutate_entry{std::move(mutate_entry)}, share_lock] () mutable
return JobAndPool{[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
{
if (merge_entry)
mergeSelectedParts(metadata_snapshot, false, *merge_entry, share_lock);

View File

@ -21,8 +21,6 @@
namespace DB
{
struct CurrentlyMergingPartsTagger;
/** See the description of the data structure in MergeTreeData.
*/
class StorageMergeTree final : public ext::shared_ptr_helper<StorageMergeTree>, public MergeTreeData
@ -140,21 +138,42 @@ private:
/// Wait until mutation with version will finish mutation for all parts
void waitForMutation(Int64 version, const String & file_name);
friend struct CurrentlyMergingPartsTagger;
struct CurrentlyMergingPartsTagger
{
FutureMergedMutatedPart future_part;
ReservationPtr reserved_space;
using CurrentlyMergingPartsTaggerPtr = std::shared_ptr<CurrentlyMergingPartsTagger>;
StorageMergeTree & storage;
CurrentlyMergingPartsTagger(
FutureMergedMutatedPart & future_part_,
size_t total_size,
StorageMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot,
bool is_mutation);
~CurrentlyMergingPartsTagger();
};
using CurrentlyMergingPartsTaggerPtr = std::unique_ptr<CurrentlyMergingPartsTagger>;
friend struct CurrentlyMergingPartsTagger;
struct MergeMutateSelectedEntry
{
FutureMergedMutatedPart future_part;
CurrentlyMergingPartsTaggerPtr tagger;
MutationCommands commands;
MergeMutateSelectedEntry(const FutureMergedMutatedPart & future_part_, CurrentlyMergingPartsTaggerPtr && tagger_, const MutationCommands & commands_)
: future_part(future_part_)
, tagger(std::move(tagger_))
, commands(commands_)
{}
};
std::optional<MergeMutateSelectedEntry> selectPartsToMerge(const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String * disable_reason, TableLockHolder & table_lock_holder);
std::shared_ptr<MergeMutateSelectedEntry> selectPartsToMerge(const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String * disable_reason, TableLockHolder & table_lock_holder);
bool mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, MergeMutateSelectedEntry & entry, TableLockHolder & table_lock_holder);
std::optional<MergeMutateSelectedEntry> selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason, TableLockHolder & table_lock_holder);
std::shared_ptr<MergeMutateSelectedEntry> selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason, TableLockHolder & table_lock_holder);
bool mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & entry, TableLockHolder & table_lock_holder);
Int64 getCurrentMutationVersion(

View File

@ -2540,10 +2540,10 @@ void StorageReplicatedMergeTree::mutationsUpdatingTask()
}
}
ReplicatedMergeTreeQueue::SelectedEntry StorageReplicatedMergeTree::selectQueueEntry()
ReplicatedMergeTreeQueue::SelectedEntryPtr StorageReplicatedMergeTree::selectQueueEntry()
{
/// This object will mark the element of the queue as running.
ReplicatedMergeTreeQueue::SelectedEntry selected;
ReplicatedMergeTreeQueue::SelectedEntryPtr selected;
try
{
@ -2557,10 +2557,10 @@ ReplicatedMergeTreeQueue::SelectedEntry StorageReplicatedMergeTree::selectQueueE
return selected;
}
bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntry & selected_entry)
bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry)
{
LogEntryPtr & entry = selected_entry.first;
LogEntryPtr & entry = selected_entry->log_entry;
return queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process)
{
try
@ -2609,14 +2609,12 @@ std::optional<JobAndPool> StorageReplicatedMergeTree::getDataProcessingJob()
return {};
/// This object will mark the element of the queue as running.
ReplicatedMergeTreeQueue::SelectedEntry selected_entry = selectQueueEntry();
ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry = selectQueueEntry();
LogEntryPtr & entry = selected_entry.first;
if (!entry)
if (!selected_entry)
return {};
return JobAndPool{[this, selected_entry{std::move(selected_entry)}] () mutable
return JobAndPool{[this, selected_entry] () mutable
{
processQueueEntry(selected_entry);
}, PoolType::MERGE_MUTATE};

View File

@ -418,9 +418,9 @@ private:
void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);
ReplicatedMergeTreeQueue::SelectedEntry selectQueueEntry();
ReplicatedMergeTreeQueue::SelectedEntryPtr selectQueueEntry();
bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntry & entry);
bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);
/// Postcondition:
/// either leader_election is fully initialized (node in ZK is created and the watching thread is launched)