This commit is contained in:
Michael Kolupaev 2014-04-11 17:05:17 +04:00
parent a67d4f7e11
commit 7ec176038c
8 changed files with 335 additions and 121 deletions

View File

@ -0,0 +1,258 @@
#pragma once
#include <thread>
#include <set>
#include <map>
#include <Poco/Mutex.h>
#include <Poco/RWLock.h>
#include <DB/Core/Types.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
namespace DB
{
/** В нескольких потоках в бесконечном цикле выполняет указанные функции.
*/
class BackgroundProcessingPool
{
public:
typedef std::map<String, int> Counters;
/** Используется изнутри таски. Позволяет инкрементировать какие-нибудь счетчики.
* После завершения таски, все изменения откатятся.
* Например, чтобы можно было узнавать количество потоков, выполняющих большое слияние,
* можно в таске, выполняющей большое слияние, инкрементировать счетчик. Декрементировать обратно его не нужно.
*/
class Context
{
public:
void incrementCounter(const String & name, int value = 1)
{
Poco::ScopedLock<Poco::FastMutex> lock(pool.mutex);
local_counters[name] += value;
pool.counters[name] += value;
}
private:
friend class BackgroundProcessingPool;
Context(BackgroundProcessingPool & pool_, Counters & local_counters_) : pool(pool_), local_counters(local_counters_) {}
BackgroundProcessingPool & pool;
Counters & local_counters;
};
/// Возвращает true, если что-то получилось сделать. В таком случае поток не будет спать перед следующим вызовом.
typedef std::function<bool (Context & context)> Task;
typedef std::shared_ptr<void> TaskHandle;
BackgroundProcessingPool() : size(1), sleep_seconds(1), shutdown(false) {}
void setNumberOfThreads(int size_)
{
Poco::ScopedLock<Poco::FastMutex> tlock(threads_mutex);
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
if (size_ == size)
return;
if (threads.empty())
{
size = size_;
return;
}
throw Exception("setNumberOfThreads is not implemented for non-empty pool", ErrorCodes::NOT_IMPLEMENTED);
}
int getNumberOfThreads()
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return size;
}
void setSleepTime(double seconds)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
sleep_seconds = seconds;
}
int getCounter(const String & name)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return counters[name];
}
TaskHandle addTask(const Task & task)
{
Poco::ScopedLock<Poco::FastMutex> lock(threads_mutex);
TaskInfoPtr res = std::make_shared<TaskInfo>(task);
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
tasks.push_back(res);
}
if (threads.empty())
{
shutdown = false;
counters.clear();
threads.resize(size);
for (std::thread & thread : threads)
thread = std::thread(std::bind(&BackgroundProcessingPool::threadFunction, this));
}
return res;
}
void removeTask(const TaskHandle & handle)
{
Poco::ScopedLock<Poco::FastMutex> tlock(threads_mutex);
TaskInfoPtr task = std::static_pointer_cast<TaskInfo>(handle);
/// Дождемся завершения всех выполнений этой задачи.
{
Poco::ScopedWriteRWLock wlock(task->lock);
task->removed = true;
}
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
auto it = std::find(tasks.begin(), tasks.end(), task);
if (it == tasks.end())
throw Exception("Task not found", ErrorCodes::LOGICAL_ERROR);
tasks.erase(it);
}
if (tasks.empty())
{
shutdown = true;
for (std::thread & thread : threads)
thread.join();
threads.clear();
counters.clear();
}
}
~BackgroundProcessingPool()
{
try
{
Poco::ScopedLock<Poco::FastMutex> lock(threads_mutex);
if (!threads.empty())
{
LOG_ERROR(&Logger::get("~BackgroundProcessingPool"), "Destroying non-empty BackgroundProcessingPool");
shutdown = true;
for (std::thread & thread : threads)
thread.join();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
private:
struct TaskInfo
{
Task function;
Poco::RWLock lock;
volatile bool removed;
TaskInfo(const Task & function_) : function(function_), removed(false) {}
};
typedef std::shared_ptr<TaskInfo> TaskInfoPtr;
typedef std::vector<TaskInfoPtr> Tasks;
typedef std::vector<std::thread> Threads;
Poco::FastMutex threads_mutex;
Poco::FastMutex mutex;
int size;
Tasks tasks;
Threads threads;
Counters counters;
double sleep_seconds;
bool shutdown;
void threadFunction()
{
size_t i = static_cast<size_t>(rand());
while (!shutdown)
{
Counters counters_diff;
bool need_sleep = false;
size_t tasks_count = 1;
try
{
TaskInfoPtr task;
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
tasks_count = tasks.size();
if (!tasks.empty())
{
need_sleep = true;
i %= tasks_count;
task = tasks[i];
++i;
}
}
if (shutdown)
break;
if (!task)
{
std::this_thread::sleep_for(std::chrono::duration<double>(sleep_seconds));
continue;
}
Poco::ScopedReadRWLock rlock(task->lock);
if (task->removed)
continue;
Context context(*this, counters_diff);
if (task->function(context))
{
--i;
need_sleep = false;
}
}
catch (...)
{
need_sleep = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// Вычтем все счетчики обратно.
if (!counters_diff.empty())
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
for (const auto & it : counters_diff)
{
counters[it.first] -= it.second;
}
}
if (shutdown)
break;
if (need_sleep)
{
std::this_thread::sleep_for(std::chrono::duration<double>(sleep_seconds / tasks_count));
}
}
}
};
}

