fix some tests

This commit is contained in:
Alexander Tokmakov 2023-05-15 18:28:12 +02:00
parent d17aa828b3
commit 05ae7b2c2d
15 changed files with 130 additions and 16 deletions

View File

@ -542,6 +542,7 @@ void MutationsInterpreter::prepare(bool dry_run)
if (commands.empty()) if (commands.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty mutation commands list"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty mutation commands list");
/// TODO Should we get columns, indices and projections from the part itself? Table metadata may be different
const ColumnsDescription & columns_desc = metadata_snapshot->getColumns(); const ColumnsDescription & columns_desc = metadata_snapshot->getColumns();
const IndicesDescription & indices_desc = metadata_snapshot->getSecondaryIndices(); const IndicesDescription & indices_desc = metadata_snapshot->getSecondaryIndices();
const ProjectionsDescription & projections_desc = metadata_snapshot->getProjections(); const ProjectionsDescription & projections_desc = metadata_snapshot->getProjections();

View File

@ -50,7 +50,7 @@ public:
bool return_all_columns_ = false, bool return_all_columns_ = false,
bool return_mutated_rows_ = false); bool return_mutated_rows_ = false);
/// Special case for MergeTree /// Special case for *MergeTree
MutationsInterpreter( MutationsInterpreter(
MergeTreeData & storage_, MergeTreeData & storage_,
MergeTreeData::DataPartPtr source_part_, MergeTreeData::DataPartPtr source_part_,
@ -123,7 +123,7 @@ public:
private: private:
StoragePtr storage; StoragePtr storage;
/// Special case for MergeTree. /// Special case for *MergeTree.
MergeTreeData * data = nullptr; MergeTreeData * data = nullptr;
MergeTreeData::DataPartPtr part; MergeTreeData::DataPartPtr part;
}; };

View File

@ -90,7 +90,10 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
} }
new_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version); new_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version);
commands = std::make_shared<MutationCommands>(storage.queue.getMutationCommands(source_part, new_part_info.mutation)); Strings mutation_ids;
commands = std::make_shared<MutationCommands>(storage.queue.getMutationCommands(source_part, new_part_info.mutation, mutation_ids));
LOG_TRACE(log, "Mutating part {} with mutation commands from {} mutations ({}): {}",
entry.new_part_name, commands->size(), fmt::join(mutation_ids, ", "), commands->toString());
/// Once we mutate part, we must reserve space on the same disk, because mutations can possibly create hardlinks. /// Once we mutate part, we must reserve space on the same disk, because mutations can possibly create hardlinks.
/// Can throw an exception. /// Can throw an exception.

View File

@ -58,7 +58,9 @@ static void splitAndModifyMutationCommands(
MergeTreeData::DataPartPtr part, MergeTreeData::DataPartPtr part,
const MutationCommands & commands, const MutationCommands & commands,
MutationCommands & for_interpreter, MutationCommands & for_interpreter,
MutationCommands & for_file_renames) MutationCommands & for_file_renames,
const StorageMetadataPtr & table_metadata_snapshot,
Poco::Logger * log)
{ {
auto part_columns = part->getColumnsDescription(); auto part_columns = part->getColumnsDescription();
@ -142,6 +144,29 @@ static void splitAndModifyMutationCommands(
{ {
if (!mutated_columns.contains(column.name)) if (!mutated_columns.contains(column.name))
{ {
if (!table_metadata_snapshot->getColumns().has(column.name))
{
/// We cannot add the column because there's no such column in table.
/// It's okay if the column was dropped. It may also absent in dropped_columns
/// if the corresponding MUTATE_PART entry was not created yet or was created separately from current MUTATE_PART.
/// But we don't know for sure what happened.
auto part_metadata_version = part->getMetadataVersion();
auto table_metadata_version = table_metadata_snapshot->getMetadataVersion();
if (table_metadata_version <= part_metadata_version)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} with metadata version {} contains column {} that is absent "
"in table {} with metadata version {}",
part->name, part_metadata_version, column.name,
part->storage.getStorageID().getNameForLogs(), table_metadata_version);
if (part_metadata_version < table_metadata_version)
{
LOG_WARNING(log, "Ignoring column {} from part {} with metadata version {} because there is no such column "
"in table {} with metadata version {}. Assuming the column was dropped", column.name, part->name,
part_metadata_version, part->storage.getStorageID().getNameForLogs(), table_metadata_version);
continue;
}
}
for_interpreter.emplace_back( for_interpreter.emplace_back(
MutationCommand{.type = MutationCommand::Type::READ_COLUMN, .column_name = column.name, .data_type = column.type}); MutationCommand{.type = MutationCommand::Type::READ_COLUMN, .column_name = column.name, .data_type = column.type});
} }
@ -1776,7 +1801,8 @@ bool MutateTask::prepare()
context_for_reading->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false); context_for_reading->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
context_for_reading->setSetting("max_streams_for_merge_tree_reading", Field(0)); context_for_reading->setSetting("max_streams_for_merge_tree_reading", Field(0));
MutationHelpers::splitAndModifyMutationCommands(ctx->source_part, ctx->commands_for_part, ctx->for_interpreter, ctx->for_file_renames); MutationHelpers::splitAndModifyMutationCommands(ctx->source_part, ctx->commands_for_part, ctx->for_interpreter,
ctx->for_file_renames, ctx->metadata_snapshot, ctx->log);
ctx->stage_progress = std::make_unique<MergeStageProgress>(1.0); ctx->stage_progress = std::make_unique<MergeStageProgress>(1.0);

View File

@ -99,4 +99,13 @@ std::shared_ptr<const IBackupEntry> ReplicatedMergeTreeMutationEntry::backup() c
return std::make_shared<BackupEntryFromMemory>(out.str()); return std::make_shared<BackupEntryFromMemory>(out.str());
} }
String ReplicatedMergeTreeMutationEntry::getBlockNumbersForLogs() const
{
WriteBufferFromOwnString out;
for (const auto & kv : block_numbers)
out << kv.first << " = " << kv.second << "; ";
return out.str();
}
} }

