This commit is contained in:
Sergey Fedorov 2013-12-13 16:20:06 +00:00
parent 9ba96ea811
commit 6fb0caf597
3 changed files with 34 additions and 40 deletions

View File

@ -219,6 +219,7 @@ namespace ErrorCodes
NETWORK_ERROR,
EMPTY_QUERY,
UNKNOWN_LOAD_BALANCING,
CANNOT_STATVFS,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -281,15 +281,6 @@ private:
return res;
}
size_t getSizeInBytes() const
{
size_t res = size_in_bytes;
/// Если еще не посчитан результат
if (res == 0)
res = calcTotalSize(storage.full_path + name + "/");
return res;
}
void remove() const
{
String from = storage.full_path + name + "/";
@ -347,7 +338,7 @@ private:
&& right >= rhs.right;
}
/// Загрузить индекс.
/// Загрузить индекс и вычислить размер.
void loadIndex()
{
size_t key_size = storage.sort_descr.size();
@ -389,13 +380,17 @@ private:
class CurrentlyMergingPartsTagger
{
public:
std::vector<DataPartPtr> parts;
Poco::FastMutex &data_mutex;
CurrentlyMergingPartsTagger(const std::vector<DataPartPtr> & parts_, Poco::FastMutex &data_mutex_) : parts(parts_), data_mutex(data_mutex_)
{
Poco::ScopedLock<Poco::FastMutex> lock(data_mutex);
/// Здесь не лочится мьютекс, так как конструктор вызывается внутри selectPartsToMerge, где он уже залочен
/// Poco::ScopedLock<Poco::FastMutex> lock(data_mutex);
for (size_t i = 0; i < parts.size(); ++i)
{
parts[i]->currently_merging = true;
StorageMergeTree::total_size_of_currently_merging_parts += parts[i]->getSizeInBytes();
StorageMergeTree::total_size_of_currently_merging_parts += parts[i]->size_in_bytes;
}
}
~CurrentlyMergingPartsTagger()
@ -404,12 +399,9 @@ private:
for (size_t i = 0; i < parts.size(); ++i)
{
parts[i]->currently_merging = false;
StorageMergeTree::total_size_of_currently_merging_parts -= parts[i]->getSizeInBytes();
StorageMergeTree::total_size_of_currently_merging_parts -= parts[i]->size_in_bytes;
}
}
private:
std::vector<DataPartPtr> parts;
Poco::FastMutex &data_mutex;
};
/// Сумарный размер currently_merging кусочков в байтах.
@ -474,8 +466,8 @@ private:
void mergeThread(bool while_can);
/// Сразу помечает их как currently_merging.
/// Если merge_anything_for_old_months, для кусков за прошедшие месяцы снимается ограничение на соотношение размеров.
bool selectPartsToMerge(std::vector<DataPartPtr> & parts, bool merge_anything_for_old_months);
void mergeParts(std::vector<DataPartPtr> parts);
bool selectPartsToMerge(Poco::SharedPtr<CurrentlyMergingPartsTagger> &what, bool merge_anything_for_old_months);
void mergeParts(Poco::SharedPtr<CurrentlyMergingPartsTagger> &what);
/// Дождаться, пока фоновые потоки закончат слияния.
void joinMergeThreads();

View File

