Something

This commit is contained in:
alesapin 2024-11-04 12:31:28 +01:00
parent f77146d24c
commit 326ac3ebf5

View File

@ -9,6 +9,7 @@
#include <Storages/MergeTree/ActiveDataPartSet.h> #include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <base/interpolate.h> #include <base/interpolate.h>
#include <absl/container/flat_hash_set.h>
using namespace DB; using namespace DB;
@ -56,15 +57,24 @@ struct ReplicaState
parts_data[names_holder.back()] = std::make_pair(std::prev(it), part_data); parts_data[names_holder.back()] = std::make_pair(std::prev(it), part_data);
} }
IMergeSelector::PartsRanges getPartRangesForMerge(const std::unordered_set<std::string> & currently_merging_parts, const ActiveDataPartSet & shared_state) IMergeSelector::PartsRanges getPartRangesForMerge(const absl::flat_hash_set<std::string> & currently_merging_parts, const ActiveDataPartSet & shared_state)
{ {
IMergeSelector::PartsRanges parts_ranges; IMergeSelector::PartsRanges parts_ranges;
const MergeTreePartInfo * prev_part = nullptr; const MergeTreePartInfo * prev_part = nullptr;
const auto & parts = parts_without_currently_merging_parts.getPartNamesWithInfos(); const auto & parts = parts_without_currently_merging_parts.getPartNamesWithInfos();
std::cerr << "Currently merging parts: " << currently_merging_parts.size() << std::endl;
std::cerr << "Total parts: " << parts.size() << std::endl;
for (const auto & [part_info, part_name] : parts) for (const auto & [part_info, part_name] : parts)
{ {
if (currently_merging_parts.contains(part_name)) if (currently_merging_parts.contains(part_name))
{
std::cerr << "Skip\n";
continue; continue;
}
else
{
std::cerr << "No Skip\n";
}
auto containing_part = shared_state.getContainingPart(part_info); auto containing_part = shared_state.getContainingPart(part_info);
if (!containing_part.empty() && containing_part != part_name) if (!containing_part.empty() && containing_part != part_name)
@ -111,7 +121,7 @@ using CurrentlyMergingPartsNames = std::unordered_set<std::string>;
struct SharedState struct SharedState
{ {
ActiveDataPartSet shared_state{MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING}; ActiveDataPartSet shared_state{MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING};
std::unordered_set<std::string> merging_parts; absl::flat_hash_set<std::string> merging_parts;
AllReplicasState all_replicas; AllReplicasState all_replicas;
size_t currently_running_merges{0}; size_t currently_running_merges{0};
uint64_t total_parts{0}; uint64_t total_parts{0};
@ -404,8 +414,8 @@ public:
if (current_time % 1000 == 0) if (current_time % 1000 == 0)
{ {
std::cerr << "Total parts count: " << state.total_parts << " merges running:" << state.currently_running_merges << " time passed " << current_time / 1000 << std::endl; std::cerr << "Total parts count: " << state.total_parts << " merges running:" << state.currently_running_merges << " time passed " << current_time / 1000 << std::endl;
for (auto & replica : state.all_replicas) //for (auto & replica : state.all_replicas)
replica.second.tickAge(); // replica.second.tickAge();
} }
while (!tasks.empty()) while (!tasks.empty())
@ -442,7 +452,7 @@ public:
} }
//std::cerr << "Will try to assign merge\n"; //std::cerr << "Will try to assign merge\n";
if (current_time % 100 == 0) if (current_time % 30 == 0)
{ {
for (const auto & replica_name : replicas) for (const auto & replica_name : replicas)
{ {
@ -450,7 +460,10 @@ public:
auto ranges = state.all_replicas[replica_name].getPartRangesForMerge(state.merging_parts, state.shared_state); auto ranges = state.all_replicas[replica_name].getPartRangesForMerge(state.merging_parts, state.shared_state);
if (ranges.empty()) if (ranges.empty())
{
std::cerr << "No ranges\n";
continue; continue;
}
IMergeSelector::PartsRange selected_parts = selector.select(ranges, getMaxSizeToMerge()); IMergeSelector::PartsRange selected_parts = selector.select(ranges, getMaxSizeToMerge());