2012-07-17 20:04:39 +00:00
|
|
|
|
#include <DB/Storages/StorageMergeTree.h>
|
2014-03-13 12:48:07 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeBlockOutputStream.h>
|
|
|
|
|
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
|
|
|
|
|
#include <DB/Common/escapeForFileName.h>
|
2014-07-11 12:47:45 +00:00
|
|
|
|
#include <DB/Interpreters/InterpreterAlterQuery.h>
|
2012-07-19 20:32:10 +00:00
|
|
|
|
|
2012-07-17 20:04:39 +00:00
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
2014-09-10 11:34:26 +00:00
|
|
|
|
/** Constructs the storage for a MergeTree table whose files live in
  * path_ + escapeForFileName(table_name_) + '/'.
  *
  * The constructor wires up the increment file ("increment.txt" inside the table directory),
  * the MergeTreeData object and its reader/writer/merger, and the logger named
  * "<database>.<table> (StorageMergeTree)". The background merge task is NOT registered here;
  * that is done by create().
  *
  * NOTE: several initializers read the members `path`, `table_name` and `full_path`,
  * so they rely on those members being declared before the members that use them.
  */
StorageMergeTree::StorageMergeTree(const String & path_, const String & database_name_, const String & table_name_,
				NamesAndTypesListPtr columns_,
				Context & context_,
				ASTPtr & primary_expr_ast_,
				const String & date_column_name_,
				const ASTPtr & sampling_expression_, /// nullptr if sampling is not supported.
				size_t index_granularity_,
				MergeTreeData::Mode mode_,
				const String & sign_column_,
				const MergeTreeSettings & settings_)
	: path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
	increment(full_path + "increment.txt"), context(context_), background_pool(context_.getBackgroundPool()),
	data(full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
		 index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, false),
	reader(data), writer(data), merger(data),
	log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)")),
	shutdown_called(false)
{
	/// Load the parts first: the increment must be checked against the parts that actually exist on disk.
	data.loadDataParts(false);

	/// NOTE(review): assumes getMaxDataPartIndex() only reflects parts registered by loadDataParts();
	/// calling it before loading would then compare the increment against an empty set. Confirm in MergeTreeData.
	increment.fixIfBroken(data.getMaxDataPartIndex());

	data.clearOldParts();
}
|
2012-07-17 20:04:39 +00:00
|
|
|
|
|
2013-02-06 11:26:35 +00:00
|
|
|
|
/** Factory: builds a StorageMergeTree, wraps it into a StoragePtr and registers
  * its merge task in the background processing pool.
  *
  * All arguments are forwarded to the constructor unchanged.
  * Returns the owning pointer obtained from thisPtr().
  */
