This commit is contained in:
Pavel Kartavyy 2015-03-23 11:42:29 +03:00
parent 05401b901b
commit 134ee3a779
2 changed files with 86 additions and 56 deletions

View File

@ -53,25 +53,26 @@ public:
~StorageChunkMerger() override;
private:
String this_database;
String name;
const String this_database;
const String name;
NamesAndTypesListPtr columns;
String source_database;
const String source_database;
OptimizedRegularExpression table_name_regexp;
std::string destination_name_prefix;
size_t chunks_to_merge;
const size_t chunks_to_merge;
Context & context;
Settings settings;
std::thread merge_thread;
Poco::Event cancel_merge_thread;
Logger * log;
volatile bool shutdown_called;
/// Название виртуального столбца, отвечающего за имя таблицы, из которой идет чтение. (Например "_table")
String _table_column_name;
class MergeTask;
using MergeTaskPtr = std::shared_ptr<MergeTask>;
MergeTaskPtr merge_task;
DB::BackgroundProcessingPool::TaskHandle merge_task_handle;
StorageChunkMerger(
const std::string & this_database_,
const std::string & name_,
@ -85,11 +86,6 @@ private:
size_t chunks_to_merge_,
Context & context_);
void mergeThread();
bool maybeMergeSomething();
Storages selectChunksToMerge();
bool mergeChunks(const Storages & chunks);
Block getBlockWithVirtualColumns(const Storages & selected_tables) const;
typedef std::set<std::string> TableNames;

View File

@ -21,10 +21,7 @@
namespace DB
{
const int SLEEP_AFTER_MERGE = 1;
const int SLEEP_NO_WORK = 10;
const int SLEEP_AFTER_ERROR = 60;
const int NOTHING_TO_MERGE_PERIOD = 10;
StorageChunkMerger::TableNames StorageChunkMerger::currently_written_groups;
@ -226,6 +223,32 @@ Block StorageChunkMerger::getBlockWithVirtualColumns(const Storages & selected_t
return res;
}
class StorageChunkMerger::MergeTask
{
public:
MergeTask(const StorageChunkMerger & chunk_merger_, DB::Context & context_, Logger * log_)
:
chunk_merger(chunk_merger_),
context(context_),
shutdown_called(false),
log(log_)
{
}
bool merge();
private:
bool maybeMergeSomething();
Storages selectChunksToMerge();
bool mergeChunks(const Storages & chunks);
const StorageChunkMerger & chunk_merger;
DB::Context & context;
std::atomic<bool> shutdown_called;
Logger * log;
time_t last_nothing_to_merge_time = 0;
};
StorageChunkMerger::StorageChunkMerger(
const std::string & this_database_,
const std::string & name_,
@ -242,20 +265,28 @@ StorageChunkMerger::StorageChunkMerger(
this_database(this_database_), name(name_), columns(columns_), source_database(source_database_),
table_name_regexp(table_name_regexp_), destination_name_prefix(destination_name_prefix_), chunks_to_merge(chunks_to_merge_),
context(context_), settings(context.getSettings()),
log(&Logger::get("StorageChunkMerger")), shutdown_called(false)
log(&Logger::get("StorageChunkMerger")),
merge_task(std::make_shared<MergeTask>(*this, context, log))
{
merge_thread = std::thread([this] { mergeThread(); });
_table_column_name = "_table" + VirtualColumnUtils::chooseSuffix(getColumnsList(), "_table");
auto & backgroud_pool = context.getBackgroundPool();
MergeTaskPtr tmp_merge_task = merge_task;
DB::BackgroundProcessingPool::Task task = [tmp_merge_task](BackgroundProcessingPool::Context & pool_context) -> bool
{
return tmp_merge_task->merge();
};
merge_task_handle = backgroud_pool.addTask(task);
}
void StorageChunkMerger::shutdown()
{
if (shutdown_called)
if (merge_task->shutdown_called)
return;
shutdown_called = true;
cancel_merge_thread.set();
merge_thread.join();
merge_task->shutdown_called.store(true);
context.getBackgroundPool().removeTask(merge_task_handle);
}
StorageChunkMerger::~StorageChunkMerger()
@ -263,41 +294,44 @@ StorageChunkMerger::~StorageChunkMerger()
shutdown();
}
void StorageChunkMerger::mergeThread()
bool StorageChunkMerger::MergeTask::merge()
{
while (true)
time_t now = time(0);
if (last_nothing_to_merge_time + NOTHING_TO_MERGE_PERIOD > now)
return false;
if (shutdown_called)
return false;
bool merged = false;
try
{
bool merged = false;
bool error = true;
try
{
merged = maybeMergeSomething();
error = false;
}
catch (const Exception & e)
{
LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what()
<< ", Stack trace:\n\n" << e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what());
}
catch (const std::exception & e)
{
LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: std::exception. Code: " << ErrorCodes::STD_EXCEPTION << ", e.what() = " << e.what());
}
catch (...)
{
LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: unknown exception. Code: " << ErrorCodes::UNKNOWN_EXCEPTION);
}
unsigned sleep_ammount = error ? SLEEP_AFTER_ERROR : (merged ? SLEEP_AFTER_MERGE : SLEEP_NO_WORK);
if (shutdown_called || cancel_merge_thread.tryWait(1000 * sleep_ammount))
break;
merged = maybeMergeSomething();
}
catch (const Exception & e)
{
LOG_ERROR(log, "StorageChunkMerger at " << chunk_merger.this_database << "." << chunk_merger.name << " failed to merge: Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what()
<< ", Stack trace:\n\n" << e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "StorageChunkMerger at " << chunk_merger.this_database << "." << chunk_merger.name << " failed to merge: Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what());
}
catch (const std::exception & e)
{
LOG_ERROR(log, "StorageChunkMerger at " << chunk_merger.this_database << "." << chunk_merger.name << " failed to merge: std::exception. Code: " << ErrorCodes::STD_EXCEPTION << ", e.what() = " << e.what());
}
catch (...)
{
LOG_ERROR(log, "StorageChunkMerger at " << chunk_merger.this_database << "." << chunk_merger.name << " failed to merge: unknown exception. Code: " << ErrorCodes::UNKNOWN_EXCEPTION);
}
if (!merged)
last_nothing_to_merge_time = now;
return merged;
}
static std::string makeName(const std::string & prefix, const std::string & first_chunk, const std::string & last_chunk)