Some improvements

This commit is contained in:
alesapin 2019-08-16 18:57:19 +03:00
parent 6333f4b074
commit defc1e30ca
10 changed files with 129 additions and 88 deletions

View File

@ -2389,28 +2389,50 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
return nullptr;
}
void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part)
void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy, DataPartsLock & /* lock */)
{
auto data_parts_lock = lockParts();
for (const auto & active_part : getDataPartsStateRange(DataPartState::Committed))
for (const auto & original_active_part : getDataPartsStateRange(DataPartState::Committed))
{
if (part->name == active_part->name)
if (part_copy->name == original_active_part->name)
{
auto it = data_parts_by_info.find(active_part->info);
if (it == data_parts_by_info.end())
throw Exception("No such active part by info. It is a bug", ErrorCodes::NO_SUCH_DATA_PART);
auto active_part_it = data_parts_by_info.find(original_active_part->info);
if (active_part_it == data_parts_by_info.end())
throw Exception("No such active part by info. It's a bug.", ErrorCodes::NO_SUCH_DATA_PART);
active_part->deleteOnDestroy();
(*it)->remove_time.store((*it)->modification_time, std::memory_order_relaxed);
data_parts_indexes.erase(it);
original_active_part->deleteOnDestroy();
(*active_part_it)->remove_time.store((*active_part_it)->modification_time, std::memory_order_relaxed);
data_parts_indexes.erase(active_part_it);
auto part_it = data_parts_indexes.insert(part).first;
auto part_it = data_parts_indexes.insert(part_copy).first;
modifyPartState(part_it, DataPartState::Committed);
return;
}
}
throw Exception("No such active part. It is a bug", ErrorCodes::NO_SUCH_DATA_PART);
throw Exception("No such active part. It's a bug.", ErrorCodes::NO_SUCH_DATA_PART);
}
MergeTreeData::DataPartsVector MergeTreeData::swapActiveParts(const MergeTreeData::DataPartsVector & copied_parts)
{
auto data_parts_lock = lockParts();
DataPartsVector failed_parts;
for (auto && copied_part : copied_parts)
{
auto part = getActiveContainingPart(copied_part->name);
if (!part || part->name != copied_part->name)
{
LOG_ERROR(log, "Failed to swap " << copied_part->name << ". Active part doesn't exist."
<< "Copy can be found. Copy path: '" << copied_part->getFullPath() << "'.");
continue;
}
copied_part->renameTo(part->name);
swapActivePart(copied_part, data_parts_lock);
}
return failed_parts;
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info)
@ -2575,33 +2597,6 @@ void MergeTreeData::freezePartition(const ASTPtr & partition_ast, const String &
}
void MergeTreeData::movePartitionToSpace(MergeTreeData::DataPartPtr part, DiskSpace::SpacePtr space)
{
auto reservation = space->reserve(part->bytes_on_disk);
if (!reservation)
throw Exception("Move is not possible. Not enough space " + space->getName(), ErrorCodes::NOT_ENOUGH_SPACE);
auto & reserved_disk = reservation->getDisk();
String path_to_clone = getFullPathOnDisk(reserved_disk);
if (Poco::File(path_to_clone + part->name).exists())
throw Exception("Move is not possible: " + path_to_clone + part->name + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
LOG_DEBUG(log, "Cloning part " << part->getFullPath() << " to " << getFullPathOnDisk(reservation->getDisk()));
part->makeCloneOnDiskDetached(reservation);
MergeTreeData::MutableDataPartPtr copied_part =
std::make_shared<MergeTreeData::DataPart>(*this, reservation->getDisk(), part->name);
copied_part->relative_path = "detached/" + part->name;
copied_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
copied_part->renameTo(part->name);
swapActivePart(copied_part);
}
void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String & name, const Context & /*context*/)
{
String partition_id = partition->as<ASTLiteral &>().value.safeGet<String>();

View File

@ -398,9 +398,15 @@ public:
/// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
DataPartPtr getActiveContainingPart(const String & part_name);
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info);
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock &lock);
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock);
void swapActivePart(MergeTreeData::DataPartPtr part);
/// Swap part with it's identical copy (possible with another path on another disk).
/// If original part is not active or doesn't exist exception will be thrown.
void swapActivePart(MergeTreeData::DataPartPtr part_copy, DataPartsLock & lock);
/// Tries to swap several active data parts with their copies.
/// Returns vector with failed parts (for example not active).
DataPartsVector swapActiveParts(const DataPartsVector & copied_parts);
/// Returns all parts in specified partition
DataPartsVector getDataPartsVectorInPartition(DataPartState state, const String & partition_id);
@ -546,9 +552,9 @@ public:
*/
void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context);
private:
protected:
/// Moves part to specified space
void movePartitionToSpace(MergeTreeData::DataPartPtr part, DiskSpace::SpacePtr space);
virtual void movePartitionToSpace(MergeTreeData::DataPartPtr part, DiskSpace::SpacePtr space) = 0;
public:
/// Moves partition to specified Disk
@ -781,7 +787,6 @@ protected:
throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
}
/// Used to serialize calls to grabOldParts.
std::mutex grab_old_parts_mutex;
/// The same for clearOldTemporaryDirectories.

