ClickHouse/dbms/src/Storages/StorageMergeTree.cpp

230 lines
6.5 KiB
C++
Raw Normal View History

2012-07-17 20:04:39 +00:00
#include <DB/Storages/StorageMergeTree.h>
2014-03-13 12:48:07 +00:00
#include <DB/Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <DB/Common/escapeForFileName.h>
2012-07-19 20:32:10 +00:00
2012-07-17 20:04:39 +00:00
namespace DB
{
2014-03-13 12:48:07 +00:00
2014-03-09 17:36:01 +00:00
/** Constructs the storage object of a MergeTree table.
  *
  * The table directory is path_ + escapeForFileName(name_) + '/'; the increment counter
  * lives in "increment.txt" inside that directory. The remaining arguments are passed
  * through unchanged to the MergeTreeData member.
  *
  * Once the members are initialized, the constructor body:
  *  - creates the merge thread pool, sized by data.settings.merging_threads;
  *  - calls increment.fixIfBroken() with the maximum data part index reported by the data;
  *  - clears old parts.
  *
  * NOTE: several initializers read previously initialized members (path, name, full_path, data),
  * so the list relies on the member declaration order in the class.
  */
StorageMergeTree::StorageMergeTree(
    const String & path_,
    const String & name_,
    NamesAndTypesListPtr columns_,
    const Context & context_,
    ASTPtr & primary_expr_ast_,
    const String & date_column_name_,
    const ASTPtr & sampling_expression_, /// nullptr if sampling is not supported.
    size_t index_granularity_,
    MergeTreeData::Mode mode_,
    const String & sign_column_,
    const MergeTreeSettings & settings_)
    : path(path_),
    name(name_),
    full_path(path + escapeForFileName(name) + '/'),
    increment(full_path + "increment.txt"),
    data(full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
        index_granularity_, mode_, sign_column_, settings_),
    reader(data),
    writer(data),
    merger(data),
    log(&Logger::get("StorageMergeTree")),
    shutdown_called(false)
{
    merge_threads = new boost::threadpool::pool(data.settings.merging_threads);

    increment.fixIfBroken(data.getMaxDataPartIndex());

    data.clearOldParts();
}
2012-07-17 20:04:39 +00:00
/** Factory function: allocates a StorageMergeTree with the given arguments
  * (forwarded to the constructor in the same order) and returns the result of thisPtr()
  * called on the new object.
  */
StoragePtr StorageMergeTree::create(
    const String & path_,
    const String & name_,
    NamesAndTypesListPtr columns_,
    const Context & context_,
    ASTPtr & primary_expr_ast_,
    const String & date_column_name_,
    const ASTPtr & sampling_expression_,
    size_t index_granularity_,
    MergeTreeData::Mode mode_,
    const String & sign_column_,
    const MergeTreeSettings & settings_)
{
    StorageMergeTree * storage = new StorageMergeTree(
        path_, name_, columns_, context_, primary_expr_ast_, date_column_name_,
        sampling_expression_, index_granularity_, mode_, sign_column_, settings_);

    return storage->thisPtr();
}
2013-09-30 01:29:19 +00:00
void StorageMergeTree::shutdown()
2012-07-30 20:32:36 +00:00
{
2014-03-13 12:48:07 +00:00
if (shutdown_called)
return;
shutdown_called = true;
merger.cancelAll();
joinMergeThreads();
2012-07-18 19:44:04 +00:00
}
2014-03-13 12:48:07 +00:00
/// The destructor only calls shutdown(), which does nothing if it has already been called.
StorageMergeTree::~StorageMergeTree()
{
    shutdown();
}
2012-07-18 19:44:04 +00:00
2012-07-21 05:07:14 +00:00
/** Reads from the table.
  * All the work is delegated to the `reader` member: the arguments are passed to it as is,
  * and the streams it returns are returned to the caller.
  * processed_stage is taken by non-const reference and handed to the reader in the same way.
  */
BlockInputStreams StorageMergeTree::read(
    const Names & column_names,
    ASTPtr query,
    const Settings & settings,
    QueryProcessingStage::Enum & processed_stage,
    size_t max_block_size,
    unsigned threads)
{
    return reader.read(
        column_names,
        query,
        settings,
        processed_stage,
        max_block_size,
        threads);
}
2014-03-09 17:36:01 +00:00
/** Returns a newly allocated MergeTreeBlockOutputStream bound to this storage.
  * The `query` argument is not used here.
  */
BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)
{
    StorageMergeTree & self = *this;
    return new MergeTreeBlockOutputStream(self);
}
2014-03-20 13:28:49 +00:00
void StorageMergeTree::drop()
2012-08-16 18:17:01 +00:00
{
2014-03-13 12:48:07 +00:00
merger.cancelAll();
joinMergeThreads();
data.dropAllData();
2013-08-07 13:07:42 +00:00
}
2014-03-09 17:36:01 +00:00
void StorageMergeTree::rename(const String & new_path_to_db, const String & new_name)
{
2014-03-13 12:48:07 +00:00
std::string new_full_path = new_path_to_db + escapeForFileName(new_name) + '/';
2014-03-13 19:14:25 +00:00
data.setPath(new_full_path);
2014-03-13 12:48:07 +00:00
path = new_path_to_db;
name = new_name;
full_path = new_full_path;
increment.setPath(full_path + "increment.txt");
}
2013-08-09 00:12:59 +00:00
/// Performs ALTER by delegating to the `data` member; `params` is passed through unchanged.
void StorageMergeTree::alter(const ASTAlterQuery::Parameters & params)
{
    data.alter(params);
}
2014-03-20 13:00:42 +00:00
/// Delegates to data.prepareAlterModify(); `params` is passed through unchanged.
void StorageMergeTree::prepareAlterModify(const ASTAlterQuery::Parameters & params)
{
    data.prepareAlterModify(params);
}
/// Delegates to data.commitAlterModify(); `params` is passed through unchanged.
void StorageMergeTree::commitAlterModify(const ASTAlterQuery::Parameters & params)
{
    data.commitAlterModify(params);
}
2014-03-13 12:48:07 +00:00
/** Schedules merge tasks in the merge thread pool.
  *
  * iterations - the number of tasks to schedule; each of them performs at most one merge.
  *              The special value 0 means: schedule data.settings.merging_threads tasks,
  *              each of which keeps merging for as long as there is something to merge.
  * async      - if false, wait for the merge threads to finish before returning.
  * aggressive - passed to every task and from there to the part selection.
  */