View File

@ -19,7 +19,6 @@ public:
UInt64 temp_index = storage.increment.get();
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, temp_index);
storage.data.renameTempPartAndAdd(part, &storage.increment);
storage.merge(2);
}
}

View File

@ -77,10 +77,12 @@ struct MergeTreeSettings
/// Во столько раз ночью увеличиваем коэффициент.
size_t merge_parts_at_night_inc = 10;
/// Сколько потоков использовать для объединения кусков.
size_t merging_threads = 2;
/// Сколько потоков использовать для объединения кусков (для MergeTree).
/// Пул потоков общий на весь сервер.
size_t merging_threads = 6;
/// Сколько потоков использовать для загрузки кусков с других реплик и объединения кусков (для ReplicatedMergeTree).
/// Пул потоков на каждую таблицу свой.
size_t replication_threads = 4;
/// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить.

View File

@ -5,7 +5,7 @@
#include <DB/Storages/MergeTree/MergeTreeDataWriter.h>
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <statdaemons/threadpool.hpp>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
namespace DB
{
@ -65,8 +65,7 @@ public:
*/
bool optimize()
{
merge(1, false, true);
return true;
return merge(true);
}
void drop() override;
@ -97,7 +96,8 @@ private:
volatile bool shutdown_called;
Poco::SharedPtr<boost::threadpool::pool> merge_threads;
static BackgroundProcessingPool merge_pool;
BackgroundProcessingPool::TaskHandle merge_task_handle;
/// Пока существует, помечает части как currently_merging и держит резерв места.
/// Вероятно, что части будут помечены заранее.
@ -151,18 +151,13 @@ private:
const String & sign_column_,
const MergeTreeSettings & settings_);
/** Определяет, какие куски нужно объединять, и запускает их слияние в отдельном потоке. Если iterations = 0, объединяет, пока это возможно.
* Если aggressive - выбрать куски не обращая внимание на соотношение размеров и их новизну (для запроса OPTIMIZE).
/** Определяет, какие куски нужно объединять, и объединяет их.
* Если aggressive - выбрать куски, не обращая внимание на соотношение размеров и их новизну (для запроса OPTIMIZE).
* Возвращает, получилось ли что-нибудь объединить.
*/
void merge(size_t iterations = 1, bool async = true, bool aggressive = false);
bool merge(bool aggressive = false, BackgroundProcessingPool::Context * context = nullptr);
/// Если while_can, объединяет в цикле, пока можно; иначе выбирает и объединяет только одну пару кусков.
void mergeThread(bool while_can, bool aggressive);
/// Дождаться, пока фоновые потоки закончат слияния.
void joinMergeThreads();
bool mergeTask(BackgroundProcessingPool::Context & context);
/// Вызывается во время выбора кусков для слияния.
bool canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right);

View File

@ -319,7 +319,6 @@ Strings MergeTreeData::clearOldParts()
/// Если метод уже вызван из другого потока (или если all_data_parts прямо сейчас меняют), то можно ничего не делать.
if (!lock.lock(&all_data_parts_mutex))
{
LOG_TRACE(log, "Already clearing or modifying old parts");
return res;
}

View File

