diff --git a/dbms/src/Storages/tests/MergeLogicTester.cpp b/dbms/src/Storages/tests/MergeLogicTester.cpp index e486d406f9b..0adf8b15ca7 100644 --- a/dbms/src/Storages/tests/MergeLogicTester.cpp +++ b/dbms/src/Storages/tests/MergeLogicTester.cpp @@ -37,11 +37,11 @@ const int RowsPerSec = 55000; StorageMergeTreeSettings settings; -/// Чему он равен ? -int index_granularity = 0; + +int index_granularity = 1; /// Time, Type, Value -set > > events; +multiset > > events; /// Текущие части в merge tree DataParts data_parts; @@ -52,7 +52,10 @@ int uniqId = 1; /// Разные статистики long long totalMergeTime = 0, totalSize = 0; DataParts maxCount, maxMerging, maxThreads; -int maxCountMoment, maxMergingMoment, maxThreadsMoment; +int maxCountMoment, maxMergingMoment, maxThreadsMoment, maxScheduledThreadsMoment; +int maxScheduledThreads; + +int mergeScheduled = 0; int genRand(int l, int r) { @@ -78,6 +81,14 @@ bool selectPartsToMerge(std::vector & parts) /// Нужно для определения максимальности по включению. int max_count_from_left = 0; + + /// NOTE + /// Сейчас всегда true, поскольку в настоящем mergeTree этой эвристики нет + bool is_anything_merging = true; + for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it) + if ((*it)->currently_merging) + is_anything_merging = true; + /// Левый конец отрезка. for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it) { @@ -86,8 +97,8 @@ bool selectPartsToMerge(std::vector & parts) max_count_from_left = std::max(0, max_count_from_left - 1); /// Кусок не занят и достаточно мал. - if (first_part->currently_merging || - first_part->size * index_granularity > settings.max_rows_to_merge_parts) + if (first_part->currently_merging || (is_anything_merging && + first_part->size * index_granularity > settings.max_rows_to_merge_parts)) continue; /// Самый длинный валидный отрезок, начинающийся здесь. @@ -108,8 +119,8 @@ bool selectPartsToMerge(std::vector & parts) const DataPtr & last_part = *jt; /// Кусок не занят, достаточно мал и в одном правильном месяце. - if (last_part->currently_merging || - last_part->size * index_granularity > settings.max_rows_to_merge_parts) + if (last_part->currently_merging || (is_anything_merging && + last_part->size * index_granularity > settings.max_rows_to_merge_parts)) break; cur_max = std::max(cur_max, last_part->size); @@ -167,67 +178,6 @@ bool selectPartsToMerge(std::vector & parts) return found; } -/// выбрать кого мерджить, оценить время и добавить событие об окончании -void makeMerge(int curTime) { - std::vector e; - if (!selectPartsToMerge(e)) return; - int curId = uniqId ++; - size_t size = 0; - for (size_t i = 0; i < e.size(); ++i) - { - e[i]->currently_merging = curId; - size += e[i]->size; - } - size_t needTime = (size + RowsPerSec - 1) / RowsPerSec; - totalMergeTime += needTime; - events.insert(make_pair(curTime + needTime, make_pair(2, curId))); -} - -/// Запустить потоки мерджа -void merge(int curTime, int cnt) -{ - for (int i = 0; i < cnt; ++i) - makeMerge(curTime); -} - -/// Обработать событие -void process(pair > ev) -{ - int curTime = ev.first; - int type = ev.second.first; - int val = ev.second.second; - - /// insert - if (type == 1) - { - data_parts.insert(new DataPart(curTime, val)); - merge(curTime, 2); - totalSize += val; - return; - } - - /// merge done - if (type == 2) - { - size_t size = 0; - int st = (int)1e9; - DataParts newData; - for (DataParts::iterator it = data_parts.begin(); it != data_parts.end();) - if ((*it)->currently_merging == val) - { - size += (*it)->size; - st = min(st, (*it)->time); - DataParts::iterator nxt = it; - nxt ++; - data_parts.erase(it); - it = nxt; - } else - it ++; - data_parts.insert(new DataPart(st, size)); - return; - } -} - int getMergeSize(const DataParts &a) { int res = 0; @@ -270,34 +220,111 @@ void updateStat(int time) maxThreads = copy(data_parts); maxThreadsMoment = time; } + if (maxScheduledThreads < mergeScheduled) + { + maxScheduledThreads = mergeScheduled; + maxScheduledThreadsMoment = time; + } } + +/// выбрать кого мерджить, оценить время и добавить событие об окончании +bool makeMerge(int cur_time) { + if (getThreads(data_parts) >= settings.merging_threads) return 0; + if (mergeScheduled == 0) return 0; + mergeScheduled --; + std::vector e; + if (!selectPartsToMerge(e)) return 0; + int curId = uniqId ++; + size_t size = 0; + for (size_t i = 0; i < e.size(); ++i) + { + e[i]->currently_merging = curId; + size += e[i]->size; + } + size_t need_time = (size + RowsPerSec - 1) / RowsPerSec; + totalMergeTime += need_time; + events.insert(make_pair(cur_time + need_time, make_pair(2, curId))); + return 1; +} + +/// Запустить потоки мерджа +void merge(int cur_time, int cnt) +{ + mergeScheduled += cnt; +} + +/// Обработать событие +void process(pair > ev) +{ + int cur_time = ev.first; + int type = ev.second.first; + int val = ev.second.second; + + /// insert + if (type == 1) + { + data_parts.insert(new DataPart(cur_time, val)); + totalSize += val; + merge(cur_time, 2); + } else if (type == 2) /// merge done + { + size_t size = 0; + int st = (int)1e9; + DataParts newData; + for (DataParts::iterator it = data_parts.begin(); it != data_parts.end();) + { + if ((*it)->currently_merging == val) + { + size += (*it)->size; + st = min(st, (*it)->time); + DataParts::iterator nxt = it; + nxt ++; + data_parts.erase(it); + it = nxt; + } else + it ++; + } + data_parts.insert(new DataPart(st, size)); + } else if (type == 3) /// do merge + { + merge(cur_time, val); + } + + while (makeMerge(cur_time)); +} + + int main() { srand(rdtsc()); for (int i = 0; i < 10000; ++i) { - if (rand() & 15) - events.insert(make_pair(i * 10, make_pair(1, genRand(65000, 75000)))); + int delay = 30; + if (rand() & 7) + events.insert(make_pair(i * delay, make_pair(1, genRand(65000, 75000)))); else { - events.insert(make_pair(2 + i * 10, make_pair(1, genRand(1000, 20000)))); - events.insert(make_pair(5 + i * 10, make_pair(1, genRand(1000, 20000)))); - events.insert(make_pair(8 + i * 10, make_pair(1, genRand(1000, 20000)))); + events.insert(make_pair(-4 + i * delay, make_pair(1, genRand(1000, 20000)))); + events.insert(make_pair(-2 + i * delay, make_pair(1, genRand(1000, 20000)))); + events.insert(make_pair(0 + i * delay, make_pair(1, genRand(20000, 30000)))); + events.insert(make_pair(+2 + i * delay, make_pair(1, genRand(1000, 20000)))); + events.insert(make_pair(+4 + i * delay, make_pair(1, genRand(1000, 20000)))); } } + int iter = 0; - int curTime = 0; + int cur_time = 0; maxCount = data_parts; puts("________________________________________________________________________________________________________"); puts("A couple of moments from the process log:"); while (events.size() > 0) { - curTime = events.begin()->first; - updateStat(curTime); + cur_time = events.begin()->first; + updateStat(cur_time); iter ++; if (iter % 3000 == 0) { - printf("Current time: %d\n", curTime); + printf("Current time: %d\n", cur_time); printf("Current parts:"); writeParts(data_parts); } @@ -310,10 +337,11 @@ int main() writeParts(maxCount); printf("Max total size of merging parts was at %d second with %d rows in merge\n", maxMergingMoment, getMergeSize(maxMerging)); writeParts(maxMerging); - printf("Max number of active threads was at %d second with %d threads\n", maxThreadsMoment, getThreads(maxThreads)); + printf("Max number of running threads was at %d second with %d threads\n", maxThreadsMoment, getThreads(maxThreads)); writeParts(maxThreads); + printf("Max number of scheduled threads was at %d second with %d threads\n", maxScheduledThreadsMoment, maxScheduledThreads); printf("Total merge time %lld sec\n", totalMergeTime); - printf("Total time %d sec\n", curTime); + printf("Total time %d sec\n", cur_time); printf("Total parts size %lld\n", totalSize); printf("Total merged Rows / total rows %0.5lf \n", 1.0 * totalMergeTime * RowsPerSec / totalSize); puts("________________________________________________________________________________________________________");