very bad simulator with replicas

This commit is contained in:
alesapin 2024-10-30 18:22:18 +01:00
parent 3b08f898f3
commit 0852d2d655
2 changed files with 198 additions and 40 deletions

View File

@ -4,5 +4,8 @@ target_link_libraries (merge_selector PRIVATE dbms)
clickhouse_add_executable (merge_selector2 merge_selector2.cpp)
target_link_libraries (merge_selector2 PRIVATE dbms)
clickhouse_add_executable (merge_selector3 merge_selector3.cpp)
target_link_libraries (merge_selector3 PRIVATE dbms)
clickhouse_add_executable (get_current_inserts_in_replicated get_current_inserts_in_replicated.cpp)
target_link_libraries (get_current_inserts_in_replicated PRIVATE dbms clickhouse_common_config clickhouse_common_zookeeper)

View File

@ -1,7 +1,10 @@
#include <list>
#include <iostream>
#include <unordered_map>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/Operators.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h>
#include <Common/formatReadable.h>
#include <base/interpolate.h>
@ -13,24 +16,111 @@ enum TaskType
{
INSERT = 0,
MERGE = 1,
FETCH = 2,
};
struct ReplicaState
{
IMergeSelector::PartsRange parts_without_currently_merging_parts;
size_t total_parts_count;
std::vector<std::string> parts_names_cache;
};
using AllReplicasState = std::unordered_map<std::string, ReplicaState>;
using CurrentlyMergingPartsNames = std::unordered_set<std::string>;
struct SharedState
{
AllReplicasState all_replicas;
CurrentlyMergingPartsNames merging_parts;
size_t currently_running_merges{0};
};
class ITask
{
protected:
std::string replica_name;
public:
explicit ITask(const std::string & replica_name_)
: replica_name(replica_name_)
{}
virtual uint64_t getFinishTime() const = 0;
bool operator<(const ITask & o) const
{
return getFinishTime() < o.getFinishTime();
}
std::string getReplicaName() const { return replica_name; }
virtual ~ITask() = default;
virtual void updatePartsState(IMergeSelector::PartsRange & parts) = 0;
virtual int64_t getTotalPartsUpdate() const = 0;
virtual void updatePartsStateOnTaskStart(SharedState & state) = 0;
virtual void updatePartsStateOnTaskFinish(SharedState & state) = 0;
virtual bool isFinished(uint64_t current_time) const = 0;
virtual TaskType getTaskType() const = 0;
};
class FetchTask final : public ITask
{
IMergeSelector::Part part;
uint64_t fetch_time;
public:
FetchTask(const IMergeSelector::Part & part_, uint64_t current_time, const std::string & replica_name_)
: ITask(replica_name_)
, part(part_)
, fetch_time(current_time)
{}
uint64_t getFinishTime() const override
{
return fetch_time;
}
bool isFinished(uint64_t current_time) const override
{
return fetch_time <= current_time;
}
void updatePartsStateOnTaskStart(SharedState & state) override
{
auto & replica_state = state.all_replicas[replica_name];
replica_state.total_parts_count += 1;
auto & replica_parts = replica_state.parts_without_currently_merging_parts;
MergeTreePartInfo new_part_info = MergeTreePartInfo::fromPartName(reinterpret_cast<const char *>(part.data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
std::optional<size_t> begin;
std::optional<size_t> end;
for (size_t i = 0; i < replica_parts.size(); ++i)
{
MergeTreePartInfo part_info = MergeTreePartInfo::fromPartName(reinterpret_cast<const char *>(replica_parts[i].data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
if (new_part_info.contains(part_info))
{
if (!begin)
begin.emplace(i);
end = i;
}
else if (begin && end)
{
break;
}
}
if (begin && end && *begin != *end)
{
replica_parts[*begin] = part;
replica_parts.erase(replica_parts.begin() + *begin + 1, replica_parts.begin() + *end);
}
}
void updatePartsStateOnTaskFinish(SharedState &) override
{
}
TaskType getTaskType() const override
{
return FETCH;
}
};
class InsertTask final : public ITask
{
@ -42,8 +132,10 @@ public:
IMergeSelector::Part part;
uint64_t insert_time;
InsertTask(const IMergeSelector::Part & part_, uint64_t insert_time_)
: part(part_)
InsertTask(const IMergeSelector::Part & part_, uint64_t insert_time_, const std::string & replica_name_)
: ITask(replica_name_)
, part(part_)
, insert_time(insert_time_)
{
}
@ -53,14 +145,14 @@ public:
return insert_time <= current_time;
}
void updatePartsState(IMergeSelector::PartsRange & parts) override
void updatePartsStateOnTaskStart(SharedState & state) override
{
parts.push_back(part);
state.all_replicas[replica_name].parts_without_currently_merging_parts.push_back(part);
state.all_replicas[replica_name].total_parts_count += 1;
}
int64_t getTotalPartsUpdate() const override
void updatePartsStateOnTaskFinish(SharedState &) override
{
return +1;
}
TaskType getTaskType() const override
@ -71,11 +163,13 @@ public:
class MergeTask final : public ITask
{
IMergeSelector::Part part_after_merge;
public:
uint64_t getFinishTime() const override
{
return start_time + duration;
}
std::vector<IMergeSelector::Part> parts_to_merge;
uint64_t merge_size_bytes{0};
uint64_t merge_size_rows{0};
@ -83,8 +177,9 @@ public:
uint64_t duration{0};
uint32_t max_level{0};
MergeTask(const std::vector<IMergeSelector::Part> & parts_to_merge_, uint64_t current_time, uint64_t merge_speed)
: parts_to_merge(parts_to_merge_)
MergeTask(const std::vector<IMergeSelector::Part> & parts_to_merge_, uint64_t current_time, uint64_t merge_speed, const std::string & replica_name_)
: ITask(replica_name_)
, parts_to_merge(parts_to_merge_)
, start_time(current_time)
{
for (const auto & part : parts_to_merge)
@ -103,28 +198,46 @@ public:
return (current_time - start_time) >= duration;
}
void updatePartsState(IMergeSelector::PartsRange & parts) override
void updatePartsStateOnTaskStart(SharedState & state) override
{
for (const auto & part_to_merge : parts_to_merge)
state.merging_parts.insert(reinterpret_cast<const char *>(part_to_merge.data));
}
void updatePartsStateOnTaskFinish(SharedState & state) override
{
for (const auto & part_to_merge : parts_to_merge)
state.merging_parts.erase(reinterpret_cast<const char *>(part_to_merge.data));
auto & replica_state = state.all_replicas[replica_name];
auto & replica_parts = replica_state.parts_without_currently_merging_parts;
uint64_t start_index = 0;
for (uint64_t i = 0, size = parts.size(); i < size; ++i)
for (uint64_t i = 0, size = replica_parts.size(); i < size; ++i)
{
if (parts[i].data == parts_to_merge.front().data)
if (replica_parts[i].data == parts_to_merge.front().data)
{
start_index = i;
break;
}
}
parts[start_index].size = merge_size_bytes;
parts[start_index].level = max_level + 1;
parts[start_index].age = 0;
parts[start_index].rows = merge_size_rows;
parts.erase(parts.begin() + start_index + 1, parts.begin() + start_index + parts_to_merge.size());
}
replica_parts[start_index].size = merge_size_bytes;
replica_parts[start_index].level = max_level + 1;
replica_parts[start_index].age = 0;
replica_parts[start_index].rows = merge_size_rows;
MergeTreePartInfo first_info = MergeTreePartInfo::fromPartName(reinterpret_cast<const char *>(parts_to_merge[0].data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
MergeTreePartInfo last_info = MergeTreePartInfo::fromPartName(reinterpret_cast<const char *>(parts_to_merge.back().data), MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
int64_t getTotalPartsUpdate() const override
{
return 1 - parts_to_merge.size();
MergeTreePartInfo final_part_info = first_info;
final_part_info.min_block = first_info.min_block;
final_part_info.max_block = last_info.max_block;
final_part_info.level = max_level + 1;
replica_state.parts_names_cache.push_back(final_part_info.getPartNameV1());
replica_parts[start_index].data = replica_state.parts_names_cache.data();
replica_parts.erase(replica_parts.begin() + start_index + 1, replica_parts.begin() + start_index + parts_to_merge.size());
replica_state.total_parts_count += (1 - parts_to_merge.size());
}
TaskType getTaskType() const override
@ -142,6 +255,15 @@ struct Comparator
}
};
struct PartWithActionTypeAndReplica
{
IMergeSelector::Part part;
std::string replica_name;
TaskType task;
};
using PartsWithTypeAndReplicas = std::vector<PartWithActionTypeAndReplica>;
class Simulator
{
private:
@ -155,12 +277,15 @@ private:
uint64_t merge_speed;
uint64_t too_many_parts;
uint64_t currently_running_merges{0};
SharedState state;
IMergeSelector::PartsRanges partitions;
IMergeSelector::PartsRange & parts_state;
std::set<std::string> replicas;
SimpleMergeSelector selector;
uint64_t total_parts{0};
public:
Simulator(const IMergeSelector::PartsRange & inserted_parts,
Simulator(const PartsWithTypeAndReplicas & parts,
std::vector<uint64_t> insertion_times,
uint64_t start_time,
SimpleMergeSelector::Settings settings_,
@ -177,10 +302,23 @@ public:
, parts_state(partitions.back())
, selector(settings)
{
for (uint64_t i = 0; i < inserted_parts.size(); ++i)
for (uint64_t i = 0; i < parts.size(); ++i)
{
replicas.insert(parts[i].replica_name);
if (parts[i].task == INSERT)
{
tasks.push(std::make_unique<InsertTask>(parts[i].part, insertion_times[i] - start_time, parts[i].replica_name));
}
else if (parts[i].task == FETCH)
{
tasks.push(std::make_unique<FetchTask>(parts[i].part, insertion_times[i] - start_time, parts[i].replica_name));
}
else
{
std::terminate();
}
//std::cerr << "Insert time: " << insertion_times[i] - start_time << " start time:" << start_time << std::endl;
tasks.push(std::make_unique<InsertTask>(inserted_parts[i], insertion_times[i] - start_time));
}
}
@ -214,20 +352,22 @@ public:
{
if (top_task->getTaskType() == MERGE)
{
--currently_running_merges;
top_task->updatePartsStateOnTaskFinish(state);
}
if (top_task->getTaskType() == INSERT)
else if (top_task->getTaskType() == INSERT)
{
if (total_parts < too_many_parts)
{
top_task->updatePartsState(parts_state);
top_task->updatePartsStateOnTaskStart(state);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Too many parts");
}
else if (top_task->getTaskType() == FETCH)
{
top_task->updatePartsStateOnTaskStart(state);
}
total_parts += top_task->getTotalPartsUpdate();
tasks.pop();
}
else
@ -238,15 +378,28 @@ public:
if (current_time % 67 == 0)
{
IMergeSelector::PartsRange selected_parts = selector.select(partitions, getMaxSizeToMerge());
if (!selected_parts.empty())
for (const auto & replica_name : replicas)
{
auto new_merge = std::make_unique<MergeTask>(selected_parts, current_time, merge_speed);
new_merge->updatePartsState(parts_state);
tasks.push(std::move(new_merge));
currently_running_merges++;
auto parts_for_replica = state.all_replicas[replica_name].parts_without_currently_merging_parts;
for (auto itr = parts_for_replica.begin(); itr != parts_for_replica.end();)
{
if (state.merging_parts.contains(static_cast<const char *>(itr->data)))
itr = parts_for_replica.erase(itr);
else
++itr;
}
IMergeSelector::PartsRange selected_parts = selector.select({parts_for_replica}, getMaxSizeToMerge());
if (!selected_parts.empty())
{
auto new_merge = std::make_unique<MergeTask>(selected_parts, current_time, merge_speed, replica_name);
new_merge->updatePartsStateOnTaskStart(state);
tasks.push(std::move(new_merge));
currently_running_merges++;
}
}
}
current_time++;
@ -271,18 +424,20 @@ int main(int, char **)
std::list<std::string> part_names;
IMergeSelector::PartsRange parts;
uint64_t start_time = std::numeric_limits<uint64_t>::max();
std::vector<uint64_t> insertion_times;
PartsWithTypeAndReplicas parts;
while (!in.eof())
{
part_names.emplace_back();
IMergeSelector::Part part;
uint64_t event_time;
in >> part.size >> "\t" >> part.rows >> "\t" >> part.level >> "\t" >> part_names.back() >> "\t" >> event_time >> "\n";
std::string replica_name;
uint64_t event_type;
in >> part.size >> "\t" >> part.rows >> "\t" >> part.level >> "\t" >> part_names.back() >> "\t" >> event_time >> "\t" >> event_type >> "\t" >> replica_name >> "\n";
part.data = part_names.back().data();
part.age = 0;
parts.emplace_back(part);
parts.emplace_back(PartWithActionTypeAndReplica{part, replica_name, static_cast<TaskType>(event_type)});
start_time = std::min(start_time, event_time);
insertion_times.push_back(event_time);
}