From 0852d2d655e194efafb4de9ca3393119496f7401 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 30 Oct 2024 18:22:18 +0100 Subject: [PATCH] very bad simulator with replicas --- src/Storages/examples/CMakeLists.txt | 3 + src/Storages/examples/merge_selector3.cpp | 235 ++++++++++++++++++---- 2 files changed, 198 insertions(+), 40 deletions(-) diff --git a/src/Storages/examples/CMakeLists.txt b/src/Storages/examples/CMakeLists.txt index b4786b7313b..6421d4e601a 100644 --- a/src/Storages/examples/CMakeLists.txt +++ b/src/Storages/examples/CMakeLists.txt @@ -4,5 +4,8 @@ target_link_libraries (merge_selector PRIVATE dbms) clickhouse_add_executable (merge_selector2 merge_selector2.cpp) target_link_libraries (merge_selector2 PRIVATE dbms) +clickhouse_add_executable (merge_selector3 merge_selector3.cpp) +target_link_libraries (merge_selector3 PRIVATE dbms) + clickhouse_add_executable (get_current_inserts_in_replicated get_current_inserts_in_replicated.cpp) target_link_libraries (get_current_inserts_in_replicated PRIVATE dbms clickhouse_common_config clickhouse_common_zookeeper) diff --git a/src/Storages/examples/merge_selector3.cpp b/src/Storages/examples/merge_selector3.cpp index 9d2676563ef..dd14e3b095d 100644 --- a/src/Storages/examples/merge_selector3.cpp +++ b/src/Storages/examples/merge_selector3.cpp @@ -1,7 +1,10 @@ #include #include +#include #include #include +#include +#include #include #include #include @@ -13,24 +16,111 @@ enum TaskType { INSERT = 0, MERGE = 1, + FETCH = 2, +}; + +struct ReplicaState /* Per-replica simulation state: the part list, a part counter, and storage that owns generated part names. */ +{ + IMergeSelector::PartsRange parts_without_currently_merging_parts; + size_t total_parts_count; + std::list<std::string> parts_names_cache; /* std::list keeps element addresses stable; Part::data points into these strings, so growth of the cache must not move them */ +}; + +using AllReplicasState = std::unordered_map; + +using CurrentlyMergingPartsNames = std::unordered_set; + +struct SharedState /* State handed to every task: all replicas keyed by name, plus the names of parts that are currently being merged. */ +{ + AllReplicasState all_replicas; + CurrentlyMergingPartsNames merging_parts; + size_t currently_running_merges{0}; }; class ITask { +protected: + std::string replica_name; public: + explicit ITask(const
std::string & replica_name_) + : replica_name(replica_name_) + {} virtual uint64_t getFinishTime() const = 0; bool operator<(const ITask & o) const { return getFinishTime() < o.getFinishTime(); } + std::string getReplicaName() const { return replica_name; } virtual ~ITask() = default; - virtual void updatePartsState(IMergeSelector::PartsRange & parts) = 0; - virtual int64_t getTotalPartsUpdate() const = 0; + virtual void updatePartsStateOnTaskStart(SharedState & state) = 0; /* State-mutation hooks invoked by the simulator; each task type decides what they change. */ + virtual void updatePartsStateOnTaskFinish(SharedState & state) = 0; virtual bool isFinished(uint64_t current_time) const = 0; virtual TaskType getTaskType() const = 0; }; +class FetchTask final : public ITask /* Task for a FETCH event: the carried part takes the place of the run of local parts it contains. */ +{ + IMergeSelector::Part part; + uint64_t fetch_time; +public: + FetchTask(const IMergeSelector::Part & part_, uint64_t current_time, const std::string & replica_name_) + : ITask(replica_name_) + , part(part_) + , fetch_time(current_time) + {} + + uint64_t getFinishTime() const override + { + return fetch_time; + } + + bool isFinished(uint64_t current_time) const override + { + return fetch_time <= current_time; + } + + void updatePartsStateOnTaskStart(SharedState & state) override /* Counts the part, then looks for the first contiguous run of local parts contained in it. */ + { + auto & replica_state = state.all_replicas[replica_name]; + replica_state.total_parts_count += 1; + auto & replica_parts = replica_state.parts_without_currently_merging_parts; + MergeTreePartInfo new_part_info = MergeTreePartInfo::fromPartName(reinterpret_cast(part.data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING); + std::optional begin; + std::optional end; + for (size_t i = 0; i < replica_parts.size(); ++i) + { + MergeTreePartInfo part_info = MergeTreePartInfo::fromPartName(reinterpret_cast(replica_parts[i].data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING); + if (new_part_info.contains(part_info)) + { + if (!begin) + begin.emplace(i); + + end = i; /* inclusive index of the last contained part seen so far */ + } + else if (begin && end) + { + break; + } + } + + if (begin && end) /* a run of exactly one contained part is replaced as well */ /* NOTE(review): when no local part is contained, the fetched part is counted but never added to the list; confirm that is intended */ + { + replica_parts[*begin] =
part; + replica_parts.erase(replica_parts.begin() + *begin + 1, replica_parts.begin() + *end + 1); /* erase() stops before its second iterator, so go one past the inclusive end index */ + } + } + + void updatePartsStateOnTaskFinish(SharedState &) override + { + } + + TaskType getTaskType() const override + { + return FETCH; + } +}; + class InsertTask final : public ITask { @@ -42,8 +132,10 @@ public: IMergeSelector::Part part; uint64_t insert_time; - InsertTask(const IMergeSelector::Part & part_, uint64_t insert_time_) - : part(part_) + + InsertTask(const IMergeSelector::Part & part_, uint64_t insert_time_, const std::string & replica_name_) + : ITask(replica_name_) + , part(part_) , insert_time(insert_time_) { } @@ -53,14 +145,14 @@ public: return insert_time <= current_time; } - void updatePartsState(IMergeSelector::PartsRange & parts) override + /* Appends the inserted part to its replica's list and counts it. */ void updatePartsStateOnTaskStart(SharedState & state) override { - parts.push_back(part); + state.all_replicas[replica_name].parts_without_currently_merging_parts.push_back(part); + state.all_replicas[replica_name].total_parts_count += 1; } - int64_t getTotalPartsUpdate() const override + void updatePartsStateOnTaskFinish(SharedState &) override { - return +1; } TaskType getTaskType() const override @@ -71,11 +163,13 @@ public: class MergeTask final : public ITask { + IMergeSelector::Part part_after_merge; public: uint64_t getFinishTime() const override { return start_time + duration; } + std::vector parts_to_merge; uint64_t merge_size_bytes{0}; uint64_t merge_size_rows{0}; @@ -83,8 +177,9 @@ public: uint64_t duration{0}; uint32_t max_level{0}; - MergeTask(const std::vector & parts_to_merge_, uint64_t current_time, uint64_t merge_speed) - : parts_to_merge(parts_to_merge_) + MergeTask(const std::vector & parts_to_merge_, uint64_t current_time, uint64_t merge_speed, const std::string & replica_name_) + : ITask(replica_name_) + , parts_to_merge(parts_to_merge_) , start_time(current_time) { for (const auto & part : parts_to_merge) @@ -103,28 +198,46 @@ public: return (current_time - start_time) >= duration; } - 
void updatePartsState(IMergeSelector::PartsRange & parts) override + /* Registers the name of every source part as being merged. */ void updatePartsStateOnTaskStart(SharedState & state) override { + for (const auto & part_to_merge : parts_to_merge) + state.merging_parts.insert(reinterpret_cast(part_to_merge.data)); + } + + void updatePartsStateOnTaskFinish(SharedState & state) override /* Unregisters the source parts, then folds them into a single merged part in the owning replica's list. */ + { + for (const auto & part_to_merge : parts_to_merge) + state.merging_parts.erase(reinterpret_cast(part_to_merge.data)); + + auto & replica_state = state.all_replicas[replica_name]; + auto & replica_parts = replica_state.parts_without_currently_merging_parts; + uint64_t start_index = 0; - for (uint64_t i = 0, size = parts.size(); i < size; ++i) + for (uint64_t i = 0, size = replica_parts.size(); i < size; ++i) { - if (parts[i].data == parts_to_merge.front().data) + if (replica_parts[i].data == parts_to_merge.front().data) { start_index = i; break; } } - parts[start_index].size = merge_size_bytes; - parts[start_index].level = max_level + 1; - parts[start_index].age = 0; - parts[start_index].rows = merge_size_rows; - parts.erase(parts.begin() + start_index + 1, parts.begin() + start_index + parts_to_merge.size()); - } + replica_parts[start_index].size = merge_size_bytes; + replica_parts[start_index].level = max_level + 1; + replica_parts[start_index].age = 0; + replica_parts[start_index].rows = merge_size_rows; + MergeTreePartInfo first_info = MergeTreePartInfo::fromPartName(reinterpret_cast(parts_to_merge[0].data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING); + MergeTreePartInfo last_info = MergeTreePartInfo::fromPartName(reinterpret_cast(parts_to_merge.back().data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING); - int64_t getTotalPartsUpdate() const override - { - return 1 - parts_to_merge.size(); + MergeTreePartInfo final_part_info = first_info; + final_part_info.min_block = first_info.min_block; + final_part_info.max_block = last_info.max_block; + final_part_info.level = max_level + 1; + 
replica_state.parts_names_cache.push_back(final_part_info.getPartNameV1()); + replica_parts[start_index].data = replica_state.parts_names_cache.back().data(); /* point at the characters of the name just cached, not at the container's own storage */ + replica_parts.erase(replica_parts.begin() + start_index + 1, replica_parts.begin() + start_index + parts_to_merge.size()); + + replica_state.total_parts_count += (1 - parts_to_merge.size()); } TaskType getTaskType() const override @@ -142,6 +255,15 @@ struct Comparator } }; +struct PartWithActionTypeAndReplica /* One input event: a part, the replica it belongs to, and the task type to schedule for it. */ +{ + IMergeSelector::Part part; + std::string replica_name; + TaskType task; +}; + +using PartsWithTypeAndReplicas = std::vector; + class Simulator { private: @@ -155,12 +277,15 @@ private: uint64_t merge_speed; uint64_t too_many_parts; uint64_t currently_running_merges{0}; + SharedState state; IMergeSelector::PartsRanges partitions; IMergeSelector::PartsRange & parts_state; + std::set replicas; + SimpleMergeSelector selector; uint64_t total_parts{0}; public: - Simulator(const IMergeSelector::PartsRange & inserted_parts, + Simulator(const PartsWithTypeAndReplicas & parts, std::vector insertion_times, uint64_t start_time, SimpleMergeSelector::Settings settings_, @@ -177,10 +302,23 @@ public: , parts_state(partitions.back()) , selector(settings) { - for (uint64_t i = 0; i < inserted_parts.size(); ++i) + for (uint64_t i = 0; i < parts.size(); ++i) { + replicas.insert(parts[i].replica_name); + + if (parts[i].task == INSERT) + { + tasks.push(std::make_unique(parts[i].part, insertion_times[i] - start_time, parts[i].replica_name)); + } + else if (parts[i].task == FETCH) + { + tasks.push(std::make_unique(parts[i].part, insertion_times[i] - start_time, parts[i].replica_name)); + } + else + { + std::terminate(); + } //std::cerr << "Insert time: " << insertion_times[i] - start_time << " start time:" << start_time << std::endl; - tasks.push(std::make_unique(inserted_parts[i], insertion_times[i] - start_time)); } } @@ -214,20 +352,22 @@ public: { if (top_task->getTaskType() == MERGE) { - --currently_running_merges; + 
/* NOTE(review): the decrement of currently_running_merges is removed here while the increment at merge start remains; confirm the counter is still meant to go back down */ top_task->updatePartsStateOnTaskFinish(state); } - - if (top_task->getTaskType() == INSERT) + else if (top_task->getTaskType() == INSERT) { if (total_parts < too_many_parts) { - top_task->updatePartsState(parts_state); + top_task->updatePartsStateOnTaskStart(state); } else throw Exception(ErrorCodes::LOGICAL_ERROR, "Too many parts"); } + else if (top_task->getTaskType() == FETCH) + { + top_task->updatePartsStateOnTaskStart(state); /* NOTE(review): the visible code no longer updates total_parts, so the too_many_parts check for inserts compares a stale value; confirm */ + } - - total_parts += top_task->getTotalPartsUpdate(); tasks.pop(); } else @@ -238,15 +378,28 @@ public: if (current_time % 67 == 0) { - IMergeSelector::PartsRange selected_parts = selector.select(partitions, getMaxSizeToMerge()); - - if (!selected_parts.empty()) + for (const auto & replica_name : replicas) /* Offer each replica's parts to the selector separately, leaving out parts that are already being merged. */ { - auto new_merge = std::make_unique(selected_parts, current_time, merge_speed); - new_merge->updatePartsState(parts_state); - tasks.push(std::move(new_merge)); - currently_running_merges++; + auto parts_for_replica = state.all_replicas[replica_name].parts_without_currently_merging_parts; + for (auto itr = parts_for_replica.begin(); itr != parts_for_replica.end();) + { + if (state.merging_parts.contains(static_cast(itr->data))) + itr = parts_for_replica.erase(itr); + else + ++itr; + } + + IMergeSelector::PartsRange selected_parts = selector.select({parts_for_replica}, getMaxSizeToMerge()); + + if (!selected_parts.empty()) + { + auto new_merge = std::make_unique(selected_parts, current_time, merge_speed, replica_name); + new_merge->updatePartsStateOnTaskStart(state); + tasks.push(std::move(new_merge)); + currently_running_merges++; + } } + } current_time++; @@ -271,18 +424,20 @@ int main(int, char **) std::list part_names; - IMergeSelector::PartsRange parts; uint64_t start_time = std::numeric_limits::max(); std::vector insertion_times; + PartsWithTypeAndReplicas parts; while (!in.eof()) { part_names.emplace_back(); IMergeSelector::Part part; uint64_t event_time; - in >> part.size >> "\t" >> part.rows >> "\t" >> part.level >> "\t" >> 
part_names.back() >> "\t" >> event_time >> "\n"; + std::string replica_name; + uint64_t event_type; + in >> part.size >> "\t" >> part.rows >> "\t" >> part.level >> "\t" >> part_names.back() >> "\t" >> event_time >> "\t" >> event_type >> "\t" >> replica_name >> "\n"; part.data = part_names.back().data(); part.age = 0; - parts.emplace_back(part); + parts.emplace_back(PartWithActionTypeAndReplica{part, replica_name, static_cast(event_type)}); start_time = std::min(start_time, event_time); insertion_times.push_back(event_time); }