Add logs to queue because I cannot understand anything without them

This commit is contained in:
alesapin 2021-09-14 12:03:48 +03:00
parent c54434084f
commit 9441378b39

View File

@ -170,7 +170,11 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
std::lock_guard<std::mutex> & state_lock)
{
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
auto entry_virtual_parts = entry->getVirtualPartNames(format_version);
LOG_TEST(log, "Insert entry {} to queue with type {} with virtual parts [{}]", entry->znode_name, entry->typeToString(), fmt::join(entry_virtual_parts, ", "));
for (const String & virtual_part_name : entry_virtual_parts)
{
virtual_parts.add(virtual_part_name, nullptr);
/// Don't add drop range parts to mutations
@ -234,6 +238,11 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
std::optional<time_t> & max_processed_insert_time_changed,
std::unique_lock<std::mutex> & state_lock)
{
auto entry_virtual_parts = entry->getVirtualPartNames(format_version);
LOG_TEST(log, "Removing {} entry {} from queue with type {} with virtual parts [{}]",
is_successful ? "successful" : "unsuccessful",
entry->znode_name, entry->typeToString(), fmt::join(entry_virtual_parts, ", "));
/// Update insert times.
if (entry->type == LogEntry::GET_PART || entry->type == LogEntry::ATTACH_PART)
{
@ -261,6 +270,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
{
if (!entry->actual_new_part_name.empty())
{
LOG_TEST(log, "Entry {} has actual new part name {}, removing it from mutations", entry->znode_name, entry->actual_new_part_name);
/// We don't add bigger fetched part to current_parts because we
/// have an invariant `virtual_parts` = `current_parts` + `queue`.
///
@ -271,7 +281,9 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
removeCoveredPartsFromMutations(entry->actual_new_part_name, /*remove_part = */ false, /*remove_covered_parts = */ true);
}
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
LOG_TEST(log, "Adding parts [{}] to current parts", fmt::join(entry_virtual_parts, ", "));
for (const String & virtual_part_name : entry_virtual_parts)
{
current_parts.add(virtual_part_name, nullptr);
@ -282,14 +294,21 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
if (auto drop_range_part_name = entry->getDropRange(format_version))
{
MergeTreePartInfo drop_range_info = MergeTreePartInfo::fromPartName(*drop_range_part_name, format_version);
/// DROP PART doesn't have virtual parts so remove from current
/// parts all covered parts.
if (entry->isDropPart(format_version))
{
LOG_TEST(log, "Removing drop part from current and virtual parts {}", *drop_range_part_name);
current_parts.removePartAndCoveredParts(*drop_range_part_name);
}
else
{
LOG_TEST(log, "Removing drop range from current and virtual parts {}", *drop_range_part_name);
current_parts.remove(*drop_range_part_name);
}
virtual_parts.remove(*drop_range_part_name);
@ -314,7 +333,9 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
drop_ranges.removeDropRange(entry);
}
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
LOG_TEST(log, "Removing unsuccessful entry {} virtual parts [{}]", entry->znode_name, fmt::join(entry_virtual_parts, ", "));
for (const String & virtual_part_name : entry_virtual_parts)
{
/// This part will never appear, so remove it from virtual parts
virtual_parts.remove(virtual_part_name);
@ -331,6 +352,9 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
void ReplicatedMergeTreeQueue::removeCoveredPartsFromMutations(const String & part_name, bool remove_part, bool remove_covered_parts)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
LOG_TEST(log, "Removing part {} from mutations (remove_part: {}, remove_covered_parts: {})", part_name, remove_part, remove_covered_parts);
auto in_partition = mutations_by_partition.find(part_info.partition_id);
if (in_partition == mutations_by_partition.end())
return;
@ -368,11 +392,17 @@ void ReplicatedMergeTreeQueue::removeCoveredPartsFromMutations(const String & pa
void ReplicatedMergeTreeQueue::addPartToMutations(const String & part_name)
{
LOG_TEST(log, "Adding part {} to mutations", part_name);
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
/// Do not add special virtual parts to parts_to_do
if (part_info.isFakeDropRangePart())
{
LOG_TEST(log, "Part {} is fake drop range part, will not add it to mutations", part_name);
return;
}
auto in_partition = mutations_by_partition.find(part_info.partition_id);
if (in_partition == mutations_by_partition.end())