fix skipping of some mutations

This commit is contained in:
Alexander Tokmakov 2021-12-15 21:19:29 +03:00
parent 89b77d3056
commit e185ad260b
6 changed files with 33 additions and 27 deletions

View File

@ -3867,6 +3867,27 @@ std::unordered_set<String> MergeTreeData::getPartitionIDsFromQuery(const ASTs &
return partition_ids;
}
std::set<String> MergeTreeData::getPartitionIdsAffectedByCommands(
const MutationCommands & commands, ContextPtr query_context) const
{
std::set<String> affected_partition_ids;
for (const auto & command : commands)
{
if (!command.partition)
{
affected_partition_ids.clear();
break;
}
affected_partition_ids.insert(
getPartitionIDFromQuery(command.partition, query_context)
);
}
return affected_partition_ids;
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(
const DataPartStates & affordable_states, DataPartStateVector * out_states, bool require_projection_parts) const

View File

@ -706,6 +706,7 @@ public:
/// For ATTACH/DETACH/DROP PARTITION.
String getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr context) const;
std::unordered_set<String> getPartitionIDsFromQuery(const ASTs & asts, ContextPtr context) const;
std::set<String> getPartitionIdsAffectedByCommands(const MutationCommands & commands, ContextPtr query_context) const;
/// Extracts MergeTreeData of other *MergeTree* storage
/// and checks that their structure suitable for ALTER TABLE ATTACH PARTITION FROM

View File

@ -529,6 +529,9 @@ void StorageMergeTree::waitForMutation(const String & mutation_id)
void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context)
{
/// Validate partition IDs (if any) before starting mutation
getPartitionIdsAffectedByCommands(commands, query_context);
Int64 version = startMutation(commands, query_context);
if (query_context->getSettingsRef().mutations_sync > 0)
@ -966,6 +969,7 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
auto commands = MutationCommands::create();
size_t current_ast_elements = 0;
auto last_mutation_to_apply = mutations_end_it;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
/// Do not squash mutation from different transactions to be able to commit/rollback them independently.
@ -1006,7 +1010,8 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
MergeTreeMutationEntry & entry = it->second;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = getCurrentExceptionMessage(false);
continue;
/// NOTE we should not skip mutations, because exception may be retryable (e.g. MEMORY_LIMIT_EXCEEDED)
break;
}
}
@ -1015,8 +1020,10 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
current_ast_elements += commands_size;
commands->insert(commands->end(), it->second.commands.begin(), it->second.commands.end());
last_mutation_to_apply = it;
}
assert(commands->empty() == (last_mutation_to_apply == mutations_end_it));
if (!commands->empty())
{
bool is_partition_affected = false;
@ -1041,13 +1048,13 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
/// Shall not create a new part, but will do that later if mutation with higher version appear.
/// This is needed in order to not produce excessive mutations of non-related parts.
auto block_range = std::make_pair(part->info.min_block, part->info.max_block);
updated_version_by_block_range[block_range] = current_mutations_by_version.rbegin()->first;
updated_version_by_block_range[block_range] = last_mutation_to_apply->first;
were_some_mutations_for_some_parts_skipped = true;
continue;
}
auto new_part_info = part->info;
new_part_info.mutation = current_mutations_by_version.rbegin()->first;
new_part_info.mutation = last_mutation_to_apply->first;
future_part->parts.push_back(part);
future_part->part_info = new_part_info;

View File

@ -4533,28 +4533,6 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
}
std::set<String> StorageReplicatedMergeTree::getPartitionIdsAffectedByCommands(
const MutationCommands & commands, ContextPtr query_context) const
{
std::set<String> affected_partition_ids;
for (const auto & command : commands)
{
if (!command.partition)
{
affected_partition_ids.clear();
break;
}
affected_partition_ids.insert(
getPartitionIDFromQuery(command.partition, query_context)
);
}
return affected_partition_ids;
}
PartitionBlockNumbersHolder StorageReplicatedMergeTree::allocateBlockNumbersInAffectedPartitions(
const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const
{

View File

@ -717,7 +717,6 @@ private:
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
std::set<String> getPartitionIdsAffectedByCommands(const MutationCommands & commands, ContextPtr query_context) const;
PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(
const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const;

View File

@ -28,6 +28,6 @@ PARTITION BY p
ORDER BY t
SETTINGS number_of_free_entries_in_pool_to_execute_mutation=0;
INSERT INTO data VALUES (1, now());
ALTER TABLE data MATERIALIZE INDEX idx IN PARTITION ID 'NO_SUCH_PART'; -- { serverError 341 }
ALTER TABLE data MATERIALIZE INDEX idx IN PARTITION ID 'NO_SUCH_PART'; -- { serverError 248 }
ALTER TABLE data MATERIALIZE INDEX idx IN PARTITION ID '1';
ALTER TABLE data MATERIALIZE INDEX idx IN PARTITION ID '2';