View File

@ -311,7 +311,7 @@ public:
}
/// Returns elems ordered by size
MergeTreeData::DataPartsVector getElems()
MergeTreeData::DataPartsVector getAccumulatedParts()
{
MergeTreeData::DataPartsVector res;
for (const auto & elem : elems)
@ -367,7 +367,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMove(
for (auto && move : need_to_move)
{
auto min_volume_priority = policy->getVolumePriorityByDisk(move.first) + 1;
for (auto && part : move.second.getElems())
for (auto && part : move.second.getAccumulatedParts())
{
auto reservation = policy->reserve(part->bytes_on_disk, min_volume_priority);
if (!reservation)

View File

@ -46,8 +46,9 @@ struct MargeTreeMoveEntry
using MergeTreeMovingParts = std::vector<MargeTreeMoveEntry>;
/** Can select the parts to merge and merge them.
*/
/** Can select parts for background processes and do them.
* Currently helps with merges, mutations and moves
*/
class MergeTreeDataMergerMutator
{
public:

View File

@ -763,7 +763,10 @@ bool ReplicatedMergeTreeQueue::checkReplaceRangeCanBeRemoved(const MergeTreePart
return true;
}
void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info, const ReplicatedMergeTreeLogEntryData & current)
void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
zkutil::ZooKeeperPtr zookeeper,
const MergeTreePartInfo & part_info,
const ReplicatedMergeTreeLogEntryData & current)
{
Queue to_wait;
size_t removed_entries = 0;
@ -1303,7 +1306,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
}
void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name)
void ReplicatedMergeTreeQueue::disableMergesInBlockRange(const String & part_name)
{
std::lock_guard lock(state_mutex);
virtual_parts.add(part_name);
@ -1621,25 +1624,31 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
if (left_max_block + 1 < right_min_block)
{
/// Fake part which will appear as merge result
MergeTreePartInfo gap_part_info(
left->info.partition_id, left_max_block + 1, right_min_block - 1,
MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER);
/// We don't select parts if any smaller part covered by our merge must exist after
/// processing replication log up to log_pointer.
Strings covered = queue.virtual_parts.getPartsCoveredBy(gap_part_info);
if (!covered.empty())
{
if (out_reason)
*out_reason = "There are " + toString(covered.size()) + " parts (from " + covered.front()
+ " to " + covered.back() + ") that are still not present on this replica between "
+ left->name + " and " + right->name;
+ " to " + covered.back() + ") that are still not present or beeing processed by "
+ " other background process on this replica between " + left->name + " and " + right->name;
return false;
}
}
Int64 left_mutation_ver = queue.getCurrentMutationVersionImpl(
left->info.partition_id, left->info.getDataVersion(), lock);
/// left->info.partition_id == right->info.partition_id
Int64 right_mutation_ver = queue.getCurrentMutationVersionImpl(
left->info.partition_id, right->info.getDataVersion(), lock);
if (left_mutation_ver != right_mutation_ver)
{
if (out_reason)

View File

@ -304,6 +304,7 @@ public:
/// Count the total number of active mutations that are finished (is_done = true).
size_t countFinishedMutations() const;
/// Returns functor which used by MergeTreeMergerMutator to select parts for merge
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper);
/// Return the version (block number) of the last mutation that we don't need to apply to the part
@ -318,12 +319,14 @@ public:
/// (because some mutations are probably done but we are not sure yet), returns true.
bool tryFinalizeMutations(zkutil::ZooKeeperPtr zookeeper);
/// Prohibit merges in the specified range.
void disableMergesInRange(const String & part_name);
/// Prohibit merges in the specified blocks range.
/// Add part to virtual_parts, which means that part must exist
/// after processing replication log up to log_pointer.
/// part maybe fake (look at ReplicatedMergeTreeMergePredicate)
void disableMergesInBlockRange(const String & part_name);
/** Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts.
* Locks queue's mutex.
*/
/// Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts.
/// Locks queue's mutex.
bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason);
/// A blocker that stops selects from the queue

View File

@ -36,6 +36,7 @@ namespace ErrorCodes
extern const int INCORRECT_FILE_NAME;
extern const int CANNOT_ASSIGN_OPTIMIZE;
extern const int INCOMPATIBLE_COLUMNS;
extern const int PART_IS_TEMPORARILY_LOCKED;
}
namespace ActionLocks
@ -400,6 +401,7 @@ public:
{
if (!storage.currently_processing_in_background.count(part.part))
std::terminate();
storage.currently_processing_in_background.erase(part.part);
}
}
@ -651,46 +653,65 @@ bool StorageMergeTree::merge(
}
bool StorageMergeTree::moveParts()
void StorageMergeTree::movePartitionToSpace(MergeTreeData::DataPartPtr part, DiskSpace::SpacePtr space)
{
auto reservation = space->reserve(part->bytes_on_disk);
if (!reservation)
throw Exception("Move is not possible. Not enough space " + space->getName() + ".", ErrorCodes::NOT_ENOUGH_SPACE);
auto & reserved_disk = reservation->getDisk();
String path_to_clone = getFullPathOnDisk(reserved_disk);
if (Poco::File(path_to_clone + part->name).exists())
throw Exception("Move is not possible: " + path_to_clone + part->name + " already exists.",
ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (currently_processing_in_background.count(part))
throw Exception("Cannot move part '" + part->name + "' because it's participating in background process.",
ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
MergeTreeMovingParts parts_to_move;
parts_to_move.emplace_back(part, std::move(reservation));
auto copied_parts = merger_mutator.cloneParts(parts_to_move);
auto swap_failed_parts = swapActiveParts(copied_parts);
if (!swap_failed_parts.empty())
throw Exception("Cannot move " + toString(swap_failed_parts.size()) + " parts. It's a bug.", ErrorCodes::LOGICAL_ERROR);
}
bool StorageMergeTree::movePartsInBackground()
{
/// You must call destructor with unlocked `currently_processing_in_background_mutex`.
std::optional<CurrentlyMovingPartsTagger> moving_tagger;
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
std::lock_guard background_processing_lock(currently_processing_in_background_mutex);
auto can_move = [this](const DataPartPtr & part, String *)
{
return !currently_processing_in_background.count(part);
};
MergeTreeMovingParts parts_to_move;
if (!merger_mutator.selectPartsToMove(parts_to_move, can_move))
{
std::lock_guard lock(currently_processing_in_background_mutex);
auto can_move = [this](const DataPartPtr & part, String *)
{
return !currently_processing_in_background.count(part);
};
if (!merger_mutator.selectPartsToMove(parts_to_move, can_move))
return false;
moving_tagger.emplace(std::move(parts_to_move), *this);
LOG_INFO(log, "Nothing to move.");
return false;
}
moving_tagger.emplace(std::move(parts_to_move), *this);
}
LOG_INFO(log, "Found " << moving_tagger->parts.size() << " parts to move.");
auto copied_parts = merger_mutator.cloneParts(moving_tagger->parts);
for (auto && copied_part : copied_parts)
{
auto part = getActiveContainingPart(copied_part->name);
if (!part || part->name != copied_part->name)
{
LOG_ERROR(log, "Currently moving '" + part->name + "' doesn't exists. Probably it was merged or deleted by hand. "
<< "Move will not finish. Copy can be found in '" + copied_part->getFullPath() + "'. It's a bug.");
continue;
}
copied_part->renameTo(part->name);
swapActivePart(copied_part);
}
auto swap_failed_parts = swapActiveParts(copied_parts);
if (!swap_failed_parts.empty())
LOG_ERROR(log, "Failed to move " << swap_failed_parts.size() << " parts. Original active parts doesn't exist. "
<< "Probably they were merged or deleted by hand. Move is not completed. "
<< "Copy can be found in detached folder. Copies can be found in detached folders. It's a bug.");
return true;
}
@ -839,7 +860,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
return BackgroundProcessingPoolTaskResult::SUCCESS;
if (moveParts())
if (movePartsInBackground())
return BackgroundProcessingPoolTaskResult::SUCCESS;
if (tryMutatePart())

View File

@ -103,7 +103,7 @@ private:
*/
bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr);
bool moveParts();
bool movePartsInBackground();
/// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true.
bool tryMutatePart();
@ -121,6 +121,7 @@ private:
void clearColumnInPartition(const ASTPtr & partition, const Field & column_name, const Context & context);
void attachPartition(const ASTPtr & partition, bool part, const Context & context);
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context);
void movePartitionToSpace(MergeTreeData::DataPartPtr part, DiskSpace::SpacePtr space) override;
friend class MergeTreeBlockOutputStream;
friend class MergeTreeData;