void StorageMergeTree::merge(size_t iterations, bool async, bool aggressive)
{
    /// Zero iterations requests the "merge while possible" mode.
    const bool while_can = (iterations == 0);
    if (while_can)
        iterations = data.settings.merging_threads;

    for (size_t scheduled = 0; scheduled != iterations; ++scheduled)
        merge_threads->schedule(boost::bind(&StorageMergeTree::mergeThread, this, while_can, aggressive));

    if (async)
        return;

    joinMergeThreads();
}
void StorageMergeTree::mergeThread(bool while_can, bool aggressive)
{
try
{
while (!shutdown_called)
{
2014-03-27 11:30:54 +00:00
auto structure_lock = lockStructure(false);
2014-03-13 12:48:07 +00:00
/// Удаляем старые куски. На случай, если в слиянии что-то сломано, и из следующего блока вылетит исключение.
2014-04-09 15:52:47 +00:00
LOG_TRACE(log, "Clearing old parts");
2014-03-13 12:48:07 +00:00
data.clearOldParts();
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
{
2014-03-13 19:07:17 +00:00
/// К концу этого логического блока должен быть вызван деструктор, чтобы затем корректно определить удаленные куски
2014-03-13 17:44:00 +00:00
/// Нужно вызывать деструктор под незалоченным currently_merging_mutex.
CurrentlyMergingPartsTaggerPtr merging_tagger;
2014-04-04 10:37:33 +00:00
String merged_name;
2014-03-13 17:44:00 +00:00
2014-03-13 19:07:17 +00:00
{
Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
2014-03-13 17:44:00 +00:00
2014-03-13 19:07:17 +00:00
MergeTreeData::DataPartsVector parts;
2014-04-04 10:37:33 +00:00
auto can_merge = std::bind(&StorageMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);
2014-03-13 19:07:17 +00:00
bool only_small = false;
2014-03-13 17:44:00 +00:00
2014-03-13 19:07:17 +00:00
/// Если есть активный мердж крупных кусков, то ограничиваемся мерджем только маленьких частей.
for (const auto & part : currently_merging)
2014-03-13 17:44:00 +00:00
{
2014-03-13 19:07:17 +00:00
if (part->size * data.index_granularity > 25 * 1024 * 1024)
{
only_small = true;
break;
}
2014-03-13 17:44:00 +00:00
}
2014-04-04 12:47:57 +00:00
LOG_DEBUG(log, "Selecting parts to merge");
2014-04-04 10:37:33 +00:00
if (!merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge) &&
!merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge))
2014-04-04 12:47:57 +00:00
{
LOG_DEBUG(log, "No parts to merge");
2014-03-27 11:30:54 +00:00
break;
2014-04-04 12:47:57 +00:00
}
2014-03-13 12:48:07 +00:00
2014-03-13 19:07:17 +00:00
merging_tagger = new CurrentlyMergingPartsTagger(parts, merger.estimateDiskSpaceForMerge(parts), *this);
}
2014-03-13 17:44:00 +00:00
2014-04-04 10:37:33 +00:00
merger.mergeParts(merging_tagger->parts, merged_name);
2014-03-13 12:48:07 +00:00
}
if (shutdown_called)
break;
/// Удаляем куски, которые мы только что сливали.
2014-04-09 15:52:47 +00:00
LOG_TRACE(log, "Clearing old parts");
2014-03-13 12:48:07 +00:00
data.clearOldParts();
if (!while_can)
break;
}
}
catch (const Exception & e)
{
LOG_ERROR(log, "Code: " << e.code() << ". " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Poco::Exception: " << e.code() << ". " << e.displayText());
}
catch (const std::exception & e)
{
LOG_ERROR(log, "std::exception: " << e.what());
}
catch (...)
{
LOG_ERROR(log, "Unknown exception");
}
}
/// Logs a debug message and then calls wait() on the merge thread pool.
void StorageMergeTree::joinMergeThreads()
{
    LOG_DEBUG(log, "Waiting for merge threads to finish.");

    merge_threads->wait();
}
2014-03-13 17:44:00 +00:00
/** Returns true if neither of the two parts is contained in currently_merging.
  *
  * This function does not lock anything itself. In this file it is only used as the predicate
  * given to the part selection in mergeThread(), which is invoked while currently_merging_mutex is held.
  */
bool StorageMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
    return !(currently_merging.count(left) || currently_merging.count(right));
}
2012-07-17 20:04:39 +00:00
}