Fix missed entries in system.part_log when fetch is preferred over merges/mutations

Previously, the NEED_PREPARE state did not call part_log_writer().

v2:
- Add a test for missed entries in system.part_log for merge
- Fix part_log_writer
- Add a test for missed entries in system.part_log for mutate

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Author: Azat Khuzhin <a.khuzhin@semrush.com>
Date: 2024-03-01 17:07:08 +01:00
Parent: 19b1a675a1
Commit: df44ed173c
7 changed files with 163 additions and 58 deletions
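
The gist of the change, as a minimal standalone C++20 sketch (PrepareResult, ExecutionStatus, PartLogWriter and the error values below are simplified stand-ins for illustration, not the real ClickHouse types): prepare() now constructs the part_log_writer callback before any early return, so every "fall back to fetch" path still carries a writer that can record the failed attempt in system.part_log.

    // Illustrative sketch only; simplified stand-ins for the ClickHouse types.
    #include <functional>
    #include <iostream>
    #include <string>

    struct ExecutionStatus { int code = 0; std::string message; };
    using PartLogWriter = std::function<void(const ExecutionStatus &)>;

    struct PrepareResult
    {
        bool prepared_successfully = false;
        bool need_to_check_missing_part_in_fetch = false;
        PartLogWriter part_log_writer;  // after this commit: never left empty
    };

    PrepareResult prepare(bool always_fetch_merged_part)
    {
        // Build the writer up front, before any early return (the fix).
        PartLogWriter writer = [](const ExecutionStatus & status)
        {
            std::cout << "system.part_log <- error=" << status.code
                      << " message=" << status.message << '\n';
        };

        if (always_fetch_merged_part)
        {
            // Before the fix this branch returned .part_log_writer = {},
            // so a failed fetch left no trace in system.part_log.
            return {.prepared_successfully = false,
                    .need_to_check_missing_part_in_fetch = true,
                    .part_log_writer = writer};
        }

        return {.prepared_successfully = true,
                .need_to_check_missing_part_in_fetch = true,
                .part_log_writer = writer};
    }

    int main()
    {
        auto result = prepare(/*always_fetch_merged_part=*/true);
        if (!result.prepared_successfully)
            result.part_log_writer({1, "fetch preferred over merge"});  // failure is still logged
    }

This also explains the executeImpl() changes below: because the callback is now always set, the if (part_log_writer) guards can be dropped, and execute_fetch() can report fetch failures through the same writer.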

src/Storages/MergeTree/MergeFromLogEntryTask.cpp

@@ -47,13 +47,22 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
     int32_t metadata_version = metadata_snapshot->getMetadataVersion();
     const auto storage_settings_ptr = storage.getSettings();
 
+    stopwatch_ptr = std::make_unique<Stopwatch>();
+    auto part_log_writer = [this, stopwatch = *stopwatch_ptr](const ExecutionStatus & execution_status)
+    {
+        auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
+        storage.writePartLog(
+            PartLogElement::MERGE_PARTS, execution_status, stopwatch.elapsed(),
+            entry.new_part_name, part, parts, merge_mutate_entry.get(), std::move(profile_counters_snapshot));
+    };
+
     if (storage_settings_ptr->always_fetch_merged_part)
     {
         LOG_INFO(log, "Will fetch part {} because setting 'always_fetch_merged_part' is true", entry.new_part_name);
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = true,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
 
@@ -68,7 +77,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = false,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
@@ -88,7 +97,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
             return PrepareResult{
                 .prepared_successfully = false,
                 .need_to_check_missing_part_in_fetch = true,
-                .part_log_writer = {}
+                .part_log_writer = part_log_writer,
             };
         }
     }
@@ -107,7 +116,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = true,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
@@ -127,7 +136,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = true,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
@@ -139,7 +148,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = false,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
@@ -167,7 +176,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
            return PrepareResult{
                .prepared_successfully = false,
                .need_to_check_missing_part_in_fetch = false,
-               .part_log_writer = {}
+               .part_log_writer = part_log_writer,
            };
        }
    }
@@ -227,7 +236,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = false,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
@@ -267,7 +276,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = false,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
     else if (storage.findReplicaHavingCoveringPart(entry.new_part_name, /* active */ false))
@@ -284,7 +293,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = true,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
     else
@@ -311,7 +320,6 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
         task_context);
 
     transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage, NO_TRANSACTION_RAW);
-    stopwatch_ptr = std::make_unique<Stopwatch>();
 
     merge_task = storage.merger_mutator.mergePartsToTemporaryPart(
         future_merged_part,
@@ -333,13 +341,11 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
     for (auto & item : future_merged_part->parts)
         priority.value += item->getBytesOnDisk();
 
-    return {true, true, [this, stopwatch = *stopwatch_ptr] (const ExecutionStatus & execution_status)
-    {
-        auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
-        storage.writePartLog(
-            PartLogElement::MERGE_PARTS, execution_status, stopwatch.elapsed(),
-            entry.new_part_name, part, parts, merge_mutate_entry.get(), std::move(profile_counters_snapshot));
-    }};
+    return PrepareResult{
+        .prepared_successfully = true,
+        .need_to_check_missing_part_in_fetch = true,
+        .part_log_writer = part_log_writer,
+    };
 }

src/Storages/MergeTree/MutateFromLogEntryTask.cpp

@@ -20,6 +20,22 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
     const auto storage_settings_ptr = storage.getSettings();
     LOG_TRACE(log, "Executing log entry to mutate part {} to {}", source_part_name, entry.new_part_name);
 
+    new_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version);
+
+    future_mutated_part = std::make_shared<FutureMergedMutatedPart>();
+    future_mutated_part->name = entry.new_part_name;
+    future_mutated_part->uuid = entry.new_part_uuid;
+    future_mutated_part->part_info = new_part_info;
+
+    stopwatch_ptr = std::make_unique<Stopwatch>();
+    auto part_log_writer = [this](const ExecutionStatus & execution_status)
+    {
+        auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
+        storage.writePartLog(
+            PartLogElement::MUTATE_PART, execution_status, stopwatch_ptr->elapsed(),
+            entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), std::move(profile_counters_snapshot));
+    };
+
     MergeTreeData::DataPartPtr source_part = storage.getActiveContainingPart(source_part_name);
     if (!source_part)
     {
@@ -29,10 +45,13 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = true,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
 
+    future_mutated_part->parts.push_back(source_part);
+    future_mutated_part->part_format = source_part->getFormat();
+
     if (source_part->name != source_part_name)
     {
         LOG_WARNING(log,
@@ -44,7 +63,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = true,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
@@ -63,7 +82,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
            return PrepareResult{
                .prepared_successfully = false,
                .need_to_check_missing_part_in_fetch = true,
-               .part_log_writer = {}
+               .part_log_writer = part_log_writer,
            };
        }
    }
@@ -84,13 +103,12 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
            return PrepareResult{
                .prepared_successfully = false,
                .need_to_check_missing_part_in_fetch = true,
-               .part_log_writer = {}
+               .part_log_writer = part_log_writer,
            };
        }
    }
 
-    new_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version);
 
     Strings mutation_ids;
     commands = std::make_shared<MutationCommands>(storage.queue.getMutationCommands(source_part, new_part_info.mutation, mutation_ids));
     LOG_TRACE(log, "Mutating part {} with mutation commands from {} mutations ({}): {}",
@@ -99,6 +117,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
     /// Once we mutate part, we must reserve space on the same disk, because mutations can possibly create hardlinks.
     /// Can throw an exception.
     reserved_space = storage.reserveSpace(estimated_space_for_result, source_part->getDataPartStorage());
+    future_mutated_part->updatePath(storage, reserved_space.get());
 
     table_lock_holder = storage.lockForShare(
         RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
@@ -106,14 +125,6 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
     transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage, NO_TRANSACTION_RAW);
 
-    future_mutated_part = std::make_shared<FutureMergedMutatedPart>();
-    future_mutated_part->name = entry.new_part_name;
-    future_mutated_part->uuid = entry.new_part_uuid;
-    future_mutated_part->parts.push_back(source_part);
-    future_mutated_part->part_info = new_part_info;
-    future_mutated_part->updatePath(storage, reserved_space.get());
-    future_mutated_part->part_format = source_part->getFormat();
-
     if (storage_settings_ptr->allow_remote_fs_zero_copy_replication)
     {
         if (auto disk = reserved_space->getDisk(); disk->supportZeroCopyReplication())
@@ -124,7 +135,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
            return PrepareResult{
                .prepared_successfully = false,
                .need_to_check_missing_part_in_fetch = true,
-               .part_log_writer = {}
+               .part_log_writer = part_log_writer,
            };
        }
@@ -163,7 +174,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = false,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
     else if (storage.findReplicaHavingCoveringPart(entry.new_part_name, /* active */ false))
@@ -182,7 +193,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
         return PrepareResult{
             .prepared_successfully = false,
             .need_to_check_missing_part_in_fetch = true,
-            .part_log_writer = {}
+            .part_log_writer = part_log_writer,
         };
     }
     else
@@ -201,8 +212,6 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
         future_mutated_part,
         task_context);
 
-    stopwatch_ptr = std::make_unique<Stopwatch>();
-
     mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
         future_mutated_part, metadata_snapshot, commands, merge_mutate_entry.get(),
         entry.create_time, task_context, NO_TRANSACTION_PTR, reserved_space, table_lock_holder);
@@ -211,13 +220,11 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
     for (auto & item : future_mutated_part->parts)
         priority.value += item->getBytesOnDisk();
 
-    return {true, true, [this] (const ExecutionStatus & execution_status)
-    {
-        auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
-        storage.writePartLog(
-            PartLogElement::MUTATE_PART, execution_status, stopwatch_ptr->elapsed(),
-            entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), std::move(profile_counters_snapshot));
-    }};
+    return PrepareResult{
+        .prepared_successfully = true,
+        .need_to_check_missing_part_in_fetch = true,
+        .part_log_writer = part_log_writer,
+    };
 }

src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp

@@ -164,8 +164,16 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
     auto execute_fetch = [&] (bool need_to_check_missing_part) -> bool
     {
-        if (storage.executeFetch(entry, need_to_check_missing_part))
-            return remove_processed_entry();
+        try
+        {
+            if (storage.executeFetch(entry, need_to_check_missing_part))
+                return remove_processed_entry();
+        }
+        catch (...)
+        {
+            part_log_writer(ExecutionStatus::fromCurrentException("", true));
+            throw;
+        }
 
         return false;
     };
@@ -205,8 +213,7 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
             }
             catch (...)
             {
-                if (part_log_writer)
-                    part_log_writer(ExecutionStatus::fromCurrentException("", true));
+                part_log_writer(ExecutionStatus::fromCurrentException("", true));
                 throw;
             }
@@ -214,17 +221,8 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
         }
         case State::NEED_FINALIZE :
         {
-            try
-            {
-                if (!finalize(part_log_writer))
-                    return execute_fetch(/* need_to_check_missing = */true);
-            }
-            catch (...)
-            {
-                if (part_log_writer)
-                    part_log_writer(ExecutionStatus::fromCurrentException("", true));
-                throw;
-            }
+            if (!finalize(part_log_writer))
+                return execute_fetch(/* need_to_check_missing = */true);
 
             return remove_processed_entry();
         }

New test reference file: expected system.part_log output for the merge fetch test

@@ -0,0 +1,10 @@
+before
+rmt_master NewPart 0 1
+rmt_master MergeParts 0 1
+rmt_slave MergeParts 1 0
+rmt_slave DownloadPart 0 1
+after
+rmt_master NewPart 0 1
+rmt_master MergeParts 0 1
+rmt_slave MergeParts 1 0
+rmt_slave DownloadPart 0 2

New test file: SQL test for missed system.part_log entries when a merge falls back to a fetch

@@ -0,0 +1,34 @@
+-- Tags: no-replicated-database, no-parallel
+
+drop table if exists rmt_master;
+drop table if exists rmt_slave;
+
+create table rmt_master (key Int) engine=ReplicatedMergeTree('/clickhouse/{database}', 'master') order by key settings always_fetch_merged_part=0;
+-- always_fetch_merged_part=1, so this table acts as the "slave"
+create table rmt_slave (key Int) engine=ReplicatedMergeTree('/clickhouse/{database}', 'slave') order by key settings always_fetch_merged_part=1;
+
+insert into rmt_master values (1);
+
+system sync replica rmt_master;
+system sync replica rmt_slave;
+system stop replicated sends rmt_master;
+
+optimize table rmt_master final settings alter_sync=1, optimize_throw_if_noop=1;
+select sleep(3) format Null;
+
+system flush logs;
+select 'before';
+select table, event_type, error>0, countIf(error=0) from system.part_log where database = currentDatabase() group by 1, 2, 3 order by 1, 2, 3;
+
+system start replicated sends rmt_master;
+-- sleep a few seconds to let rmt_slave try to fetch the part and reflect
+-- this error in system.part_log
+select sleep(3) format Null;
+system sync replica rmt_slave;
+
+system flush logs;
+select 'after';
+select table, event_type, error>0, countIf(error=0) from system.part_log where database = currentDatabase() group by 1, 2, 3 order by 1, 2, 3;
+
+drop table rmt_master;
+drop table rmt_slave;

New test reference file: expected system.part_log output for the mutate fetch test

@@ -0,0 +1,10 @@
+before
+rmt_master NewPart 0 1
+rmt_master MutatePart 0 1
+rmt_slave DownloadPart 0 1
+rmt_slave MutatePart 1 0
+after
+rmt_master NewPart 0 1
+rmt_master MutatePart 0 1
+rmt_slave DownloadPart 0 2
+rmt_slave MutatePart 1 0

New test file: SQL test for missed system.part_log entries when a mutation falls back to a fetch

@@ -0,0 +1,40 @@
+-- Tags: no-replicated-database, no-parallel
+
+drop table if exists rmt_master;
+drop table if exists rmt_slave;
+
+create table rmt_master (key Int) engine=ReplicatedMergeTree('/clickhouse/{database}', 'master') order by tuple() settings always_fetch_merged_part=0;
+-- prefer_fetch_merged_part_*_threshold=0, so this table acts as the "slave"
+create table rmt_slave (key Int) engine=ReplicatedMergeTree('/clickhouse/{database}', 'slave') order by tuple() settings prefer_fetch_merged_part_time_threshold=0, prefer_fetch_merged_part_size_threshold=0;
+
+insert into rmt_master values (1);
+
+system sync replica rmt_master;
+system sync replica rmt_slave;
+system stop replicated sends rmt_master;
+system stop pulling replication log rmt_slave;
+
+alter table rmt_master update key=key+100 where 1 settings alter_sync=1;
+-- first, we need to make rmt_master execute the mutation so that it will have
+-- the part, and rmt_slave will fetch it instead of performing the mutation on
+-- its own; otherwise prefer_fetch_merged_part_*_threshold is simply ignored
+select sleep(3) format Null;
+system start pulling replication log rmt_slave;
+-- and sleep a few more seconds to let rmt_slave try to fetch the part and
+-- reflect this error in system.part_log
+select sleep(3) format Null;
+
+system flush logs;
+select 'before';
+select table, event_type, error>0, countIf(error=0) from system.part_log where database = currentDatabase() group by 1, 2, 3 order by 1, 2, 3;
+
+system start replicated sends rmt_master;
+select sleep(3) format Null;
+system sync replica rmt_slave;
+
+system flush logs;
+select 'after';
+select table, event_type, error>0, countIf(error=0) from system.part_log where database = currentDatabase() group by 1, 2, 3 order by 1, 2, 3;
+
+drop table rmt_master;
+drop table rmt_slave;