@ -86,7 +86,9 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
/// Кусок достаточно мал или слияние "агрессивное".
if (first_part->size * data.index_granularity > cur_max_rows_to_merge_parts
&& !aggressive)
{
continue;
}
/// Кусок в одном месяце.
if (first_part->left_month != first_part->right_month)
@ -131,7 +133,9 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
if (!can_merge(prev_part, last_part) ||
last_part->left_month != last_part->right_month ||
last_part->left_month != month)
{
break;
}
/// Кусок достаточно мал или слияние "агрессивное".
if (last_part->size * data.index_granularity > cur_max_rows_to_merge_parts
@ -160,11 +164,11 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
if (cur_max * data.index_granularity * 150 > 1024*1024*1024 && cur_age_in_sec < 6*3600)
min_len = 3;
/// Сколько строк есть в кусках после текущих, делить на максимальный из текущих кусков. Чем меньше, тем новее текущие куски.
size_t oldness_coef = (size_of_remaining_parts + first_part->size - cur_total_size + 0.0) / cur_max;
/// Размер кусков после текущих, делить на максимальный из текущих кусков. Чем меньше, тем новее текущие куски.
size_t oldness_coef = (size_of_remaining_parts + first_part->size - cur_sum + 0.0) / cur_max;
/// Эвристика: если после этой группы кусков еще накопилось мало строк, не будем соглашаться на плохо
/// сбалансированное слияния, расчитывая, что после будущих вставок данных появятся более привлекательные слияния.
/// сбалансированные слияния, расчитывая, что после будущих вставок данных появятся более привлекательные слияния.
double ratio = (oldness_coef + 1) * data.settings.size_ratio_coefficient_to_merge_parts;
/// Если отрезок валидный, то он самый длинный валидный, начинающийся тут.
@ -231,7 +235,8 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
merged_name = MergeTreeData::getPartName(
left_date, right_date, parts.front()->left, parts.back()->right, level + 1);
LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name
<< (only_small ? " (only small)" : ""));
}
return found;

View File

