diff --git a/src/Storages/examples/merge_selector3.cpp b/src/Storages/examples/merge_selector3.cpp index 56620fcc2d7..dc9f62631ed 100644 --- a/src/Storages/examples/merge_selector3.cpp +++ b/src/Storages/examples/merge_selector3.cpp @@ -9,6 +9,7 @@ #include #include #include +#include using namespace DB; @@ -56,15 +57,24 @@ struct ReplicaState parts_data[names_holder.back()] = std::make_pair(std::prev(it), part_data); } - IMergeSelector::PartsRanges getPartRangesForMerge(const std::unordered_set & currently_merging_parts, const ActiveDataPartSet & shared_state) + IMergeSelector::PartsRanges getPartRangesForMerge(const absl::flat_hash_set & currently_merging_parts, const ActiveDataPartSet & shared_state) { IMergeSelector::PartsRanges parts_ranges; const MergeTreePartInfo * prev_part = nullptr; const auto & parts = parts_without_currently_merging_parts.getPartNamesWithInfos(); + std::cerr << "Currently merging parts: " << currently_merging_parts.size() << std::endl; + std::cerr << "Total parts: " << parts.size() << std::endl; for (const auto & [part_info, part_name] : parts) { if (currently_merging_parts.contains(part_name)) + { + std::cerr << "Skip\n"; continue; + } + else + { + std::cerr << "No Skip\n"; + } auto containing_part = shared_state.getContainingPart(part_info); if (!containing_part.empty() && containing_part != part_name) @@ -111,7 +121,7 @@ using CurrentlyMergingPartsNames = std::unordered_set; struct SharedState { ActiveDataPartSet shared_state{MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING}; - std::unordered_set merging_parts; + absl::flat_hash_set merging_parts; AllReplicasState all_replicas; size_t currently_running_merges{0}; uint64_t total_parts{0}; @@ -404,8 +414,8 @@ public: if (current_time % 1000 == 0) { std::cerr << "Total parts count: " << state.total_parts << " merges running:" << state.currently_running_merges << " time passed " << current_time / 1000 << std::endl; - for (auto & replica : state.all_replicas) - replica.second.tickAge(); + //for (auto & replica : state.all_replicas) + // replica.second.tickAge(); } while (!tasks.empty()) @@ -442,7 +452,7 @@ public: } //std::cerr << "Will try to assign merge\n"; - if (current_time % 100 == 0) + if (current_time % 30 == 0) { for (const auto & replica_name : replicas) { @@ -450,7 +460,10 @@ public: auto ranges = state.all_replicas[replica_name].getPartRangesForMerge(state.merging_parts, state.shared_state); if (ranges.empty()) + { + std::cerr << "No ranges\n"; continue; + } IMergeSelector::PartsRange selected_parts = selector.select(ranges, getMaxSizeToMerge());