@ -767,18 +767,22 @@ void StorageMergeTree::mergeThread(bool while_can)
{
try
{
std::vector<DataPartPtr> parts;
while (!shutdown_called
&& (selectPartsToMerge(parts, false)
|| selectPartsToMerge(parts, true)))
while (!shutdown_called)
{
mergeParts(parts);
{
/// К концу этого логического блока должен быть вызван деструктор, чтобы затем корректно определить удаленные куски
Poco::SharedPtr<CurrentlyMergingPartsTagger> what;
if (!selectPartsToMerge(what, false) && !selectPartsToMerge(what, true))
break;
mergeParts(what);
}
if (shutdown_called)
break;
/// Удаляем старые куски.
parts.clear();
clearOldParts();
if (!while_can)
@ -831,7 +835,7 @@ void StorageMergeTree::joinMergeThreads()
/// 4) Если в одном из потоков идет мердж крупных кусков, то во втором сливать только маленькие кусочки
/// 5) С ростом логарифма суммарного размера кусочков в мердже увеличиваем требование сбалансированности
bool StorageMergeTree::selectPartsToMerge(std::vector<DataPartPtr> & parts, bool merge_anything_for_old_months)
bool StorageMergeTree::selectPartsToMerge(Poco::SharedPtr<CurrentlyMergingPartsTagger> &what, bool merge_anything_for_old_months)
{
LOG_DEBUG(log, "Selecting parts to merge");
@ -854,11 +858,10 @@ bool StorageMergeTree::selectPartsToMerge(std::vector<DataPartPtr> & parts, bool
struct statvfs fs;
/// Смотрим количество свободного места в файловой системе
if (statvfs(full_path.c_str(), &fs))
{
LOG_WARNING(log, "Could not calculate available disk space. statvfs in selectPartsToMerge returned non-zero value");
} else
total_free_bytes = fs.f_bfree * fs.f_bsize;
if (statvfs(full_path.c_str(), &fs) != 0)
throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS);
total_free_bytes = fs.f_bfree * fs.f_bsize;
/// Сколько кусков, начиная с текущего, можно включить в валидный отрезок, начинающийся левее текущего куска.
/// Нужно для определения максимальности по включению.
@ -908,7 +911,7 @@ bool StorageMergeTree::selectPartsToMerge(std::vector<DataPartPtr> & parts, bool
size_t cur_max = first_part->size;
size_t cur_min = first_part->size;
size_t cur_sum = first_part->size;
size_t cur_total_size = first_part->getSizeInBytes();
size_t cur_total_size = first_part->size_in_bytes;
int cur_len = 1;
DayNum_t month = first_part->left_month;
@ -943,7 +946,7 @@ bool StorageMergeTree::selectPartsToMerge(std::vector<DataPartPtr> & parts, bool
cur_max = std::max(cur_max, last_part->size);
cur_min = std::min(cur_min, last_part->size);
cur_sum += last_part->size;
cur_total_size += last_part->getSizeInBytes();
cur_total_size += last_part->size_in_bytes;
++cur_len;
cur_id = last_part->right;
@ -977,7 +980,7 @@ bool StorageMergeTree::selectPartsToMerge(std::vector<DataPartPtr> & parts, bool
cur_longest_min = cur_min;
cur_longest_len = cur_len;
} else
LOG_DEBUG(log, "Won't merge parts from " << first_part->name << " to " << last_part->name << " because not enough free space: " << total_free_bytes << " free, " << maybe_used_bytes << " already involved in merge," << cur_total_size << " required now (+50% on overhead)");
LOG_WARNING(log, "Won't merge parts from " << first_part->name << " to " << last_part->name << " because not enough free space: " << total_free_bytes << " free, " << maybe_used_bytes << " already involved in merge, " << cur_total_size << " required now (+50% on overhead)");
}
}
@ -1001,15 +1004,15 @@ bool StorageMergeTree::selectPartsToMerge(std::vector<DataPartPtr> & parts, bool
if (found)
{
parts.clear();
std::vector<DataPartPtr> parts;
DataParts::iterator it = best_begin;
for (int i = 0; i < max_len; ++i)
{
parts.push_back(*it);
parts.back()->currently_merging = true;
++it;
}
what = new CurrentlyMergingPartsTagger(parts, data_parts_mutex);
LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
}
@ -1023,13 +1026,11 @@ bool StorageMergeTree::selectPartsToMerge(std::vector<DataPartPtr> & parts, bool
/// parts должны быть отсортированы.
void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
void StorageMergeTree::mergeParts(Poco::SharedPtr<CurrentlyMergingPartsTagger> &what)
{
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
const std::vector<DataPartPtr> &parts(what->parts);
/// К этому моменту части уже должны быть помечены как currently_merging
/// Необходимо для того, чтобы при вызове деструктора части пометились как не currently_merging
CurrentlyMergingPartsTagger marker(parts, data_parts_mutex);
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
Names all_column_names;
for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)