Real but slow simulator

This commit is contained in:
alesapin 2024-11-01 19:14:30 +01:00
parent 0f0f053e3d
commit f77146d24c
3 changed files with 143 additions and 80 deletions

View File

@ -283,6 +283,11 @@ std::vector<MergeTreePartInfo> ActiveDataPartSet::getPartInfos() const
return res;
}
const std::map<MergeTreePartInfo, std::string> & ActiveDataPartSet::getPartNamesWithInfos() const
{
return part_info_to_name;
}
size_t ActiveDataPartSet::size() const
{
return part_info_to_name.size();

View File

@ -96,6 +96,7 @@ public:
/// Returns parts in ascending order of the partition_id and block number.
Strings getParts() const;
std::vector<MergeTreePartInfo> getPartInfos() const;
const std::map<MergeTreePartInfo, std::string> & getPartNamesWithInfos() const;
size_t size() const;

View File

@ -6,6 +6,7 @@
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Common/formatReadable.h>
#include <base/interpolate.h>
@ -21,9 +22,86 @@ enum TaskType
struct ReplicaState
{
IMergeSelector::PartsRange parts_without_currently_merging_parts;
size_t total_parts_count;
std::vector<std::string> parts_names_cache;
ActiveDataPartSet parts_without_currently_merging_parts{MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING};
std::unordered_map<std::string_view, std::pair<std::list<std::string>::iterator, IMergeSelector::Part>> parts_data;
std::list<std::string> names_holder;
std::list<std::string> parts_names_cache;
uint64_t getTotalPartsCount() const
{
return parts_without_currently_merging_parts.size();
}
void tickAge()
{
for (auto & [_, data] : parts_data)
data.second.age++;
}
void addPart(const std::string & part_name, IMergeSelector::Part & part_data)
{
//std::cerr << "Add part: " << part_name << std::endl;
Strings replaced_parts;
parts_without_currently_merging_parts.add(part_name, &replaced_parts);
for (const auto & replaced_part : replaced_parts)
{
//auto list_it = parts_data[replaced_part].first;
//names_holder.erase(list_it);
parts_data.erase(replaced_part);
}
names_holder.push_back(part_name);
auto it = names_holder.end();
parts_data[names_holder.back()] = std::make_pair(std::prev(it), part_data);
}
IMergeSelector::PartsRanges getPartRangesForMerge(const std::unordered_set<std::string> & currently_merging_parts, const ActiveDataPartSet & shared_state)
{
IMergeSelector::PartsRanges parts_ranges;
const MergeTreePartInfo * prev_part = nullptr;
const auto & parts = parts_without_currently_merging_parts.getPartNamesWithInfos();
for (const auto & [part_info, part_name] : parts)
{
if (currently_merging_parts.contains(part_name))
continue;
auto containing_part = shared_state.getContainingPart(part_info);
if (!containing_part.empty() && containing_part != part_name)
{
continue;
}
if (!prev_part)
{
if (parts_ranges.empty() || !parts_ranges.back().empty())
parts_ranges.emplace_back();
}
else
{
if (part_info.min_block != prev_part->max_block + 1)
{
//std::cerr << "New range because left: " << prev_part->getPartNameV1() << " | right: " << part_info.getPartNameV1() << std::endl;
prev_part = nullptr;
parts_ranges.emplace_back();
}
}
//std::cerr << "Parts_data has: " << part_name << std::endl;
auto it = parts_data.find(part_name);
if (it == parts_data.end())
std::terminate();
auto [list_it, part] = it->second;
part.data = reinterpret_cast<const void *>(list_it->data());
parts_ranges.back().emplace_back(part);
prev_part = &part_info;
}
return parts_ranges;
}
};
using AllReplicasState = std::unordered_map<std::string, ReplicaState>;
@ -32,8 +110,9 @@ using CurrentlyMergingPartsNames = std::unordered_set<std::string>;
struct SharedState
{
ActiveDataPartSet shared_state{MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING};
std::unordered_set<std::string> merging_parts;
AllReplicasState all_replicas;
CurrentlyMergingPartsNames merging_parts;
size_t currently_running_merges{0};
uint64_t total_parts{0};
};
@ -55,7 +134,7 @@ public:
std::string getReplicaName() const { return replica_name; }
virtual ~ITask() = default;
virtual void updatePartsStateOnTaskStart(SharedState & state) = 0;
virtual void updatePartsStateOnTaskFinish(SharedState & state) = 0;
virtual std::vector<std::unique_ptr<ITask>> updatePartsStateOnTaskFinish(SharedState & state, uint64_t) = 0;
virtual bool isFinished(uint64_t current_time) const = 0;
virtual TaskType getTaskType() const = 0;
};
@ -64,11 +143,13 @@ class FetchTask final : public ITask
{
IMergeSelector::Part part;
uint64_t fetch_time;
std::string part_name;
public:
FetchTask(const IMergeSelector::Part & part_, uint64_t current_time, const std::string & replica_name_)
FetchTask(const IMergeSelector::Part & part_, uint64_t current_time, const std::string & part_name_, const std::string & replica_name_)
: ITask(replica_name_)
, part(part_)
, fetch_time(current_time)
, part_name(part_name_)
{}
uint64_t getFinishTime() const override
@ -84,43 +165,13 @@ public:
void updatePartsStateOnTaskStart(SharedState & state) override
{
auto & replica_state = state.all_replicas[replica_name];
auto & replica_parts = replica_state.parts_without_currently_merging_parts;
MergeTreePartInfo new_part_info = MergeTreePartInfo::fromPartName(reinterpret_cast<const char *>(part.data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
std::optional<size_t> begin;
std::optional<size_t> end;
for (size_t i = 0; i < replica_parts.size(); ++i)
{
MergeTreePartInfo part_info = MergeTreePartInfo::fromPartName(reinterpret_cast<const char *>(replica_parts[i].data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
if (new_part_info.contains(part_info))
{
if (!begin)
begin.emplace(i);
end = i;
}
else if (begin && end)
{
break;
}
}
if (begin && end && *begin != *end)
{
replica_parts[*begin] = part;
replica_state.parts_names_cache.push_back(reinterpret_cast<const char *>(part.data));
replica_parts[*begin].data = replica_state.parts_names_cache.back().data();
replica_parts.erase(replica_parts.begin() + *begin + 1, replica_parts.begin() + *end);
replica_state.total_parts_count += 1 - (*end - *begin);
}
else
{
replica_state.total_parts_count += 1;
}
//std::cerr << "Fetching part: " << part_name << " for replica " << replica_name << std::endl;
replica_state.addPart(part_name, part);
}
void updatePartsStateOnTaskFinish(SharedState &) override
std::vector<std::unique_ptr<ITask>> updatePartsStateOnTaskFinish(SharedState &, uint64_t) override
{
return {};
}
TaskType getTaskType() const override
@ -155,13 +206,15 @@ public:
void updatePartsStateOnTaskStart(SharedState & state) override
{
state.all_replicas[replica_name].parts_without_currently_merging_parts.push_back(part);
state.all_replicas[replica_name].total_parts_count += 1;
//std::cerr << "Inserting part: " << reinterpret_cast<const char *>(part.data) << " for replica " << replica_name << std::endl;
state.all_replicas[replica_name].addPart(reinterpret_cast<const char *>(part.data), part);
state.shared_state.add(reinterpret_cast<const char *>(part.data));
state.total_parts += 1;
}
void updatePartsStateOnTaskFinish(SharedState &) override
std::vector<std::unique_ptr<ITask>> updatePartsStateOnTaskFinish(SharedState &, uint64_t) override
{
return {};
}
TaskType getTaskType() const override
@ -196,6 +249,8 @@ public:
merge_size_rows += part.rows;
merge_size_bytes += part.size;
max_level = std::max<uint32_t>(part.level, max_level);
//std::cerr << "Starting merge range: " << reinterpret_cast<const char *>(part.data) << std::endl;
}
duration = merge_size_rows / merge_speed * 1000;
@ -215,28 +270,23 @@ public:
state.currently_running_merges++;
}
void updatePartsStateOnTaskFinish(SharedState & state) override
std::vector<std::unique_ptr<ITask>> updatePartsStateOnTaskFinish(SharedState & state, uint64_t current_time) override
{
for (const auto & part_to_merge : parts_to_merge)
{
//std::cerr << "Finishing merge range: " << reinterpret_cast<const char *>(part_to_merge.data) << std::endl;
state.merging_parts.erase(reinterpret_cast<const char *>(part_to_merge.data));
}
auto & replica_state = state.all_replicas[replica_name];
auto & replica_parts = replica_state.parts_without_currently_merging_parts;
uint64_t start_index = 0;
for (uint64_t i = 0, size = replica_parts.size(); i < size; ++i)
{
if (replica_parts[i].data == parts_to_merge.front().data)
{
start_index = i;
break;
}
}
replica_parts[start_index].size = merge_size_bytes;
replica_parts[start_index].level = max_level + 1;
replica_parts[start_index].age = 0;
replica_parts[start_index].rows = merge_size_rows;
IMergeSelector::Part merged_part;
merged_part.size = merge_size_bytes;
merged_part.level = max_level + 1;
merged_part.age = 0;
merged_part.rows = merge_size_rows;
//std::cerr << "Merge first part: " << reinterpret_cast<const char *>(parts_to_merge[0].data) << std::endl;
//std::cerr << "Merge last part: " << reinterpret_cast<const char *>(parts_to_merge.back().data) << std::endl;
MergeTreePartInfo first_info = MergeTreePartInfo::fromPartName(reinterpret_cast<const char *>(parts_to_merge[0].data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
MergeTreePartInfo last_info = MergeTreePartInfo::fromPartName(reinterpret_cast<const char *>(parts_to_merge.back().data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
@ -244,13 +294,19 @@ public:
final_part_info.min_block = first_info.min_block;
final_part_info.max_block = last_info.max_block;
final_part_info.level = max_level + 1;
replica_state.parts_names_cache.push_back(final_part_info.getPartNameV1());
replica_parts[start_index].data = replica_state.parts_names_cache.data();
replica_parts.erase(replica_parts.begin() + start_index + 1, replica_parts.begin() + start_index + parts_to_merge.size());
replica_state.total_parts_count += (1 - parts_to_merge.size());
//std::cerr << "Merged part: " << final_part_info.getPartNameV1() << " for replica " << replica_name << std::endl;
replica_state.addPart(final_part_info.getPartNameV1(), merged_part);
state.shared_state.add(final_part_info);
state.total_parts += (1 - parts_to_merge.size());
state.currently_running_merges--;
std::vector<std::unique_ptr<ITask>> tasks;
for (const auto & [replica_name, _] : state.all_replicas)
tasks.push_back(std::make_unique<FetchTask>(merged_part, current_time + 5, final_part_info.getPartNameV1(), replica_name));
return tasks;
}
TaskType getTaskType() const override
@ -319,7 +375,7 @@ public:
}
else if (parts[i].task == FETCH)
{
tasks.push(std::make_unique<FetchTask>(parts[i].part, insertion_times[i] - start_time, parts[i].replica_name));
tasks.push(std::make_unique<FetchTask>(parts[i].part, insertion_times[i] - start_time, reinterpret_cast<const char *>(parts[i].part.data), parts[i].replica_name));
}
else
{
@ -347,12 +403,9 @@ public:
{
if (current_time % 1000 == 0)
{
std::cerr << "Total parts count: " << state.total_parts << " merges running:" << state.currently_running_merges << std::endl;
std::cerr << "Total parts count: " << state.total_parts << " merges running:" << state.currently_running_merges << " time passed " << current_time / 1000 << std::endl;
for (auto & replica : state.all_replicas)
{
for (auto & part : replica.second.parts_without_currently_merging_parts)
part.age++;
}
replica.second.tickAge();
}
while (!tasks.empty())
@ -362,7 +415,9 @@ public:
{
if (top_task->getTaskType() == MERGE)
{
top_task->updatePartsStateOnTaskFinish(state);
auto new_tasks = top_task->updatePartsStateOnTaskFinish(state, current_time);
for (auto && new_task : new_tasks)
tasks.push(std::move(new_task));
}
else if (top_task->getTaskType() == INSERT)
{
@ -386,20 +441,18 @@ public:
}
}
if (current_time % 67 == 0)
//std::cerr << "Will try to assign merge\n";
if (current_time % 100 == 0)
{
for (const auto & replica_name : replicas)
{
auto parts_for_replica = state.all_replicas[replica_name].parts_without_currently_merging_parts;
for (auto itr = parts_for_replica.begin(); itr != parts_for_replica.end();)
{
if (state.merging_parts.contains(static_cast<const char *>(itr->data)))
itr = parts_for_replica.erase(itr);
else
++itr;
}
//std::cerr << "Trying for replica: " << replica_name << " merging parts: " << state.merging_parts.size() << std::endl;
auto ranges = state.all_replicas[replica_name].getPartRangesForMerge(state.merging_parts, state.shared_state);
IMergeSelector::PartsRange selected_parts = selector.select({parts_for_replica}, getMaxSizeToMerge());
if (ranges.empty())
continue;
IMergeSelector::PartsRange selected_parts = selector.select(ranges, getMaxSizeToMerge());
if (!selected_parts.empty())
{
@ -436,6 +489,7 @@ int main(int, char **)
uint64_t start_time = std::numeric_limits<uint64_t>::max();
std::vector<uint64_t> insertion_times;
PartsWithTypeAndReplicas parts;
size_t counter = 0;
while (!in.eof())
{
part_names.emplace_back();
@ -449,6 +503,9 @@ int main(int, char **)
parts.emplace_back(PartWithActionTypeAndReplica{part, replica_name, static_cast<TaskType>(event_type)});
start_time = std::min(start_time, event_time);
insertion_times.push_back(event_time);
counter++;
if (counter % 1000 == 0)
std::cerr << "Loaded:" << counter << " events\n";
}
std::cerr << "Parsed: \n";