Sergey Fedorov 2013-11-27 16:41:48 +00:00
parent b61eadf4db
commit 5c95506548

@@ -37,11 +37,11 @@ const int RowsPerSec = 55000;
StorageMergeTreeSettings settings;
/// What should it be equal to?
int index_granularity = 0;
int index_granularity = 1;
/// Time, Type, Value
set<pair<int, pair<int, int> > > events;
multiset<pair<int, pair<int, int> > > events;
/// Current parts in the merge tree
DataParts data_parts;
@@ -52,7 +52,10 @@ int uniqId = 1;
/// Various statistics
long long totalMergeTime = 0, totalSize = 0;
DataParts maxCount, maxMerging, maxThreads;
int maxCountMoment, maxMergingMoment, maxThreadsMoment;
int maxCountMoment, maxMergingMoment, maxThreadsMoment, maxScheduledThreadsMoment;
int maxScheduledThreads;
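/// Number of merges that have been requested but not yet started; incremented in merge() and consumed by makeMerge().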
int mergeScheduled = 0;
int genRand(int l, int r)
{
@@ -78,6 +81,14 @@ bool selectPartsToMerge(std::vector<DataPtr> & parts)
/// Needed to determine maximality by inclusion.
int max_count_from_left = 0;
/// NOTE
/// Always true for now, because the real MergeTree does not have this heuristic
bool is_anything_merging = true;
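/// Check whether any part is currently participating in a merge.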
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
if ((*it)->currently_merging)
is_anything_merging = true;
/// Left end of the segment.
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
{
@@ -86,8 +97,8 @@ bool selectPartsToMerge(std::vector<DataPtr> & parts)
max_count_from_left = std::max(0, max_count_from_left - 1);
/// The part is not being merged and is small enough.
if (first_part->currently_merging ||
first_part->size * index_granularity > settings.max_rows_to_merge_parts)
if (first_part->currently_merging || (is_anything_merging &&
first_part->size * index_granularity > settings.max_rows_to_merge_parts))
continue;
/// The longest valid segment starting here.
@@ -108,8 +119,8 @@ bool selectPartsToMerge(std::vector<DataPtr> & parts)
const DataPtr & last_part = *jt;
/// The part is not being merged, is small enough, and is within the same, correct month.
if (last_part->currently_merging ||
last_part->size * index_granularity > settings.max_rows_to_merge_parts)
if (last_part->currently_merging || (is_anything_merging &&
last_part->size * index_granularity > settings.max_rows_to_merge_parts))
break;
cur_max = std::max(cur_max, last_part->size);
@@ -167,67 +178,6 @@ bool selectPartsToMerge(std::vector<DataPtr> & parts)
return found;
}
/// Choose what to merge, estimate the time needed, and add a completion event
void makeMerge(int curTime) {
std::vector<DataPtr> e;
if (!selectPartsToMerge(e)) return;
int curId = uniqId ++;
size_t size = 0;
for (size_t i = 0; i < e.size(); ++i)
{
e[i]->currently_merging = curId;
size += e[i]->size;
}
size_t needTime = (size + RowsPerSec - 1) / RowsPerSec;
totalMergeTime += needTime;
events.insert(make_pair(curTime + needTime, make_pair(2, curId)));
}
/// Start the merge threads
void merge(int curTime, int cnt)
{
for (int i = 0; i < cnt; ++i)
makeMerge(curTime);
}
/// Process an event
void process(pair<int, pair<int, int> > ev)
{
int curTime = ev.first;
int type = ev.second.first;
int val = ev.second.second;
/// insert
if (type == 1)
{
data_parts.insert(new DataPart(curTime, val));
merge(curTime, 2);
totalSize += val;
return;
}
/// merge done
if (type == 2)
{
size_t size = 0;
int st = (int)1e9;
DataParts newData;
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end();)
if ((*it)->currently_merging == val)
{
size += (*it)->size;
st = min(st, (*it)->time);
DataParts::iterator nxt = it;
nxt ++;
data_parts.erase(it);
it = nxt;
} else
it ++;
data_parts.insert(new DataPart(st, size));
return;
}
}
int getMergeSize(const DataParts &a)
{
int res = 0;
@@ -270,34 +220,111 @@ void updateStat(int time)
maxThreads = copy(data_parts);
maxThreadsMoment = time;
}
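/// Also track the peak number of scheduled (queued) merges.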
if (maxScheduledThreads < mergeScheduled)
{
maxScheduledThreads = mergeScheduled;
maxScheduledThreadsMoment = time;
}
}
/// Choose what to merge, estimate the time needed, and add a completion event
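/// Respects the thread limit and the queue of scheduled merges; returns false when no merge was started.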
bool makeMerge(int cur_time) {
if (getThreads(data_parts) >= settings.merging_threads) return 0;
if (mergeScheduled == 0) return 0;
mergeScheduled --;
std::vector<DataPtr> e;
if (!selectPartsToMerge(e)) return 0;
int curId = uniqId ++;
size_t size = 0;
for (size_t i = 0; i < e.size(); ++i)
{
e[i]->currently_merging = curId;
size += e[i]->size;
}
size_t need_time = (size + RowsPerSec - 1) / RowsPerSec;
totalMergeTime += need_time;
events.insert(make_pair(cur_time + need_time, make_pair(2, curId)));
return 1;
}
/// Start the merge threads
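/// In this version the function only schedules merges; they are started later by makeMerge() when threads become free.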
void merge(int cur_time, int cnt)
{
mergeScheduled += cnt;
}
/// Process an event
void process(pair<int, pair<int, int> > ev)
{
int cur_time = ev.first;
int type = ev.second.first;
int val = ev.second.second;
/// insert
if (type == 1)
{
data_parts.insert(new DataPart(cur_time, val));
totalSize += val;
merge(cur_time, 2);
} else if (type == 2) /// merge done
{
size_t size = 0;
int st = (int)1e9;
DataParts newData;
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end();)
{
if ((*it)->currently_merging == val)
{
size += (*it)->size;
st = min(st, (*it)->time);
DataParts::iterator nxt = it;
nxt ++;
data_parts.erase(it);
it = nxt;
} else
it ++;
}
data_parts.insert(new DataPart(st, size));
} else if (type == 3) /// do merge
{
merge(cur_time, val);
}
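/// Start as many of the scheduled merges as the thread limit and part selection allow.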
while (makeMerge(cur_time));
}
int main()
{
srand(rdtsc());
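/// Generate the stream of insert events: usually one large part per step, occasionally a burst of smaller parts.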
for (int i = 0; i < 10000; ++i)
{
if (rand() & 15)
events.insert(make_pair(i * 10, make_pair(1, genRand(65000, 75000))));
int delay = 30;
if (rand() & 7)
events.insert(make_pair(i * delay, make_pair(1, genRand(65000, 75000))));
else {
events.insert(make_pair(2 + i * 10, make_pair(1, genRand(1000, 20000))));
events.insert(make_pair(5 + i * 10, make_pair(1, genRand(1000, 20000))));
events.insert(make_pair(8 + i * 10, make_pair(1, genRand(1000, 20000))));
events.insert(make_pair(-4 + i * delay, make_pair(1, genRand(1000, 20000))));
events.insert(make_pair(-2 + i * delay, make_pair(1, genRand(1000, 20000))));
events.insert(make_pair(0 + i * delay, make_pair(1, genRand(20000, 30000))));
events.insert(make_pair(+2 + i * delay, make_pair(1, genRand(1000, 20000))));
events.insert(make_pair(+4 + i * delay, make_pair(1, genRand(1000, 20000))));
}
}
int iter = 0;
int curTime = 0;
int cur_time = 0;
maxCount = data_parts;
puts("________________________________________________________________________________________________________");
puts("A couple of moments from the process log:");
while (events.size() > 0)
{
curTime = events.begin()->first;
updateStat(curTime);
cur_time = events.begin()->first;
updateStat(cur_time);
iter ++;
if (iter % 3000 == 0)
{
printf("Current time: %d\n", curTime);
printf("Current time: %d\n", cur_time);
printf("Current parts:");
writeParts(data_parts);
}
@@ -310,10 +337,11 @@ int main()
writeParts(maxCount);
printf("Max total size of merging parts was at %d second with %d rows in merge\n", maxMergingMoment, getMergeSize(maxMerging));
writeParts(maxMerging);
printf("Max number of active threads was at %d second with %d threads\n", maxThreadsMoment, getThreads(maxThreads));
printf("Max number of running threads was at %d second with %d threads\n", maxThreadsMoment, getThreads(maxThreads));
writeParts(maxThreads);
printf("Max number of scheduled threads was at %d second with %d threads\n", maxScheduledThreadsMoment, maxScheduledThreads);
printf("Total merge time %lld sec\n", totalMergeTime);
printf("Total time %d sec\n", curTime);
printf("Total time %d sec\n", cur_time);
printf("Total parts size %lld\n", totalSize);
printf("Total merged Rows / total rows %0.5lf \n", 1.0 * totalMergeTime * RowsPerSec / totalSize);
puts("________________________________________________________________________________________________________");