View File

@ -51,6 +51,8 @@ struct ReplicatedMergeTreeMutationEntry
bool isAlterMutation() const { return alter_version != -1; } bool isAlterMutation() const { return alter_version != -1; }
std::shared_ptr<const IBackupEntry> backup() const; std::shared_ptr<const IBackupEntry> backup() const;
String getBlockNumbersForLogs() const;
}; };
using ReplicatedMergeTreeMutationEntryPtr = std::shared_ptr<const ReplicatedMergeTreeMutationEntry>; using ReplicatedMergeTreeMutationEntryPtr = std::shared_ptr<const ReplicatedMergeTreeMutationEntry>;

View File

@ -955,13 +955,14 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
const String & partition_id = pair.first; const String & partition_id = pair.first;
Int64 block_num = pair.second; Int64 block_num = pair.second;
mutations_by_partition[partition_id].emplace(block_num, &mutation); mutations_by_partition[partition_id].emplace(block_num, &mutation);
LOG_TRACE(log, "Adding mutation {} for partition {} for all block numbers less than {}", entry->znode_name, partition_id, block_num);
} }
LOG_TRACE(log, "Adding mutation {} for {} partitions (data versions: {})",
entry->znode_name, entry->block_numbers.size(), entry->getBlockNumbersForLogs());
/// Initialize `mutation.parts_to_do`. We cannot use only current_parts + virtual_parts here so we /// Initialize `mutation.parts_to_do`. We cannot use only current_parts + virtual_parts here so we
/// traverse all the queue and build correct state of parts_to_do. /// traverse all the queue and build correct state of parts_to_do.
auto queue_representation = getQueueRepresentation(queue, format_version); auto queue_representation = getQueueRepresentation(queue, format_version);
mutation.parts_to_do = getPartNamesToMutate(*entry, virtual_parts, queue_representation, format_version); mutation.parts_to_do = getPartNamesToMutate(*entry, current_parts, queue_representation, format_version);
if (mutation.parts_to_do.size() == 0) if (mutation.parts_to_do.size() == 0)
some_mutations_are_probably_done = true; some_mutations_are_probably_done = true;
@ -1801,7 +1802,7 @@ std::map<int64_t, MutationCommands> ReplicatedMergeTreeQueue::getAlterMutationCo
} }
MutationCommands ReplicatedMergeTreeQueue::getMutationCommands( MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version, Strings & mutation_ids) const
{ {
/// NOTE: If the corresponding mutation is not found, the error is logged (and not thrown as an exception) /// NOTE: If the corresponding mutation is not found, the error is logged (and not thrown as an exception)
/// to allow recovering from a mutation that cannot be executed. This way you can delete the mutation entry /// to allow recovering from a mutation that cannot be executed. This way you can delete the mutation entry
@ -1840,6 +1841,8 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
MutationCommands commands; MutationCommands commands;
for (auto it = begin; it != end; ++it) for (auto it = begin; it != end; ++it)
{ {
chassert(mutation_pointer < it->second->entry->znode_name);
mutation_ids.push_back(it->second->entry->znode_name);
const auto & commands_from_entry = it->second->entry->commands; const auto & commands_from_entry = it->second->entry->commands;
commands.insert(commands.end(), commands_from_entry.begin(), commands_from_entry.end()); commands.insert(commands.end(), commands_from_entry.begin(), commands_from_entry.end());
} }
@ -2600,7 +2603,7 @@ void ReplicatedMergeTreeQueue::removeCurrentPartsFromMutations()
{ {
std::lock_guard state_lock(state_mutex); std::lock_guard state_lock(state_mutex);
for (const auto & part_name : current_parts.getParts()) for (const auto & part_name : current_parts.getParts())
removeCoveredPartsFromMutations(part_name, /*remove_part = */ true, /*remove_covered_parts = */ true); removeCoveredPartsFromMutations(part_name, /*remove_part = */ false, /*remove_covered_parts = */ true);
} }
} }