@ -6,6 +6,8 @@
namespace DB
{
BackgroundProcessingPool StorageMergeTree::merge_pool;
StorageMergeTree::StorageMergeTree(const String & path_, const String & name_, NamesAndTypesListPtr columns_,
const Context & context_,
@ -20,11 +22,9 @@ StorageMergeTree::StorageMergeTree(const String & path_, const String & name_, N
data(full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_,mode_, sign_column_, settings_),
reader(data), writer(data), merger(data),
log(&Logger::get("StorageMergeTree")),
log(&Logger::get("MergeTree " + name)),
shutdown_called(false)
{
merge_threads = new boost::threadpool::pool(data.settings.merging_threads);
increment.fixIfBroken(data.getMaxDataPartIndex());
data.clearOldParts();
@ -41,9 +41,16 @@ StoragePtr StorageMergeTree::create(
const String & sign_column_,
const MergeTreeSettings & settings_)
{
return (new StorageMergeTree(
StorageMergeTree * res = new StorageMergeTree(
path_, name_, columns_, context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, mode_, sign_column_, settings_))->thisPtr();
sampling_expression_, index_granularity_, mode_, sign_column_, settings_);
StoragePtr res_ptr = res->thisPtr();
merge_pool.setNumberOfThreads(res->data.settings.merging_threads);
merge_pool.setSleepTime(5);
res->merge_task_handle = merge_pool.addTask(std::bind(&StorageMergeTree::mergeTask, res, std::placeholders::_1));
return res_ptr;
}
void StorageMergeTree::shutdown()
@ -52,8 +59,7 @@ void StorageMergeTree::shutdown()
return;
shutdown_called = true;
merger.cancelAll();
joinMergeThreads();
merge_pool.removeTask(merge_task_handle);
}
@ -81,7 +87,7 @@ BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)
void StorageMergeTree::drop()
{
merger.cancelAll();
joinMergeThreads();
merge_pool.removeTask(merge_task_handle);
data.dropAllData();
}
@ -113,114 +119,64 @@ void StorageMergeTree::commitAlterModify(const ASTAlterQuery::Parameters & param
data.commitAlterModify(params);
}
void StorageMergeTree::merge(size_t iterations, bool async, bool aggressive)
bool StorageMergeTree::merge(bool aggressive, BackgroundProcessingPool::Context * pool_context)
{
bool while_can = false;
if (iterations == 0)
auto structure_lock = lockStructure(false);
/// Удаляем старые куски.
data.clearOldParts();
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
/// Нужно вызывать деструктор под незалоченным currently_merging_mutex.
CurrentlyMergingPartsTaggerPtr merging_tagger;
String merged_name;
{
while_can = true;
iterations = data.settings.merging_threads;
}
Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
for (size_t i = 0; i < iterations; ++i)
merge_threads->schedule(boost::bind(&StorageMergeTree::mergeThread, this, while_can, aggressive));
MergeTreeData::DataPartsVector parts;
auto can_merge = std::bind(&StorageMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);
/// Если слияние запущено из пула потоков, и хотя бы половина потоков сливает большие куски,
/// не будем сливать большие куски.
int big_merges = merge_pool.getCounter("big merges");
bool only_small = pool_context && big_merges * 2 >= merge_pool.getNumberOfThreads();
if (!async)
joinMergeThreads();
}
void StorageMergeTree::mergeThread(bool while_can, bool aggressive)
{
try
{
while (!shutdown_called)
if (!merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge) &&
!merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge))
{
auto structure_lock = lockStructure(false);
return false;
}
/// Удаляем старые куски. На случай, если в слиянии что-то сломано, и из следующего блока вылетит исключение.
LOG_TRACE(log, "Clearing old parts");
data.clearOldParts();
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
merging_tagger = new CurrentlyMergingPartsTagger(parts, merger.estimateDiskSpaceForMerge(parts), *this);
/// Если собираемся сливать большие куски, увеличим счетчик потоков, сливающих большие куски.
if (pool_context)
{
for (const auto & part : parts)
{
/// К концу этого логического блока должен быть вызван деструктор, чтобы затем корректно определить удаленные куски
/// Нужно вызывать деструктор под незалоченным currently_merging_mutex.
CurrentlyMergingPartsTaggerPtr merging_tagger;
String merged_name;
if (part->size * data.index_granularity > 25 * 1024 * 1024)
{
Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
MergeTreeData::DataPartsVector parts;
auto can_merge = std::bind(&StorageMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);
bool only_small = false;
/// Если есть активный мердж крупных кусков, то ограничиваемся мерджем только маленьких частей.
for (const auto & part : currently_merging)
{
if (part->size * data.index_granularity > 25 * 1024 * 1024)
{
only_small = true;
break;
}
}
LOG_DEBUG(log, "Selecting parts to merge");
if (!merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge) &&
!merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge))
{
LOG_DEBUG(log, "No parts to merge");
break;
}
merging_tagger = new CurrentlyMergingPartsTagger(parts, merger.estimateDiskSpaceForMerge(parts), *this);
pool_context->incrementCounter("big merges");
break;
}
merger.mergeParts(merging_tagger->parts, merged_name);
}
if (shutdown_called)
break;
/// Удаляем куски, которые мы только что сливали.
LOG_TRACE(log, "Clearing old parts");
data.clearOldParts();
if (!while_can)
break;
}
}
catch (const Exception & e)
{
LOG_ERROR(log, "Code: " << e.code() << ". " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Poco::Exception: " << e.code() << ". " << e.displayText());
}
catch (const std::exception & e)
{
LOG_ERROR(log, "std::exception: " << e.what());
}
catch (...)
{
LOG_ERROR(log, "Unknown exception");
}
merger.mergeParts(merging_tagger->parts, merged_name);
return true;
}
void StorageMergeTree::joinMergeThreads()
bool StorageMergeTree::mergeTask(BackgroundProcessingPool::Context & context)
{
LOG_DEBUG(log, "Waiting for merge threads to finish.");
merge_threads->wait();
if (shutdown_called)
return false;
return merge(false, &context);
}
bool StorageMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
return !currently_merging.count(left) && !currently_merging.count(right);

View File

@ -152,7 +152,7 @@ bool selectPartsToMerge(std::vector<DataPtr> & parts)
double size_ratio_modifier = std::max(0.5, 2 - 3 * (log_cur_sum) / (25 + log_cur_sum));
/// Объединяем все в одну константу, но не меньшую единицы
double ratio = std::max(1., time_ratio_modifier * size_ratio_modifier * settings.max_size_ratio_to_merge_parts);
double ratio = std::max(1., time_ratio_modifier * size_ratio_modifier * 5);
/// Если отрезок валидный, то он самый длинный валидный, начинающийся тут.
if (cur_len >= min_len &&