diff --git a/src/Storages/MergeTree/ActiveDataPartSet.cpp b/src/Storages/MergeTree/ActiveDataPartSet.cpp index ba95da37b29..2cb98b707a3 100644 --- a/src/Storages/MergeTree/ActiveDataPartSet.cpp +++ b/src/Storages/MergeTree/ActiveDataPartSet.cpp @@ -283,6 +283,11 @@ std::vector ActiveDataPartSet::getPartInfos() const return res; } +const std::map & ActiveDataPartSet::getPartNamesWithInfos() const +{ + return part_info_to_name; +} + size_t ActiveDataPartSet::size() const { return part_info_to_name.size(); diff --git a/src/Storages/MergeTree/ActiveDataPartSet.h b/src/Storages/MergeTree/ActiveDataPartSet.h index ca744b3ed2a..8af7693e748 100644 --- a/src/Storages/MergeTree/ActiveDataPartSet.h +++ b/src/Storages/MergeTree/ActiveDataPartSet.h @@ -96,6 +96,7 @@ public: /// Returns parts in ascending order of the partition_id and block number. Strings getParts() const; std::vector getPartInfos() const; + const std::map & getPartNamesWithInfos() const; size_t size() const; diff --git a/src/Storages/examples/merge_selector3.cpp b/src/Storages/examples/merge_selector3.cpp index 801c30e3414..56620fcc2d7 100644 --- a/src/Storages/examples/merge_selector3.cpp +++ b/src/Storages/examples/merge_selector3.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -21,9 +22,86 @@ enum TaskType struct ReplicaState { - IMergeSelector::PartsRange parts_without_currently_merging_parts; - size_t total_parts_count; - std::vector parts_names_cache; + ActiveDataPartSet parts_without_currently_merging_parts{MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING}; + std::unordered_map::iterator, IMergeSelector::Part>> parts_data; + std::list names_holder; + + std::list parts_names_cache; + + uint64_t getTotalPartsCount() const + { + return parts_without_currently_merging_parts.size(); + } + + void tickAge() + { + for (auto & [_, data] : parts_data) + data.second.age++; + } + + void addPart(const std::string & part_name, IMergeSelector::Part & part_data) + { + //std::cerr << "Add part: " << part_name << std::endl; + Strings replaced_parts; + parts_without_currently_merging_parts.add(part_name, &replaced_parts); + for (const auto & replaced_part : replaced_parts) + { + //auto list_it = parts_data[replaced_part].first; + //names_holder.erase(list_it); + parts_data.erase(replaced_part); + } + + names_holder.push_back(part_name); + auto it = names_holder.end(); + parts_data[names_holder.back()] = std::make_pair(std::prev(it), part_data); + } + + IMergeSelector::PartsRanges getPartRangesForMerge(const std::unordered_set & currently_merging_parts, const ActiveDataPartSet & shared_state) + { + IMergeSelector::PartsRanges parts_ranges; + const MergeTreePartInfo * prev_part = nullptr; + const auto & parts = parts_without_currently_merging_parts.getPartNamesWithInfos(); + for (const auto & [part_info, part_name] : parts) + { + if (currently_merging_parts.contains(part_name)) + continue; + + auto containing_part = shared_state.getContainingPart(part_info); + if (!containing_part.empty() && containing_part != part_name) + { + continue; + } + + if (!prev_part) + { + if (parts_ranges.empty() || !parts_ranges.back().empty()) + parts_ranges.emplace_back(); + } + else + { + if (part_info.min_block != prev_part->max_block + 1) + { + //std::cerr << "New range because left: " << prev_part->getPartNameV1() << " | right: " << part_info.getPartNameV1() << std::endl; + prev_part = nullptr; + parts_ranges.emplace_back(); + } + } + + + //std::cerr << "Parts_data has: " << part_name << std::endl; + auto it = parts_data.find(part_name); + if (it == parts_data.end()) + std::terminate(); + auto [list_it, part] = it->second; + part.data = reinterpret_cast(list_it->data()); + parts_ranges.back().emplace_back(part); + + prev_part = &part_info; + } + + return parts_ranges; + } + }; using AllReplicasState = std::unordered_map; @@ -32,8 +110,9 @@ using CurrentlyMergingPartsNames = std::unordered_set; struct SharedState { + ActiveDataPartSet shared_state{MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING}; + std::unordered_set merging_parts; AllReplicasState all_replicas; - CurrentlyMergingPartsNames merging_parts; size_t currently_running_merges{0}; uint64_t total_parts{0}; }; @@ -55,7 +134,7 @@ public: std::string getReplicaName() const { return replica_name; } virtual ~ITask() = default; virtual void updatePartsStateOnTaskStart(SharedState & state) = 0; - virtual void updatePartsStateOnTaskFinish(SharedState & state) = 0; + virtual std::vector> updatePartsStateOnTaskFinish(SharedState & state, uint64_t) = 0; virtual bool isFinished(uint64_t current_time) const = 0; virtual TaskType getTaskType() const = 0; }; @@ -64,11 +143,13 @@ class FetchTask final : public ITask { IMergeSelector::Part part; uint64_t fetch_time; + std::string part_name; public: - FetchTask(const IMergeSelector::Part & part_, uint64_t current_time, const std::string & replica_name_) + FetchTask(const IMergeSelector::Part & part_, uint64_t current_time, const std::string & part_name_, const std::string & replica_name_) : ITask(replica_name_) , part(part_) , fetch_time(current_time) + , part_name(part_name_) {} uint64_t getFinishTime() const override @@ -84,43 +165,13 @@ public: void updatePartsStateOnTaskStart(SharedState & state) override { auto & replica_state = state.all_replicas[replica_name]; - auto & replica_parts = replica_state.parts_without_currently_merging_parts; - MergeTreePartInfo new_part_info = MergeTreePartInfo::fromPartName(reinterpret_cast(part.data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING); - std::optional begin; - std::optional end; - for (size_t i = 0; i < replica_parts.size(); ++i) - { - MergeTreePartInfo part_info = MergeTreePartInfo::fromPartName(reinterpret_cast(replica_parts[i].data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING); - if (new_part_info.contains(part_info)) - { - if (!begin) - begin.emplace(i); - - end = i; - } - else if (begin && end) - { - break; - } - } - - if (begin && end && *begin != *end) - { - replica_parts[*begin] = part; - replica_state.parts_names_cache.push_back(reinterpret_cast(part.data)); - replica_parts[*begin].data = replica_state.parts_names_cache.back().data(); - replica_parts.erase(replica_parts.begin() + *begin + 1, replica_parts.begin() + *end); - replica_state.total_parts_count += 1 - (*end - *begin); - } - else - { - replica_state.total_parts_count += 1; - } - + //std::cerr << "Fetching part: " << part_name << " for replica " << replica_name << std::endl; + replica_state.addPart(part_name, part); } - void updatePartsStateOnTaskFinish(SharedState &) override + std::vector> updatePartsStateOnTaskFinish(SharedState &, uint64_t) override { + return {}; } TaskType getTaskType() const override @@ -155,13 +206,15 @@ public: void updatePartsStateOnTaskStart(SharedState & state) override { - state.all_replicas[replica_name].parts_without_currently_merging_parts.push_back(part); - state.all_replicas[replica_name].total_parts_count += 1; + //std::cerr << "Inserting part: " << reinterpret_cast(part.data) << " for replica " << replica_name << std::endl; + state.all_replicas[replica_name].addPart(reinterpret_cast(part.data), part); + state.shared_state.add(reinterpret_cast(part.data)); state.total_parts += 1; } - void updatePartsStateOnTaskFinish(SharedState &) override + std::vector> updatePartsStateOnTaskFinish(SharedState &, uint64_t) override { + return {}; } TaskType getTaskType() const override @@ -196,6 +249,8 @@ public: merge_size_rows += part.rows; merge_size_bytes += part.size; max_level = std::max(part.level, max_level); + + //std::cerr << "Starting merge range: " << reinterpret_cast(part.data) << std::endl; } duration = merge_size_rows / merge_speed * 1000; @@ -215,28 +270,23 @@ public: state.currently_running_merges++; } - void updatePartsStateOnTaskFinish(SharedState & state) override + std::vector> updatePartsStateOnTaskFinish(SharedState & state, uint64_t current_time) override { for (const auto & part_to_merge : parts_to_merge) + { + //std::cerr << "Finishing merge range: " << reinterpret_cast(part_to_merge.data) << std::endl; state.merging_parts.erase(reinterpret_cast(part_to_merge.data)); + } auto & replica_state = state.all_replicas[replica_name]; - auto & replica_parts = replica_state.parts_without_currently_merging_parts; - uint64_t start_index = 0; - - for (uint64_t i = 0, size = replica_parts.size(); i < size; ++i) - { - if (replica_parts[i].data == parts_to_merge.front().data) - { - start_index = i; - break; - } - } - replica_parts[start_index].size = merge_size_bytes; - replica_parts[start_index].level = max_level + 1; - replica_parts[start_index].age = 0; - replica_parts[start_index].rows = merge_size_rows; + IMergeSelector::Part merged_part; + merged_part.size = merge_size_bytes; + merged_part.level = max_level + 1; + merged_part.age = 0; + merged_part.rows = merge_size_rows; + //std::cerr << "Merge first part: " << reinterpret_cast(parts_to_merge[0].data) << std::endl; + //std::cerr << "Merge last part: " << reinterpret_cast(parts_to_merge.back().data) << std::endl; MergeTreePartInfo first_info = MergeTreePartInfo::fromPartName(reinterpret_cast(parts_to_merge[0].data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING); MergeTreePartInfo last_info = MergeTreePartInfo::fromPartName(reinterpret_cast(parts_to_merge.back().data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING); @@ -244,13 +294,19 @@ public: final_part_info.min_block = first_info.min_block; final_part_info.max_block = last_info.max_block; final_part_info.level = max_level + 1; - replica_state.parts_names_cache.push_back(final_part_info.getPartNameV1()); - replica_parts[start_index].data = replica_state.parts_names_cache.data(); - replica_parts.erase(replica_parts.begin() + start_index + 1, replica_parts.begin() + start_index + parts_to_merge.size()); - replica_state.total_parts_count += (1 - parts_to_merge.size()); + + //std::cerr << "Merged part: " << final_part_info.getPartNameV1() << " for replica " << replica_name << std::endl; + replica_state.addPart(final_part_info.getPartNameV1(), merged_part); + state.shared_state.add(final_part_info); + state.total_parts += (1 - parts_to_merge.size()); state.currently_running_merges--; + + std::vector> tasks; + for (const auto & [replica_name, _] : state.all_replicas) + tasks.push_back(std::make_unique(merged_part, current_time + 5, final_part_info.getPartNameV1(), replica_name)); + return tasks; } TaskType getTaskType() const override @@ -319,7 +375,7 @@ public: } else if (parts[i].task == FETCH) { - tasks.push(std::make_unique(parts[i].part, insertion_times[i] - start_time, parts[i].replica_name)); + tasks.push(std::make_unique(parts[i].part, insertion_times[i] - start_time, reinterpret_cast(parts[i].part.data), parts[i].replica_name)); } else { @@ -347,12 +403,9 @@ public: { if (current_time % 1000 == 0) { - std::cerr << "Total parts count: " << state.total_parts << " merges running:" << state.currently_running_merges << std::endl; + std::cerr << "Total parts count: " << state.total_parts << " merges running:" << state.currently_running_merges << " time passed " << current_time / 1000 << std::endl; for (auto & replica : state.all_replicas) - { - for (auto & part : replica.second.parts_without_currently_merging_parts) - part.age++; - } + replica.second.tickAge(); } while (!tasks.empty()) @@ -362,7 +415,9 @@ public: { if (top_task->getTaskType() == MERGE) { - top_task->updatePartsStateOnTaskFinish(state); + auto new_tasks = top_task->updatePartsStateOnTaskFinish(state, current_time); + for (auto && new_task : new_tasks) + tasks.push(std::move(new_task)); } else if (top_task->getTaskType() == INSERT) { @@ -386,20 +441,18 @@ public: } } - if (current_time % 67 == 0) + //std::cerr << "Will try to assign merge\n"; + if (current_time % 100 == 0) { for (const auto & replica_name : replicas) { - auto parts_for_replica = state.all_replicas[replica_name].parts_without_currently_merging_parts; - for (auto itr = parts_for_replica.begin(); itr != parts_for_replica.end();) - { - if (state.merging_parts.contains(static_cast(itr->data))) - itr = parts_for_replica.erase(itr); - else - ++itr; - } + //std::cerr << "Trying for replica: " << replica_name << " merging parts: " << state.merging_parts.size() << std::endl; + auto ranges = state.all_replicas[replica_name].getPartRangesForMerge(state.merging_parts, state.shared_state); - IMergeSelector::PartsRange selected_parts = selector.select({parts_for_replica}, getMaxSizeToMerge()); + if (ranges.empty()) + continue; + + IMergeSelector::PartsRange selected_parts = selector.select(ranges, getMaxSizeToMerge()); if (!selected_parts.empty()) { @@ -436,6 +489,7 @@ int main(int, char **) uint64_t start_time = std::numeric_limits::max(); std::vector insertion_times; PartsWithTypeAndReplicas parts; + size_t counter = 0; while (!in.eof()) { part_names.emplace_back(); @@ -449,6 +503,9 @@ int main(int, char **) parts.emplace_back(PartWithActionTypeAndReplica{part, replica_name, static_cast(event_type)}); start_time = std::min(start_time, event_time); insertion_times.push_back(event_time); + counter++; + if (counter % 1000 == 0) + std::cerr << "Loaded:" << counter << " events\n"; } std::cerr << "Parsed: \n";