Make RENAME COLUMN barrier

This commit is contained in:
alesapin 2023-02-02 21:11:19 +01:00
parent fc6c62e2ff
commit b8b0f1c3d3
4 changed files with 68 additions and 6 deletions

View File

@ -102,6 +102,15 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark); auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
if (file_size == 0 && marks_count != 0)
{
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Empty marks file '{}': {}, must be: {}",
std::string(fs::path(data_part_storage->getFullPath()) / mrk_path),
file_size, expected_uncompressed_size);
}
if (!index_granularity_info.mark_type.compressed && expected_uncompressed_size != file_size) if (!index_granularity_info.mark_type.compressed && expected_uncompressed_size != file_size)
throw Exception( throw Exception(
ErrorCodes::CORRUPTED_DATA, ErrorCodes::CORRUPTED_DATA,
@ -138,7 +147,12 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
} }
if (i * mark_size != expected_uncompressed_size) if (i * mark_size != expected_uncompressed_size)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}", mrk_path); {
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA,
"Cannot read all marks from file {}, marks expected {} (bytes size {}), marks read {} (bytes size {})",
mrk_path, marks_count, expected_uncompressed_size, i, reader->count());
}
} }
res->protect(); res->protect();

View File

@ -1813,7 +1813,30 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
MutationCommands commands; MutationCommands commands;
for (auto it = begin; it != end; ++it) for (auto it = begin; it != end; ++it)
commands.insert(commands.end(), it->second->entry->commands.begin(), it->second->entry->commands.end()); {
bool rename_command = false;
if (it->second->entry->isAlterMutation())
{
const auto & single_mutation_commands = it->second->entry->commands;
for (const auto & command : single_mutation_commands)
{
if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
rename_command = true;
break;
}
}
}
if (rename_command)
{
if (commands.empty())
commands.insert(commands.end(), it->second->entry->commands.begin(), it->second->entry->commands.end());
break;
}
else
commands.insert(commands.end(), it->second->entry->commands.begin(), it->second->entry->commands.end());
}
return commands; return commands;
} }

View File

@ -1114,9 +1114,34 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
if (current_ast_elements + commands_size >= max_ast_elements) if (current_ast_elements + commands_size >= max_ast_elements)
break; break;
current_ast_elements += commands_size; bool rename_command = false;
commands->insert(commands->end(), it->second.commands.begin(), it->second.commands.end()); const auto & single_mutation_commands = it->second.commands;
last_mutation_to_apply = it; for (const auto & command : single_mutation_commands)
{
if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
rename_command = true;
break;
}
}
if (rename_command)
{
if (commands->empty())
{
current_ast_elements += commands_size;
commands->insert(commands->end(), it->second.commands.begin(), it->second.commands.end());
last_mutation_to_apply = it;
}
break;
}
else
{
current_ast_elements += commands_size;
commands->insert(commands->end(), it->second.commands.begin(), it->second.commands.end());
last_mutation_to_apply = it;
}
} }
assert(commands->empty() == (last_mutation_to_apply == mutations_end_it)); assert(commands->empty() == (last_mutation_to_apply == mutations_end_it));

View File

@ -32,7 +32,7 @@ while [[ $counter -lt $retries ]]; do
done done
$CLICKHOUSE_CLIENT --query="ALTER TABLE table_to_rename UPDATE v2 = 77 WHERE 1 = 1" & $CLICKHOUSE_CLIENT --query="ALTER TABLE table_to_rename UPDATE v2 = 77 WHERE 1 = 1 SETTINGS mutations_sync = 2" &
counter=0 retries=60 counter=0 retries=60