View File

@ -392,7 +392,8 @@ public:
/// Returns functor which used by MergeTreeMergerMutator to select parts for merge /// Returns functor which used by MergeTreeMergerMutator to select parts for merge
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint); ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint);
MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const; MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version,
Strings & mutation_ids) const;
/// Return mutation commands for part which could be not applied to /// Return mutation commands for part which could be not applied to
/// it according to part mutation version. Used when we apply alter commands on fly, /// it according to part mutation version. Used when we apply alter commands on fly,

View File

@ -5265,12 +5265,12 @@ void StorageReplicatedMergeTree::alter(
fs::path(zookeeper_path) / "log/log-", alter_entry->toString(), zkutil::CreateMode::PersistentSequential)); fs::path(zookeeper_path) / "log/log-", alter_entry->toString(), zkutil::CreateMode::PersistentSequential));
PartitionBlockNumbersHolder partition_block_numbers_holder; PartitionBlockNumbersHolder partition_block_numbers_holder;
ReplicatedMergeTreeMutationEntry mutation_entry;
if (have_mutation) if (have_mutation)
{ {
delayMutationOrThrowIfNeeded(&partial_shutdown_event, query_context); delayMutationOrThrowIfNeeded(&partial_shutdown_event, query_context);
const String mutations_path(fs::path(zookeeper_path) / "mutations"); const String mutations_path(fs::path(zookeeper_path) / "mutations");
ReplicatedMergeTreeMutationEntry mutation_entry;
mutation_entry.alter_version = new_metadata_version; mutation_entry.alter_version = new_metadata_version;
mutation_entry.source_replica = replica_name; mutation_entry.source_replica = replica_name;
mutation_entry.commands = std::move(maybe_mutation_commands); mutation_entry.commands = std::move(maybe_mutation_commands);
@ -5322,12 +5322,16 @@ void StorageReplicatedMergeTree::alter(
/// ReplicatedMergeTreeMutationEntry record in /mutations /// ReplicatedMergeTreeMutationEntry record in /mutations
String mutation_path = dynamic_cast<const Coordination::CreateResponse &>(*results[mutation_path_idx]).path_created; String mutation_path = dynamic_cast<const Coordination::CreateResponse &>(*results[mutation_path_idx]).path_created;
mutation_znode = mutation_path.substr(mutation_path.find_last_of('/') + 1); mutation_znode = mutation_path.substr(mutation_path.find_last_of('/') + 1);
LOG_DEBUG(log, "Created log entry {} to update table metadata to version {}, created a mutation {} (data versions: {})",
alter_entry->znode_name, alter_entry->alter_version, *mutation_znode, mutation_entry.getBlockNumbersForLogs());
} }
else else
{ {
/// ALTER_METADATA record in replication /log /// ALTER_METADATA record in replication /log
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[alter_path_idx]).path_created; String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[alter_path_idx]).path_created;
alter_entry->znode_name = alter_path.substr(alter_path.find_last_of('/') + 1); alter_entry->znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);
LOG_DEBUG(log, "Created log entry {} to update table metadata to version {}",
alter_entry->znode_name, alter_entry->alter_version);
} }
break; break;
} }
@ -6493,7 +6497,8 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
const String & path_created = const String & path_created =
dynamic_cast<const Coordination::CreateResponse *>(responses[1].get())->path_created; dynamic_cast<const Coordination::CreateResponse *>(responses[1].get())->path_created;
mutation_entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); mutation_entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
LOG_TRACE(log, "Created mutation with ID {}", mutation_entry.znode_name); LOG_TRACE(log, "Created mutation with ID {} (data versions: {})",
mutation_entry.znode_name, mutation_entry.getBlockNumbersForLogs());
break; break;
} }
else if (rc == Coordination::Error::ZBADVERSION) else if (rc == Coordination::Error::ZBADVERSION)

View File