StoragePtr StorageMergeTree::create(
	const String & path_, const String & database_name_, const String & table_name_,
	NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
{
	StorageMergeTree * storage = new StorageMergeTree(
		path_, database_name_, table_name_, columns_, context_, primary_expr_ast_, date_column_name_,
		sampling_expression_, index_granularity_, mode_, sign_column_, settings_);

	/// Take ownership before doing anything else with the object.
	StoragePtr owner = storage->thisPtr();

	/// The task holds a plain pointer to the storage; shutdown() removes the task from the pool.
	storage->merge_task_handle = storage->background_pool.addTask(
		[storage] (BackgroundProcessingPool::Context & pool_context) { return storage->mergeTask(pool_context); });

	return owner;
}
|
|
|
|
|
|
2013-09-30 01:29:19 +00:00
|
|
|
|
void StorageMergeTree::shutdown()
|
2012-07-30 20:32:36 +00:00
|
|
|
|
{
|
2014-03-13 12:48:07 +00:00
|
|
|
|
if (shutdown_called)
|
|
|
|
|
return;
|
|
|
|
|
shutdown_called = true;
|
|
|
|
|
merger.cancelAll();
|
2014-07-02 12:30:38 +00:00
|
|
|
|
background_pool.removeTask(merge_task_handle);
|
2012-07-18 19:44:04 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
|
|
|
|
/// The destructor only makes sure shutdown() has been performed.
StorageMergeTree::~StorageMergeTree()
{
	this->shutdown();
}
|
2012-07-18 19:44:04 +00:00
|
|
|
|
|
2012-07-21 05:07:14 +00:00
|
|
|
|
/** Reads the requested columns from the table.
  * All the work is delegated to the reader; every argument is passed through as is
  * (processed_stage is passed by reference, so the reader may change it).
  */
BlockInputStreams StorageMergeTree::read(
	const Names & column_names,
	ASTPtr query,
	const Settings & settings,
	QueryProcessingStage::Enum & processed_stage,
	size_t max_block_size,
	unsigned threads)
{
	return reader.read(
		column_names,
		query,
		settings,
		processed_stage,
		max_block_size,
		threads);
}
|
|
|
|
|
|
2014-03-09 17:36:01 +00:00
|
|
|
|
/** Returns a new output stream that writes into this table.
  * The query argument is not used.
  */
BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)
{
	return BlockOutputStreamPtr(new MergeTreeBlockOutputStream(*this));
}
|
|
|
|
|
|
2014-03-20 13:28:49 +00:00
|
|
|
|
void StorageMergeTree::drop()
|
2012-08-16 18:17:01 +00:00
|
|
|
|
{
|
2014-04-11 15:53:32 +00:00
|
|
|
|
shutdown();
|
2014-03-13 12:48:07 +00:00
|
|
|
|
data.dropAllData();
|
2013-08-07 13:07:42 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-07-28 14:33:05 +00:00
|
|
|
|
void StorageMergeTree::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
|
2014-03-04 11:30:50 +00:00
|
|
|
|
{
|
2014-07-28 14:33:05 +00:00
|
|
|
|
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
|
2014-03-13 19:14:25 +00:00
|
|
|
|
|
2014-07-28 14:33:30 +00:00
|
|
|
|
data.setPath(new_full_path, true);
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
|
|
|
|
path = new_path_to_db;
|
2014-09-10 11:34:26 +00:00
|
|
|
|
table_name = new_table_name;
|
2014-03-13 12:48:07 +00:00
|
|
|
|
full_path = new_full_path;
|
|
|
|
|
|
|
|
|
|
increment.setPath(full_path + "increment.txt");
|
2014-05-08 07:12:01 +00:00
|
|
|
|
|
|
|
|
|
/// TODO: Можно обновить названия логгеров у this, data, reader, writer, merger.
|
2014-03-04 11:30:50 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
void StorageMergeTree::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
|
2013-08-07 13:07:42 +00:00
|
|
|
|
{
|
2014-07-11 12:47:45 +00:00
|
|
|
|
/// NOTE: Здесь так же как в ReplicatedMergeTree можно сделать ALTER, не блокирующий запись данных надолго.
|
2013-10-03 12:46:17 +00:00
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
auto table_soft_lock = lockDataForAlter();
|
2014-03-20 13:00:42 +00:00
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
data.checkAlter(params);
|
|
|
|
|
|
|
|
|
|
NamesAndTypesList new_columns = data.getColumnsList();
|
|
|
|
|
params.apply(new_columns);
|
|
|
|
|
|
|
|
|
|
MergeTreeData::DataParts parts = data.getDataParts();
|
|
|
|
|
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
|
2014-09-29 20:26:46 +00:00
|
|
|
|
for (const MergeTreeData::DataPartPtr & part : parts)
|
2014-07-11 12:47:45 +00:00
|
|
|
|
{
|
2014-07-17 09:38:31 +00:00
|
|
|
|
auto transaction = data.alterDataPart(part, new_columns);
|
|
|
|
|
if (transaction)
|
|
|
|
|
transactions.push_back(std::move(transaction));
|
2014-07-11 12:47:45 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto table_hard_lock = lockStructureForAlter();
|
|
|
|
|
|
|
|
|
|
InterpreterAlterQuery::updateMetadata(database_name, table_name, new_columns, context);
|
|
|
|
|
data.setColumnsList(new_columns);
|
|
|
|
|
|
|
|
|
|
for (auto & transaction : transactions)
|
|
|
|
|
{
|
|
|
|
|
transaction->commit();
|
|
|
|
|
}
|
2014-03-20 13:00:42 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-04-11 13:05:17 +00:00
|
|
|
|
bool StorageMergeTree::merge(bool aggressive, BackgroundProcessingPool::Context * pool_context)
|
2014-03-13 12:48:07 +00:00
|
|
|
|
{
|
2014-07-14 15:49:03 +00:00
|
|
|
|
auto structure_lock = lockStructure(true);
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2014-04-11 13:05:17 +00:00
|
|
|
|
/// Удаляем старые куски.
|
|
|
|
|
data.clearOldParts();
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2014-04-11 13:05:17 +00:00
|
|
|
|
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2014-04-11 13:05:17 +00:00
|
|
|
|
/// Нужно вызывать деструктор под незалоченным currently_merging_mutex.
|
|
|
|
|
CurrentlyMergingPartsTaggerPtr merging_tagger;
|
|
|
|
|
String merged_name;
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
|
|
|
|
{
|
2014-04-11 13:05:17 +00:00
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
|
2014-03-27 11:30:54 +00:00
|
|
|
|
|
2014-04-11 13:05:17 +00:00
|
|
|
|
MergeTreeData::DataPartsVector parts;
|
|
|
|
|
auto can_merge = std::bind(&StorageMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);
|
|
|
|
|
/// Если слияние запущено из пула потоков, и хотя бы половина потоков сливает большие куски,
|
|
|
|
|
/// не будем сливать большие куски.
|
2014-09-17 23:57:25 +00:00
|
|
|
|
size_t big_merges = background_pool.getCounter("big merges");
|
2014-07-02 12:30:38 +00:00
|
|
|
|
bool only_small = pool_context && big_merges * 2 >= background_pool.getNumberOfThreads();
|
2014-04-11 13:05:17 +00:00
|
|
|
|
|
|
|
|
|
if (!merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge) &&
|
|
|
|
|
!merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge))
|
|
|
|
|
{
|
2014-09-17 23:57:25 +00:00
|
|
|
|
LOG_INFO(log, "No parts to merge");
|
2014-04-11 13:05:17 +00:00
|
|
|
|
return false;
|
|
|
|
|
}
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2014-04-11 13:05:17 +00:00
|
|
|
|
merging_tagger = new CurrentlyMergingPartsTagger(parts, merger.estimateDiskSpaceForMerge(parts), *this);
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2014-04-11 13:05:17 +00:00
|
|
|
|
/// Если собираемся сливать большие куски, увеличим счетчик потоков, сливающих большие куски.
|
|
|
|
|
if (pool_context)
|
|
|
|
|
{
|
|
|
|
|
for (const auto & part : parts)
|
2014-03-13 12:48:07 +00:00
|
|
|
|
{
|
2014-07-23 15:24:45 +00:00
|
|
|
|
if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small)
|
2014-03-13 19:07:17 +00:00
|
|
|
|
{
|
2014-04-11 13:05:17 +00:00
|
|
|
|
pool_context->incrementCounter("big merges");
|
|
|
|
|
break;
|
2014-03-13 19:07:17 +00:00
|
|
|
|
}
|
2014-03-13 12:48:07 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-09-10 11:34:26 +00:00
|
|
|
|
const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name);
|
|
|
|
|
merger.mergeParts(merging_tagger->parts, merged_name, *merge_entry, nullptr, &*merging_tagger->reserved_space);
|
2014-03-13 12:48:07 +00:00
|
|
|
|
|
2014-04-11 13:05:17 +00:00
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Body of the task executed by the background processing pool.
  *
  * Does nothing and returns false after shutdown() has been called.
  * Otherwise runs a non-aggressive merge and returns its result.
  * An Exception with code ABORTED is logged as a cancelled merge and turned into `false`;
  * any other Exception is rethrown.
  */
bool StorageMergeTree::mergeTask(BackgroundProcessingPool::Context & context)
{
	if (shutdown_called)
		return false;

	try
	{
		return merge(false, &context);
	}
	catch (Exception & exception)
	{
		/// Only cancellation is handled here.
		if (exception.code() != ErrorCodes::ABORTED)
			throw;

		LOG_INFO(log, "Merge cancelled");
		return false;
	}
}
|
|
|
|
|
|
2014-04-11 13:05:17 +00:00
|
|
|
|
|
2014-03-13 17:44:00 +00:00
|
|
|
|
/// Two parts may be merged only if neither of them is present in currently_merging.
bool StorageMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
	return !(currently_merging.count(left) || currently_merging.count(right));
}
|
|
|
|
|
|
2014-10-03 17:57:01 +00:00
|
|
|
|
|
2014-10-18 19:14:09 +00:00
|
|
|
|
void StorageMergeTree::dropPartition(const Field & partition, bool detach, const Settings & settings)
|
2014-10-03 17:57:01 +00:00
|
|
|
|
{
|
2014-10-03 18:41:16 +00:00
|
|
|
|
/** TODO В этот момент могут идти мерджи кусков в удаляемой партиции.
|
|
|
|
|
* Когда эти мерджи завершатся, то часть данных из удаляемой партиции "оживёт".
|
|
|
|
|
* Было бы удобно прерывать все мерджи.
|
|
|
|
|
*/
|
|
|
|
|
|
2014-10-03 17:57:01 +00:00
|
|
|
|
DayNum_t month = MergeTreeData::getMonthDayNum(partition);
|
|
|
|
|
|
|
|
|
|
size_t removed_parts = 0;
|
|
|
|
|
MergeTreeData::DataParts parts = data.getDataParts();
|
|
|
|
|
|
|
|
|
|
for (const auto & part : parts)
|
|
|
|
|
{
|
2014-10-03 18:55:11 +00:00
|
|
|
|
if (!(part->left_month == part->right_month && part->left_month == month))
|
2014-10-03 17:57:01 +00:00
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Removing part " << part->name);
|
|
|
|
|
++removed_parts;
|
|
|
|
|
|
|
|
|
|
if (detach)
|
|
|
|
|
data.renameAndDetachPart(part, "");
|
|
|
|
|
else
|
|
|
|
|
data.replaceParts({part}, {}, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
LOG_INFO(log, (detach ? "Detached " : "Removed ") << removed_parts << " parts inside " << apply_visitor(FieldVisitorToString(), partition) << ".");
|
|
|
|
|
}
|
|
|
|
|
|
2014-10-03 18:41:16 +00:00
|
|
|
|
|
2014-10-18 19:14:09 +00:00
|
|
|
|
void StorageMergeTree::attachPartition(const Field & field, bool unreplicated, bool part, const Settings & settings)
|
2014-10-03 18:41:16 +00:00
|
|
|
|
{
|
|
|
|
|
if (unreplicated)
|
|
|
|
|
throw Exception("UNREPLICATED option for ATTACH has meaning only for ReplicatedMergeTree", ErrorCodes::BAD_ARGUMENTS);
|
|
|
|
|
|
|
|
|
|
String partition;
|
|
|
|
|
|
|
|
|
|
if (part)
|
|
|
|
|
partition = field.getType() == Field::Types::UInt64 ? toString(field.get<UInt64>()) : field.safeGet<String>();
|
|
|
|
|
else
|
|
|
|
|
partition = MergeTreeData::getMonthName(field);
|
|
|
|
|
|
|
|
|
|
String source_dir = "detached/";
|
|
|
|
|
|
|
|
|
|
/// Составим список кусков, которые нужно добавить.
|
|
|
|
|
Strings parts;
|
|
|
|
|
if (part)
|
|
|
|
|
{
|
|
|
|
|
parts.push_back(partition);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
LOG_DEBUG(log, "Looking for parts for partition " << partition << " in " << source_dir);
|
|
|
|
|
ActiveDataPartSet active_parts;
|
|
|
|
|
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
|
|
|
|
|
{
|
|
|
|
|
String name = it.name();
|
|
|
|
|
if (!ActiveDataPartSet::isPartDirectory(name))
|
|
|
|
|
continue;
|
|
|
|
|
if (name.substr(0, partition.size()) != partition)
|
|
|
|
|
continue;
|
|
|
|
|
LOG_DEBUG(log, "Found part " << name);
|
|
|
|
|
active_parts.add(name);
|
|
|
|
|
}
|
|
|
|
|
LOG_DEBUG(log, active_parts.size() << " of them are active");
|
|
|
|
|
parts = active_parts.getParts();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (const auto & source_part_name : parts)
|
|
|
|
|
{
|
|
|
|
|
String source_path = source_dir + source_part_name;
|
|
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Checking data");
|
|
|
|
|
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path);
|
|
|
|
|
|
|
|
|
|
UInt64 index = increment.get();
|
|
|
|
|
String new_part_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, index, index, 0);
|
|
|
|
|
part->renameTo(new_part_name);
|
|
|
|
|
part->name = new_part_name;
|
|
|
|
|
ActiveDataPartSet::parsePartName(part->name, *part);
|
|
|
|
|
|
|
|
|
|
LOG_INFO(log, "Attaching part " << source_part_name << " from " << source_path << " as " << new_part_name);
|
|
|
|
|
data.attachPart(part);
|
|
|
|
|
|
|
|
|
|
LOG_INFO(log, "Finished attaching part " << new_part_name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// На месте удаленных кусков могут появиться новые, с другими данными.
|
|
|
|
|
context.resetCaches();
|
|
|
|
|
}
|
|
|
|
|
|
2012-07-17 20:04:39 +00:00
|
|
|
|
}
|