Add MergePartsStart and MutatePartsStart events into part_log

This commit is contained in:
Alexey Milovidov 2024-10-19 02:06:35 +02:00
parent 6ea6b766bf
commit f71ec041e7
7 changed files with 39 additions and 21 deletions

View File

@ -68,6 +68,8 @@ ColumnsDescription PartLogElement::getColumnsDescription()
{"RemovePart", static_cast<Int8>(REMOVE_PART)},
{"MutatePart", static_cast<Int8>(MUTATE_PART)},
{"MovePart", static_cast<Int8>(MOVE_PART)},
{"MergePartsStart", static_cast<Int8>(MERGE_PARTS_START)},
{"MutatePartStart", static_cast<Int8>(MUTATE_PART_START)},
}
);
@ -102,10 +104,12 @@ ColumnsDescription PartLogElement::getColumnsDescription()
"Type of the event that occurred with the data part. "
"Can have one of the following values: "
"NewPart — Inserting of a new data part, "
"MergeParts — Merging of data parts, "
"MergePartsStart — Merging of data parts has started, "
"MergeParts — Merging of data parts has finished, "
"DownloadPart — Downloading a data part, "
"RemovePart — Removing or detaching a data part using DETACH PARTITION, "
"MutatePart — Mutating of a data part, "
"MutatePartStart — Mutating of a data part has started, "
"MutatePart — Mutating of a data part has finished, "
"MovePart — Moving the data part from the one disk to another one."},
{"merge_reason", std::move(merge_reason_datatype),
"The reason for the event with type MERGE_PARTS. Can have one of the following values: "

View File

@ -26,6 +26,8 @@ struct PartLogElement
REMOVE_PART = 4,
MUTATE_PART = 5,
MOVE_PART = 6,
MERGE_PARTS_START = 7,
MUTATE_PART_START = 8,
};
/// Copy of MergeAlgorithm since values are written to disk.

View File

@ -335,6 +335,10 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
future_merged_part,
task_context);
storage.writePartLog(
PartLogElement::MERGE_PARTS_START, {}, 0,
entry.new_part_name, part, parts, merge_mutate_entry.get(), {});
transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage, NO_TRANSACTION_RAW);
merge_task = storage.merger_mutator.mergePartsToTemporaryPart(
@ -352,7 +356,6 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
storage.merging_params,
NO_TRANSACTION_PTR);
/// Adjust priority
for (auto & item : future_merged_part->parts)
priority.value += item->getBytesOnDisk();

View File

@ -92,6 +92,10 @@ void MergePlainMergeTreeTask::prepare()
future_part,
task_context);
storage.writePartLog(
PartLogElement::MERGE_PARTS_START, {}, 0,
future_part->name, new_part, future_part->parts, merge_list_entry.get(), {});
write_part_log = [this] (const ExecutionStatus & execution_status)
{
auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
@ -121,19 +125,19 @@ void MergePlainMergeTreeTask::prepare()
};
merge_task = storage.merger_mutator.mergePartsToTemporaryPart(
future_part,
metadata_snapshot,
merge_list_entry.get(),
{} /* projection_merge_list_element */,
table_lock_holder,
time(nullptr),
task_context,
merge_mutate_entry->tagger->reserved_space,
deduplicate,
deduplicate_by_columns,
cleanup,
storage.merging_params,
txn);
future_part,
metadata_snapshot,
merge_list_entry.get(),
{} /* projection_merge_list_element */,
table_lock_holder,
time(nullptr),
task_context,
merge_mutate_entry->tagger->reserved_space,
deduplicate,
deduplicate_by_columns,
cleanup,
storage.merging_params,
txn);
}

View File

@ -7878,7 +7878,8 @@ try
part_log_elem.event_type = type;
if (part_log_elem.event_type == PartLogElement::MERGE_PARTS)
if (part_log_elem.event_type == PartLogElement::MERGE_PARTS
|| part_log_elem.event_type == PartLogElement::MERGE_PARTS_START)
{
if (merge_entry)
{
@ -7946,10 +7947,6 @@ try
{
part_log_elem.profile_counters = profile_counters;
}
else
{
LOG_WARNING(log, "Profile counters are not set");
}
part_log->add(std::move(part_log_elem));
}

View File

@ -226,6 +226,10 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
future_mutated_part,
task_context);
storage.writePartLog(
PartLogElement::MUTATE_PART_START, {}, 0,
entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), {});
mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
future_mutated_part, metadata_snapshot, commands, merge_mutate_entry.get(),
entry.create_time, task_context, NO_TRANSACTION_PTR, reserved_space, table_lock_holder);

View File

@ -39,6 +39,10 @@ void MutatePlainMergeTreeTask::prepare()
future_part,
task_context);
storage.writePartLog(
PartLogElement::MUTATE_PART_START, {}, 0,
future_part->name, new_part, future_part->parts, merge_list_entry.get(), {});
stopwatch = std::make_unique<Stopwatch>();
write_part_log = [this] (const ExecutionStatus & execution_status)