@ -286,11 +286,11 @@ def get_processlist_with_stacktraces(args):
-- NOTE: view() here to do JOIN on shards, instead of initiator -- NOTE: view() here to do JOIN on shards, instead of initiator
FROM clusterAllReplicas('test_cluster_database_replicated', view( FROM clusterAllReplicas('test_cluster_database_replicated', view(
SELECT SELECT
p.*,
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap( arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))), x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
s.trace), '\n') AS stacktrace s.trace), '\n') AS stacktrace
)) AS stacktraces, )) AS stacktraces
p.*
FROM system.processes p FROM system.processes p
JOIN system.stack_trace s USING (query_id) JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%' WHERE query NOT LIKE '%system.processes%'
@ -307,11 +307,11 @@ def get_processlist_with_stacktraces(args):
args, args,
""" """
SELECT SELECT
p.*,
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap( arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))), x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
s.trace), '\n') AS stacktrace s.trace), '\n') AS stacktrace
)) AS stacktraces, )) AS stacktraces
p.*
FROM system.processes p FROM system.processes p
JOIN system.stack_trace s USING (query_id) JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%' WHERE query NOT LIKE '%system.processes%'

View File

@ -262,6 +262,8 @@ def test_default_codec_multiple(start_cluster):
) )
) )
node2.query("SYSTEM SYNC REPLICA compression_table_multiple", timeout=15)
# Same codec for all # Same codec for all
assert ( assert (
get_compression_codec_byte(node1, "compression_table_multiple", "1_0_0_0") get_compression_codec_byte(node1, "compression_table_multiple", "1_0_0_0")
@ -330,6 +332,8 @@ def test_default_codec_multiple(start_cluster):
node1.query("OPTIMIZE TABLE compression_table_multiple FINAL") node1.query("OPTIMIZE TABLE compression_table_multiple FINAL")
node2.query("SYSTEM SYNC REPLICA compression_table_multiple", timeout=15)
assert ( assert (
get_compression_codec_byte(node1, "compression_table_multiple", "1_0_0_1") get_compression_codec_byte(node1, "compression_table_multiple", "1_0_0_1")
== CODECS_MAPPING["Multiple"] == CODECS_MAPPING["Multiple"]

View File

@ -0,0 +1,5 @@
0000000000 UPDATE n = 2 WHERE n = 1 ['all_0_0_0'] 0
1
0000000000 UPDATE n = 2 WHERE n = 1 ['all_0_0_0'] 0
2
0000000000 UPDATE n = 2 WHERE n = 1 [] 1

View File

@ -0,0 +1,33 @@
create table mut (n int) engine=ReplicatedMergeTree('/test/02440/{database}/mut', '1') order by tuple();
set insert_keeper_fault_injection_probability=0;
insert into mut values (1);
system stop merges mut;
alter table mut update n = 2 where n = 1;
-- it will create MUTATE_PART entry, but will not execute it
select mutation_id, command, parts_to_do_names, is_done from system.mutations where database=currentDatabase() and table='mut';
-- merges (and mutations) will start again after detach/attach, we need to avoid this somehow...
create table tmp (n int) engine=MergeTree order by tuple() settings index_granularity=1;
insert into tmp select * from numbers(1000);
alter table tmp update n = sleepEachRow(1) where 1;
select sleepEachRow(2) as higher_probablility_of_reproducing_the_issue format Null;
-- it will not execute MUTATE_PART, because another mutation is currently executing (in tmp)
alter table mut modify setting max_number_of_mutations_for_replica=1;
detach table mut;
attach table mut;
-- mutation should not be finished yet
select * from mut;
select mutation_id, command, parts_to_do_names, is_done from system.mutations where database=currentDatabase() and table='mut';
alter table mut modify setting max_number_of_mutations_for_replica=100;
system sync replica mut;
-- and now it should
select * from mut;
select mutation_id, command, parts_to_do_names, is_done from system.mutations where database=currentDatabase() and table='mut';
drop table tmp; -- btw, it will check that mutation can be cancelled between blocks on shutdown

View File

@ -0,0 +1,2 @@
MUTATE_PART all_0_0_0_1 ['all_0_0_0']
1 2

View File

@ -0,0 +1,20 @@
create table mut (n int, m int, k int) engine=ReplicatedMergeTree('/test/02441/{database}/mut', '1') order by n;
set insert_keeper_fault_injection_probability=0;
insert into mut values (1, 2, 3), (10, 20, 30);
system stop merges mut;
alter table mut delete where n = 10;
alter table mut drop column k settings alter_sync=0;
system sync replica mut pull;
-- a funny way to wait for ALTER_METADATA to disappear from the replication queue
select sleepEachRow(1) from url('http://localhost:8123/?param_tries={1..30}&query=' || encodeURLComponent(
'select * from system.replication_queue where database=''' || currentDatabase() || ''' and table=''mut'' and type=''ALTER_METADATA'''
), 'LineAsString', 's String') settings max_threads=1 format Null;
select type, new_part_name, parts_to_merge from system.replication_queue where database=currentDatabase() and table='mut';
system start merges mut;
set receive_timeout=30;
system sync replica mut;
select * from mut;