Moves task shall be started if new storage policy needs them.

This commit is contained in:
Vladimir Chebotarev 2020-06-23 19:40:58 +03:00
parent 8b2a5d81df
commit bb8da71eff
6 changed files with 63 additions and 35 deletions

View File

@ -1495,6 +1495,8 @@ void MergeTreeData::changeSettings(
{
if (new_settings)
{
bool has_storage_policy_changed = false;
const auto & new_changes = new_settings->as<const ASTSetQuery &>().changes;
for (const auto & change : new_changes)
@ -1503,28 +1505,34 @@ void MergeTreeData::changeSettings(
StoragePolicyPtr new_storage_policy = global_context.getStoragePolicy(change.value.safeGet<String>());
StoragePolicyPtr old_storage_policy = getStoragePolicy();
checkStoragePolicy(new_storage_policy);
std::unordered_set<String> all_diff_disk_names;
for (const auto & disk : new_storage_policy->getDisks())
all_diff_disk_names.insert(disk->getName());
for (const auto & disk : old_storage_policy->getDisks())
all_diff_disk_names.erase(disk->getName());
for (const String & disk_name : all_diff_disk_names)
/// StoragePolicy of different version or name is guaranteed to have different pointer
if (new_storage_policy != old_storage_policy)
{
auto disk = new_storage_policy->getDiskByName(disk_name);
if (disk->exists(relative_data_path))
throw Exception("New storage policy contain disks which already contain data of a table with the same name", ErrorCodes::LOGICAL_ERROR);
}
checkStoragePolicy(new_storage_policy);
for (const String & disk_name : all_diff_disk_names)
{
auto disk = new_storage_policy->getDiskByName(disk_name);
disk->createDirectories(relative_data_path);
disk->createDirectories(relative_data_path + "detached");
std::unordered_set<String> all_diff_disk_names;
for (const auto & disk : new_storage_policy->getDisks())
all_diff_disk_names.insert(disk->getName());
for (const auto & disk : old_storage_policy->getDisks())
all_diff_disk_names.erase(disk->getName());
for (const String & disk_name : all_diff_disk_names)
{
auto disk = new_storage_policy->getDiskByName(disk_name);
if (disk->exists(relative_data_path))
throw Exception("New storage policy contain disks which already contain data of a table with the same name", ErrorCodes::LOGICAL_ERROR);
}
for (const String & disk_name : all_diff_disk_names)
{
auto disk = new_storage_policy->getDiskByName(disk_name);
disk->createDirectories(relative_data_path);
disk->createDirectories(relative_data_path + "detached");
}
/// FIXME how would that be done while reloading configuration???
has_storage_policy_changed = true;
}
/// FIXME how would that be done while reloading configuration???
}
MergeTreeSettings copy = *getSettings();
@ -1533,6 +1541,9 @@ void MergeTreeData::changeSettings(
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
new_metadata.setSettingsChanges(new_settings);
setInMemoryMetadata(new_metadata);
if (has_storage_policy_changed)
startBackgroundMovesIfNeeded();
}
}
@ -3291,12 +3302,11 @@ bool MergeTreeData::selectPartsAndMove()
bool MergeTreeData::areBackgroundMovesNeeded() const
{
auto policy = getStoragePolicy();
auto metadata_snapshot = getInMemoryMetadataPtr();
if (policy->getVolumes().size() > 1)
return true;
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1 && metadata_snapshot->hasAnyMoveTTL();
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1;
}
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space)

View File

@ -794,8 +794,6 @@ protected:
void checkStoragePolicy(const StoragePolicyPtr & new_storage_policy) const;
void setStoragePolicy(const String & new_storage_policy_name, bool only_check = false);
/// Calculates column sizes in compressed form for the current state of data_parts. Call with data_parts mutex locked.
void calculateColumnSizesImpl();
/// Adds or subtracts the contribution of the part to compressed column sizes.
@ -873,6 +871,8 @@ private:
CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, SpacePtr space);
bool canUsePolymorphicParts(const MergeTreeSettings & settings, String * out_reason) const;
virtual void startBackgroundMovesIfNeeded() = 0;
};
}

View File

@ -102,12 +102,7 @@ void StorageMergeTree::startup()
/// Ensure that thread started only after assignment to 'merging_mutating_task_handle' is done.
merge_pool.startTask(merging_mutating_task_handle);
if (areBackgroundMovesNeeded())
{
auto & move_pool = global_context.getBackgroundMovePool();
moving_task_handle = move_pool.createTask([this] { return movePartsTask(); });
move_pool.startTask(moving_task_handle);
}
startBackgroundMovesIfNeeded();
}
catch (...)
{
@ -464,6 +459,18 @@ bool StorageMergeTree::isMutationDone(Int64 mutation_version) const
return true;
}
void StorageMergeTree::startBackgroundMovesIfNeeded()
{
if (areBackgroundMovesNeeded() && !moving_task_handle)
{
auto & move_pool = global_context.getBackgroundMovePool();
moving_task_handle = move_pool.createTask([this] { return movePartsTask(); });
move_pool.startTask(moving_task_handle);
}
}
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
{
std::lock_guard lock(currently_processing_in_background_mutex);

View File

@ -159,6 +159,8 @@ private:
/// Just checks versions of each active data part
bool isMutationDone(Int64 mutation_version) const;
void startBackgroundMovesIfNeeded() override;
friend class MergeTreeBlockOutputStream;
friend class MergeTreeData;
friend struct CurrentlyMergingPartsTagger;

View File

@ -3263,12 +3263,7 @@ void StorageReplicatedMergeTree::startup()
pool.startTask(queue_task_handle);
}
if (areBackgroundMovesNeeded())
{
auto & pool = global_context.getBackgroundMovePool();
move_parts_task_handle = pool.createTask([this] { return movePartsTask(); });
pool.startTask(move_parts_task_handle);
}
startBackgroundMovesIfNeeded();
}
catch (...)
{
@ -5702,4 +5697,16 @@ MutationCommands StorageReplicatedMergeTree::getFirtsAlterMutationCommandsForPar
{
return queue.getFirstAlterMutationCommandsForPart(part);
}
void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded()
{
if (areBackgroundMovesNeeded() && !move_parts_task_handle)
{
auto & pool = global_context.getBackgroundMovePool();
move_parts_task_handle = pool.createTask([this] { return movePartsTask(); });
pool.startTask(move_parts_task_handle);
}
}
}

View File

@ -551,6 +551,8 @@ private:
MutationCommands getFirtsAlterMutationCommandsForPart(const DataPartPtr & part) const override;
void startBackgroundMovesIfNeeded() override;
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/