View File

@ -4822,7 +4822,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
{
std::lock_guard merge_selecting_lock(merge_selecting_mutex);
queue.disableMergesInRange(drop_range_fake_part_name);
queue.disableMergesInBlockRange(drop_range_fake_part_name);
}
}
@ -5141,7 +5141,7 @@ bool StorageReplicatedMergeTree::dropPartsInPartition(
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range_info);
{
std::lock_guard merge_selecting_lock(merge_selecting_mutex);
queue.disableMergesInRange(drop_range_fake_part_name);
queue.disableMergesInBlockRange(drop_range_fake_part_name);
}
LOG_DEBUG(log, "Disabled merges covered by range " << drop_range_fake_part_name);

View File

@ -263,6 +263,10 @@ private:
/// A task that performs actions from the queue.
BackgroundProcessingPool::TaskHandle queue_task_handle;
/// A task which move parts to another disks/volumes
/// Transparent for replication.
BackgroundProcessingPool::TaskHandle move_parts_task;
/// A task that selects parts to merge.
BackgroundSchedulePool::TaskHolder merge_selecting_task;
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
@ -333,6 +337,8 @@ private:
DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction,
const DataPartPtr & part);
void movePartitionToSpace(MergeTreeData::DataPartPtr /* part */, DiskSpace::SpacePtr /* space */) override {}
void getCommitPartOps(
Coordination::Requests & ops,
MutableDataPartPtr & part,