2014-03-14 07:05:43 +00:00
|
|
|
|
#include <Poco/Ext/ScopedTry.h>
|
2014-10-16 01:21:03 +00:00
|
|
|
|
|
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeData.h>
|
2014-03-09 17:36:01 +00:00
|
|
|
|
#include <DB/Interpreters/ExpressionAnalyzer.h>
|
2014-03-14 07:05:43 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeReader.h>
|
2014-03-09 17:36:01 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
|
|
|
|
|
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
|
2014-08-08 08:28:13 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
|
2014-03-14 07:05:43 +00:00
|
|
|
|
#include <DB/Parsers/ASTIdentifier.h>
|
|
|
|
|
#include <DB/Parsers/ASTNameTypePair.h>
|
|
|
|
|
#include <DB/DataStreams/ExpressionBlockInputStream.h>
|
2014-03-27 17:30:04 +00:00
|
|
|
|
#include <DB/DataStreams/copyData.h>
|
2014-07-09 13:39:19 +00:00
|
|
|
|
#include <DB/IO/WriteBufferFromFile.h>
|
2014-12-17 18:37:23 +00:00
|
|
|
|
#include <DB/IO/CompressedReadBuffer.h>
|
2014-09-23 19:47:25 +00:00
|
|
|
|
#include <DB/DataTypes/DataTypeDate.h>
|
2015-04-01 11:44:04 +00:00
|
|
|
|
#include <DB/DataTypes/DataTypeFixedString.h>
|
2014-11-11 04:11:07 +00:00
|
|
|
|
#include <DB/Common/localBackup.h>
|
2014-12-17 15:26:24 +00:00
|
|
|
|
#include <DB/Functions/FunctionFactory.h>
|
2014-10-16 01:21:03 +00:00
|
|
|
|
|
2014-06-10 14:24:33 +00:00
|
|
|
|
#include <algorithm>
|
2014-10-16 01:21:03 +00:00
|
|
|
|
#include <iomanip>
|
2014-03-09 17:36:01 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
/** Constructs the MergeTree data holder.
  * Validates the table declaration (date column, columns_to_sum, primary key), creates the data
  * directory and its "detached" subdirectory, and prepares the sort description and primary-key
  * expression when a primary key is given.
  * Throws if the date column is missing or not of type Date, if columns_to_sum is used outside Summing
  * mode or names an unknown column, or if the primary key is empty for a mode other than Unsorted.
  */
MergeTreeData::MergeTreeData(
	const String & full_path_, NamesAndTypesListPtr columns_,
	const NamesAndTypesList & materialized_columns_,
	const NamesAndTypesList & alias_columns_,
	const ColumnDefaults & column_defaults_,
	const Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_, const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	Mode mode_,
	const String & sign_column_,
	const Names & columns_to_sum_,
	const MergeTreeSettings & settings_,
	const String & log_name_,
	bool require_part_metadata_,
	BrokenPartCallback broken_part_callback_)
	: ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
	date_column_name(date_column_name_), sampling_expression(sampling_expression_),
	index_granularity(index_granularity_),
	mode(mode_), sign_column(sign_column_), columns_to_sum(columns_to_sum_),
	settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr),
	require_part_metadata(require_part_metadata_),
	full_path(full_path_), columns(columns_),
	broken_part_callback(broken_part_callback_),
	log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
{
	/// Check that the date column exists and has type Date.
	/// Returns true if `columns_list` contains the date column; throws if it is there with another type.
	const auto check_date_exists = [this] (const NamesAndTypesList & columns_list)
	{
		for (const auto & column : columns_list)
		{
			if (column.name == date_column_name)
			{
				if (!typeid_cast<const DataTypeDate *>(column.type.get()))
					throw Exception("Date column (" + date_column_name + ") for storage of MergeTree family must have type Date."
						" Provided column of type " + column.type->getName() + "."
						" You may have separate column with type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
				return true;
			}
		}

		return false;
	};

	/// The date column may be either an ordinary or a materialized column.
	if (!check_date_exists(*columns) && !check_date_exists(materialized_columns))
		throw Exception{
			"Date column (" + date_column_name + ") does not exist in table declaration.",
			ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};

	/// If columns_to_sum is given, it is only allowed in Summing mode and every listed column must exist.
	if (!columns_to_sum.empty())
	{
		if (mode != Summing)
			throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.", ErrorCodes::LOGICAL_ERROR);

		for (const auto & column_to_sum : columns_to_sum)
			if (columns->end() == std::find_if(columns->begin(), columns->end(),
				[&](const NameAndTypePair & name_and_type) { return column_to_sum == name_and_type.name; }))
				/// Same error code as the missing-date-column check, instead of the default (0).
				throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.",
					ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
	}

	/// Create the data directory if it does not exist yet.
	Poco::File(full_path).createDirectories();
	Poco::File(full_path + "detached").createDirectory();

	if (primary_expr_ast)
	{
		/// Build the sort description: one ascending entry per primary-key element.
		sort_descr.reserve(primary_expr_ast->children.size());
		for (const ASTPtr & ast : primary_expr_ast->children)
		{
			String name = ast->getColumnName();
			sort_descr.push_back(SortColumnDescription(name, 1));
		}

		primary_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(false);

		ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(true);
		primary_key_sample = projected_expr->getSampleBlock();
	}
	else if (mode != Unsorted)
		throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
}
|
2014-03-09 17:36:01 +00:00
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
/// Returns the largest `right` block number over the parts currently in data_parts, or 0 if there are none.
/// NOTE(review): reads data_parts without taking data_parts_mutex; callers presumably guarantee exclusivity.
UInt64 MergeTreeData::getMaxDataPartIndex()
{
	UInt64 largest_right = 0;

	for (const auto & data_part : data_parts)
	{
		if (data_part->right > largest_right)
			largest_right = data_part->right;
	}

	return largest_right;
}
|
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
std::string MergeTreeData::getModePrefix() const
|
2014-03-09 17:36:01 +00:00
|
|
|
|
{
|
2014-03-13 12:48:07 +00:00
|
|
|
|
switch (mode)
|
2014-03-09 17:36:01 +00:00
|
|
|
|
{
|
2014-03-13 12:48:07 +00:00
|
|
|
|
case Ordinary: return "";
|
|
|
|
|
case Collapsing: return "Collapsing";
|
|
|
|
|
case Summing: return "Summing";
|
2014-05-28 14:54:42 +00:00
|
|
|
|
case Aggregating: return "Aggregating";
|
2015-03-13 21:31:23 +00:00
|
|
|
|
case Unsorted: return "Unsorted";
|
2014-03-09 17:36:01 +00:00
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
default:
|
|
|
|
|
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
|
2014-03-09 17:36:01 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2014-08-13 08:07:52 +00:00
|
|
|
|
void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
2014-03-09 17:36:01 +00:00
|
|
|
|
{
|
|
|
|
|
LOG_DEBUG(log, "Loading data parts");
|
|
|
|
|
|
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
|
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
|
|
|
|
|
|
|
|
|
|
data_parts.clear();
|
|
|
|
|
|
2014-10-13 17:28:59 +00:00
|
|
|
|
Strings part_file_names;
|
2014-03-09 17:36:01 +00:00
|
|
|
|
Poco::DirectoryIterator end;
|
|
|
|
|
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
|
2014-04-09 15:52:47 +00:00
|
|
|
|
{
|
2014-10-13 17:28:59 +00:00
|
|
|
|
/// Пропускаем временные директории старше суток.
|
|
|
|
|
if (0 == it.name().compare(0, strlen("tmp_"), "tmp_"))
|
2014-03-09 17:36:01 +00:00
|
|
|
|
continue;
|
|
|
|
|
|
2014-10-13 17:28:59 +00:00
|
|
|
|
part_file_names.push_back(it.name());
|
2014-03-09 17:36:01 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-04-18 11:05:30 +00:00
|
|
|
|
DataPartsVector broken_parts_to_remove;
|
2014-08-13 08:07:52 +00:00
|
|
|
|
DataPartsVector broken_parts_to_detach;
|
|
|
|
|
size_t suspicious_broken_parts = 0;
|
2014-04-18 11:05:30 +00:00
|
|
|
|
|
2014-03-09 17:36:01 +00:00
|
|
|
|
Poco::RegularExpression::MatchVec matches;
|
2014-04-09 15:52:47 +00:00
|
|
|
|
for (const String & file_name : part_file_names)
|
2014-03-09 17:36:01 +00:00
|
|
|
|
{
|
2014-08-08 08:28:13 +00:00
|
|
|
|
if (!ActiveDataPartSet::isPartDirectory(file_name, &matches))
|
2014-03-09 17:36:01 +00:00
|
|
|
|
continue;
|
|
|
|
|
|
2014-03-14 17:19:38 +00:00
|
|
|
|
MutableDataPartPtr part = std::make_shared<DataPart>(*this);
|
2014-05-26 10:41:40 +00:00
|
|
|
|
ActiveDataPartSet::parsePartName(file_name, *part, &matches);
|
2014-03-09 17:36:01 +00:00
|
|
|
|
part->name = file_name;
|
|
|
|
|
|
2014-04-18 11:05:30 +00:00
|
|
|
|
bool broken = false;
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
{
|
2014-08-08 08:28:13 +00:00
|
|
|
|
part->loadColumns(require_part_metadata);
|
|
|
|
|
part->loadChecksums(require_part_metadata);
|
2014-07-09 13:39:19 +00:00
|
|
|
|
part->loadIndex();
|
2014-08-08 08:28:13 +00:00
|
|
|
|
part->checkNotBroken(require_part_metadata);
|
2014-04-18 11:05:30 +00:00
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
broken = true;
|
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
|
}
|
|
|
|
|
|
2014-04-09 15:52:47 +00:00
|
|
|
|
/// Игнорируем и, возможно, удаляем битые куски, которые могут образовываться после грубого перезапуска сервера.
|
2014-04-18 11:05:30 +00:00
|
|
|
|
if (broken)
|
2014-03-09 17:36:01 +00:00
|
|
|
|
{
|
|
|
|
|
if (part->level == 0)
|
|
|
|
|
{
|
|
|
|
|
/// Восстановить куски нулевого уровня невозможно.
|
2014-08-12 09:35:15 +00:00
|
|
|
|
LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because it's impossible to repair.");
|
2014-04-18 11:05:30 +00:00
|
|
|
|
broken_parts_to_remove.push_back(part);
|
2014-03-09 17:36:01 +00:00
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
2014-04-09 15:52:47 +00:00
|
|
|
|
/// Посмотрим, сколько кусков покрыты битым. Если хотя бы два, предполагаем, что битый кусок образован их
|
|
|
|
|
/// слиянием, и мы ничего не потеряем, если его удалим.
|
|
|
|
|
int contained_parts = 0;
|
|
|
|
|
|
|
|
|
|
LOG_ERROR(log, "Part " << full_path + file_name << " is broken. Looking for parts to replace it.");
|
2014-08-13 08:07:52 +00:00
|
|
|
|
++suspicious_broken_parts;
|
2014-04-09 15:52:47 +00:00
|
|
|
|
|
|
|
|
|
for (const String & contained_name : part_file_names)
|
|
|
|
|
{
|
|
|
|
|
if (contained_name == file_name)
|
|
|
|
|
continue;
|
2014-08-08 08:28:13 +00:00
|
|
|
|
if (!ActiveDataPartSet::isPartDirectory(contained_name, &matches))
|
2014-04-09 15:52:47 +00:00
|
|
|
|
continue;
|
|
|
|
|
DataPart contained_part(*this);
|
2014-05-26 10:41:40 +00:00
|
|
|
|
ActiveDataPartSet::parsePartName(contained_name, contained_part, &matches);
|
2014-04-09 15:52:47 +00:00
|
|
|
|
if (part->contains(contained_part))
|
|
|
|
|
{
|
|
|
|
|
LOG_ERROR(log, "Found part " << full_path + contained_name);
|
|
|
|
|
++contained_parts;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (contained_parts >= 2)
|
|
|
|
|
{
|
|
|
|
|
LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because it covers at least 2 other parts");
|
2014-04-18 11:05:30 +00:00
|
|
|
|
broken_parts_to_remove.push_back(part);
|
2014-04-09 15:52:47 +00:00
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
2014-08-13 08:07:52 +00:00
|
|
|
|
LOG_ERROR(log, "Detaching broken part " << full_path + file_name
|
2014-04-09 15:52:47 +00:00
|
|
|
|
<< " because it covers less than 2 parts. You need to resolve this manually");
|
2014-08-13 08:07:52 +00:00
|
|
|
|
broken_parts_to_detach.push_back(part);
|
2014-04-09 15:52:47 +00:00
|
|
|
|
}
|
2014-03-09 17:36:01 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();
|
|
|
|
|
|
|
|
|
|
data_parts.insert(part);
|
|
|
|
|
}
|
|
|
|
|
|
2014-08-13 08:07:52 +00:00
|
|
|
|
if (suspicious_broken_parts > 5 && !skip_sanity_checks)
|
|
|
|
|
throw Exception("Suspiciously many (" + toString(suspicious_broken_parts) + ") broken parts to remove.",
|
2014-04-18 11:05:30 +00:00
|
|
|
|
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
|
|
|
|
|
|
|
|
|
|
for (const auto & part : broken_parts_to_remove)
|
|
|
|
|
part->remove();
|
2014-08-13 08:07:52 +00:00
|
|
|
|
for (const auto & part : broken_parts_to_detach)
|
2014-12-24 20:02:04 +00:00
|
|
|
|
part->renameAddPrefix(true, "");
|
2014-04-18 11:05:30 +00:00
|
|
|
|
|
2014-03-09 17:36:01 +00:00
|
|
|
|
all_data_parts = data_parts;
|
|
|
|
|
|
|
|
|
|
/** Удаляем из набора актуальных кусков куски, которые содержатся в другом куске (которые были склеены),
|
|
|
|
|
* но по каким-то причинам остались лежать в файловой системе.
|
|
|
|
|
* Удаление файлов будет произведено потом в методе clearOldParts.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
if (data_parts.size() >= 2)
|
|
|
|
|
{
|
|
|
|
|
DataParts::iterator prev_jt = data_parts.begin();
|
|
|
|
|
DataParts::iterator curr_jt = prev_jt;
|
|
|
|
|
++curr_jt;
|
|
|
|
|
while (curr_jt != data_parts.end())
|
|
|
|
|
{
|
|
|
|
|
/// Куски данных за разные месяцы рассматривать не будем
|
|
|
|
|
if ((*curr_jt)->left_month != (*curr_jt)->right_month
|
|
|
|
|
|| (*curr_jt)->right_month != (*prev_jt)->left_month
|
|
|
|
|
|| (*prev_jt)->left_month != (*prev_jt)->right_month)
|
|
|
|
|
{
|
|
|
|
|
++prev_jt;
|
|
|
|
|
++curr_jt;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ((*curr_jt)->contains(**prev_jt))
|
|
|
|
|
{
|
2014-04-09 15:52:47 +00:00
|
|
|
|
(*prev_jt)->remove_time = time(0);
|
2014-03-09 17:36:01 +00:00
|
|
|
|
data_parts.erase(prev_jt);
|
|
|
|
|
prev_jt = curr_jt;
|
|
|
|
|
++curr_jt;
|
|
|
|
|
}
|
|
|
|
|
else if ((*prev_jt)->contains(**curr_jt))
|
|
|
|
|
{
|
2014-04-09 15:52:47 +00:00
|
|
|
|
(*curr_jt)->remove_time = time(0);
|
2014-03-09 17:36:01 +00:00
|
|
|
|
data_parts.erase(curr_jt++);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
++prev_jt;
|
|
|
|
|
++curr_jt;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-09-19 11:44:29 +00:00
|
|
|
|
calculateColumnSizes();
|
|
|
|
|
|
2014-03-09 17:36:01 +00:00
|
|
|
|
LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2014-07-25 11:15:11 +00:00
|
|
|
|
/** Collects parts from all_data_parts that are ready to be deleted from disk and removes them from
  * all_data_parts. A part is returned when nothing else references it and its remove_time is more than
  * settings.old_parts_lifetime seconds in the past.
  * As a side effect, deletes "tmp_*" directories older than one day.
  * If all_data_parts_mutex is already held (another thread is running this, or all_data_parts is being
  * modified), returns an empty vector immediately.
  */
MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
{
	Poco::ScopedTry<Poco::FastMutex> lock;
	DataPartsVector res;

	/// If the method is already running in another thread (or all_data_parts is being changed right now), do nothing.
	if (!lock.lock(&all_data_parts_mutex))
	{
		LOG_TRACE(log, "grabOldParts: all_data_parts is locked");
		return res;
	}

	/// Delete temporary directories older than one day.
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it{full_path}; it != end; ++it)
	{
		if (0 != it.name().compare(0, strlen("tmp_"), "tmp_"))
			continue;

		try
		{
			Poco::File tmp_dir(full_path + it.name());

			if (tmp_dir.isDirectory() && tmp_dir.getLastModified().epochTime() + 86400 < time(0))
			{
				LOG_WARNING(log, "Removing temporary directory " << full_path << it.name());
				tmp_dir.remove(true);
			}
		}
		catch (const Poco::FileNotFoundException &)
		{
			/// The directory disappeared between listing and inspection (for example, a concurrent
			/// operation renamed or removed it). There is nothing left to delete.
		}
	}

	time_t now = time(0);
	for (DataParts::iterator it = all_data_parts.begin(); it != all_data_parts.end();)
	{
		if (it->unique() && /// After this, the reference count cannot increase.
			(*it)->remove_time < now &&
			now - (*it)->remove_time > settings.old_parts_lifetime)
		{
			res.push_back(*it);
			all_data_parts.erase(it++);
		}
		else
			++it;
	}

	return res;
}
|
|
|
|
|
|
2014-07-25 11:15:11 +00:00
|
|
|
|
/// Puts the given parts back into all_data_parts (under all_data_parts_mutex),
/// making them candidates for grabOldParts() again.
void MergeTreeData::addOldParts(const MergeTreeData::DataPartsVector & parts)
{
	Poco::ScopedLock<Poco::FastMutex> all_parts_lock(all_data_parts_mutex);

	for (const auto & old_part : parts)
		all_data_parts.insert(old_part);
}
|
|
|
|
|
|
|
|
|
|
void MergeTreeData::clearOldParts()
|
|
|
|
|
{
|
|
|
|
|
auto parts_to_remove = grabOldParts();
|
|
|
|
|
|
2014-09-29 20:26:46 +00:00
|
|
|
|
for (const DataPartPtr & part : parts_to_remove)
|
2014-07-25 11:15:11 +00:00
|
|
|
|
{
|
|
|
|
|
LOG_DEBUG(log, "Removing part " << part->name);
|
|
|
|
|
part->remove();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-07-28 14:33:30 +00:00
|
|
|
|
void MergeTreeData::setPath(const String & new_full_path, bool move_data)
|
2014-03-09 17:36:01 +00:00
|
|
|
|
{
|
2014-07-28 14:33:30 +00:00
|
|
|
|
if (move_data)
|
|
|
|
|
{
|
|
|
|
|
Poco::File(full_path).renameTo(new_full_path);
|
|
|
|
|
/// Если данные перемещать не нужно, значит их переместил кто-то другой. Расчитываем, что он еще и сбросил кеши.
|
|
|
|
|
context.resetCaches();
|
|
|
|
|
}
|
2014-03-13 19:14:25 +00:00
|
|
|
|
|
2014-07-28 14:33:30 +00:00
|
|
|
|
full_path = new_full_path;
|
2014-03-09 17:36:01 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
void MergeTreeData::dropAllData()
|
2014-03-09 17:36:01 +00:00
|
|
|
|
{
|
|
|
|
|
data_parts.clear();
|
|
|
|
|
all_data_parts.clear();
|
2014-09-19 11:44:29 +00:00
|
|
|
|
column_sizes.clear();
|
2014-03-09 17:36:01 +00:00
|
|
|
|
|
2014-04-05 19:54:00 +00:00
|
|
|
|
context.resetCaches();
|
2014-03-13 19:14:25 +00:00
|
|
|
|
|
2014-03-09 17:36:01 +00:00
|
|
|
|
Poco::File(full_path).remove(true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
void MergeTreeData::checkAlter(const AlterCommands & params)
|
2014-03-09 17:36:01 +00:00
|
|
|
|
{
|
2014-07-11 12:47:45 +00:00
|
|
|
|
/// Проверим, что указанные преобразования можно совершить над списком столбцов без учета типов.
|
2014-10-16 13:37:01 +00:00
|
|
|
|
auto new_columns = *columns;
|
|
|
|
|
auto new_materialized_columns = materialized_columns;
|
|
|
|
|
auto new_alias_columns = alias_columns;
|
|
|
|
|
auto new_column_defaults = column_defaults;
|
|
|
|
|
params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
|
2014-07-09 13:39:19 +00:00
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
/// Список столбцов, которые нельзя трогать.
|
|
|
|
|
/// sampling_expression можно не учитывать, потому что он обязан содержаться в первичном ключе.
|
2015-03-13 21:31:23 +00:00
|
|
|
|
|
|
|
|
|
Names keys;
|
|
|
|
|
|
|
|
|
|
if (primary_expr)
|
|
|
|
|
keys = primary_expr->getRequiredColumns();
|
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
keys.push_back(sign_column);
|
2015-03-13 21:31:23 +00:00
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
std::sort(keys.begin(), keys.end());
|
2014-03-09 17:36:01 +00:00
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
for (const AlterCommand & command : params)
|
2014-04-03 08:53:32 +00:00
|
|
|
|
{
|
2014-07-11 12:47:45 +00:00
|
|
|
|
if (std::binary_search(keys.begin(), keys.end(), command.column_name))
|
|
|
|
|
throw Exception("trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
|
2014-04-03 08:53:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
/// Проверим, что преобразования типов возможны.
|
|
|
|
|
ExpressionActionsPtr unused_expression;
|
|
|
|
|
NameToNameMap unused_map;
|
2014-10-21 12:11:20 +00:00
|
|
|
|
|
|
|
|
|
/// augment plain columns with materialized columns for convert expression creation
|
|
|
|
|
new_columns.insert(std::end(new_columns),
|
|
|
|
|
std::begin(new_materialized_columns), std::end(new_materialized_columns));
|
|
|
|
|
createConvertExpression(nullptr, getColumnsList(), new_columns, unused_expression, unused_map);
|
2014-03-09 17:36:01 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-09-29 20:26:46 +00:00
|
|
|
|
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
|
2014-07-11 12:47:45 +00:00
|
|
|
|
ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map)
|
2014-03-09 17:36:01 +00:00
|
|
|
|
{
|
2014-07-11 12:47:45 +00:00
|
|
|
|
out_expression = nullptr;
|
|
|
|
|
out_rename_map.clear();
|
2014-03-09 17:36:01 +00:00
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
typedef std::map<String, DataTypePtr> NameToType;
|
|
|
|
|
NameToType new_types;
|
|
|
|
|
for (const NameAndTypePair & column : new_columns)
|
2014-03-13 19:36:28 +00:00
|
|
|
|
{
|
2014-07-11 12:47:45 +00:00
|
|
|
|
new_types[column.name] = column.type;
|
2014-03-13 19:36:28 +00:00
|
|
|
|
}
|
2014-07-09 13:39:19 +00:00
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
/// Сколько столбцов сейчас в каждой вложенной структуре. Столбцы не из вложенных структур сюда тоже попадут и не помешают.
|
|
|
|
|
std::map<String, int> nested_table_counts;
|
|
|
|
|
for (const NameAndTypePair & column : old_columns)
|
2014-03-09 17:36:01 +00:00
|
|
|
|
{
|
2014-07-11 12:47:45 +00:00
|
|
|
|
++nested_table_counts[DataTypeNested::extractNestedTableName(column.name)];
|
2014-03-20 13:00:42 +00:00
|
|
|
|
}
|
2014-04-05 19:54:00 +00:00
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
for (const NameAndTypePair & column : old_columns)
|
2014-03-20 13:00:42 +00:00
|
|
|
|
{
|
2014-07-11 12:47:45 +00:00
|
|
|
|
if (!new_types.count(column.name))
|
2014-04-03 08:53:32 +00:00
|
|
|
|
{
|
2014-07-11 12:47:45 +00:00
|
|
|
|
if (!part || part->hasColumnFiles(column.name))
|
|
|
|
|
{
|
|
|
|
|
/// Столбец нужно удалить.
|
|
|
|
|
|
|
|
|
|
String escaped_column = escapeForFileName(column.name);
|
|
|
|
|
out_rename_map[escaped_column + ".bin"] = "";
|
|
|
|
|
out_rename_map[escaped_column + ".mrk"] = "";
|
|
|
|
|
|
|
|
|
|
/// Если это массив или последний столбец вложенной структуры, нужно удалить файлы с размерами.
|
|
|
|
|
if (typeid_cast<const DataTypeArray *>(&*column.type))
|
|
|
|
|
{
|
|
|
|
|
String nested_table = DataTypeNested::extractNestedTableName(column.name);
|
|
|
|
|
/// Если это был последний столбец, относящийся к этим файлам .size0, удалим файлы.
|
|
|
|
|
if (!--nested_table_counts[nested_table])
|
|
|
|
|
{
|
|
|
|
|
String escaped_nested_table = escapeForFileName(nested_table);
|
|
|
|
|
out_rename_map[escaped_nested_table + ".size0.bin"] = "";
|
|
|
|
|
out_rename_map[escaped_nested_table + ".size0.mrk"] = "";
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2014-04-03 08:53:32 +00:00
|
|
|
|
}
|
2014-07-11 12:47:45 +00:00
|
|
|
|
else
|
|
|
|
|
{
|
2015-04-01 11:44:04 +00:00
|
|
|
|
const auto new_type = new_types[column.name].get();
|
|
|
|
|
const String new_type_name = new_type->getName();
|
2014-07-11 12:47:45 +00:00
|
|
|
|
|
|
|
|
|
if (new_type_name != column.type->getName() &&
|
|
|
|
|
(!part || part->hasColumnFiles(column.name)))
|
|
|
|
|
{
|
|
|
|
|
/// Нужно изменить тип столбца.
|
|
|
|
|
|
|
|
|
|
if (!out_expression)
|
|
|
|
|
out_expression = new ExpressionActions(NamesAndTypesList(), context.getSettingsRef());
|
2014-03-09 17:36:01 +00:00
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
out_expression->addInput(ColumnWithNameAndType(nullptr, column.type, column.name));
|
|
|
|
|
|
|
|
|
|
Names out_names;
|
2015-04-01 11:44:04 +00:00
|
|
|
|
|
|
|
|
|
if (const auto fixed_string = typeid_cast<const DataTypeFixedString *>(new_type))
|
|
|
|
|
{
|
|
|
|
|
const auto width = fixed_string->getN();
|
|
|
|
|
const auto string_width_column = toString(width);
|
|
|
|
|
out_expression->addInput({ new ColumnConstUInt64{1, width}, new DataTypeUInt64, string_width_column });
|
|
|
|
|
|
|
|
|
|
const auto function = FunctionFactory::instance().get("toFixedString", context);
|
|
|
|
|
out_expression->add(ExpressionAction::applyFunction(function, Names{
|
|
|
|
|
column.name, string_width_column
|
|
|
|
|
}), out_names);
|
|
|
|
|
|
|
|
|
|
out_expression->add(ExpressionAction::removeColumn(string_width_column));
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
const FunctionPtr & function = FunctionFactory::instance().get("to" + new_type_name, context);
|
|
|
|
|
out_expression->add(ExpressionAction::applyFunction(function, Names{column.name}), out_names);
|
|
|
|
|
}
|
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
out_expression->add(ExpressionAction::removeColumn(column.name));
|
|
|
|
|
|
2015-04-01 11:44:04 +00:00
|
|
|
|
const String escaped_expr = escapeForFileName(out_names[0]);
|
|
|
|
|
const String escaped_column = escapeForFileName(column.name);
|
2014-07-11 12:47:45 +00:00
|
|
|
|
out_rename_map[escaped_expr + ".bin"] = escaped_column + ".bin";
|
|
|
|
|
out_rename_map[escaped_expr + ".mrk"] = escaped_column + ".mrk";
|
|
|
|
|
}
|
|
|
|
|
}
|
2014-03-20 13:00:42 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
2014-03-09 17:36:01 +00:00
|
|
|
|
|
2014-09-29 20:26:46 +00:00
|
|
|
|
/** Prepares an ALTER of a single part. Converted column data, the new checksums.txt and the new
  * columns.txt are written to temporary files. Returns a transaction whose commit() swaps them in.
  * Returns nullptr if nothing needs to change in the part.
  * Unless skip_sanity_checks is set, throws when more than 5 files would have to be modified.
  */
MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
	const DataPartPtr & part, const NamesAndTypesList & new_columns, bool skip_sanity_checks)
{
	ExpressionActionsPtr expression;
	AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part)); /// Blocks modification of the part.
	createConvertExpression(part, part->columns, new_columns, expression, transaction->rename_map);

	if (!skip_sanity_checks && transaction->rename_map.size() > 5)
	{
		/// Read the count before clear(), so the message does not depend on what clear() resets.
		const size_t files_to_modify = transaction->rename_map.size();
		transaction->clear();

		throw Exception("Suspiciously many (" + toString(files_to_modify) + ") files need to be modified in part " + part->name
			+ ". Aborting just in case");
	}

	if (transaction->rename_map.empty())
	{
		transaction->clear();
		return nullptr;
	}

	DataPart::Checksums add_checksums;

	/// Apply the expression and write the result to temporary files.
	if (expression)
	{
		MarkRanges ranges(1, MarkRange(0, part->size));
		BlockInputStreamPtr part_in = new MergeTreeBlockInputStream(full_path + part->name + '/',
			DEFAULT_MERGE_BLOCK_SIZE, expression->getRequiredColumns(), *this, part, ranges, false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE);
		ExpressionBlockInputStream in(part_in, expression);
		MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, CompressionMethod::LZ4);
		in.readPrefix();
		out.writePrefix();

		while (Block b = in.read())
			out.write(b);

		in.readSuffix();
		add_checksums = out.writeSuffixAndGetChecksums();
	}

	/// Update the checksums: drop deleted files, take new ones from the freshly written data.
	DataPart::Checksums new_checksums = part->checksums;
	for (const auto & rename : transaction->rename_map)
	{
		if (rename.second == "")
			new_checksums.files.erase(rename.first);
		else
			new_checksums.files[rename.second] = add_checksums.files[rename.first];
	}

	/// Write the updated checksums to a temporary file.
	if (!part->checksums.empty())
	{
		transaction->new_checksums = new_checksums;
		/// Register first so the transaction's destructor cleans the temp file up if writing fails.
		transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
		WriteBufferFromFile checksums_file(full_path + part->name + "/checksums.txt.tmp", 4096);
		new_checksums.write(checksums_file);
		/// Flush explicitly so a write error surfaces here rather than inside the buffer's destructor.
		checksums_file.next();
	}

	/// Write the updated list of columns to a temporary file.
	{
		transaction->new_columns = new_columns.filter(part->columns.getNames());
		transaction->rename_map["columns.txt.tmp"] = "columns.txt";
		WriteBufferFromFile columns_file(full_path + part->name + "/columns.txt.tmp", 4096);
		transaction->new_columns.writeText(columns_file);
		columns_file.next();
	}

	return transaction;
}
|
2014-03-20 13:00:42 +00:00
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
void MergeTreeData::AlterDataPartTransaction::commit()
|
|
|
|
|
{
|
|
|
|
|
if (!data_part)
|
|
|
|
|
return;
|
|
|
|
|
try
|
2014-03-09 17:36:01 +00:00
|
|
|
|
{
|
2014-07-14 14:07:47 +00:00
|
|
|
|
Poco::ScopedWriteRWLock lock(data_part->columns_lock);
|
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
String path = data_part->storage.full_path + data_part->name + "/";
|
2014-07-14 14:07:47 +00:00
|
|
|
|
|
|
|
|
|
/// 1) Переименуем старые файлы.
|
2014-07-11 12:47:45 +00:00
|
|
|
|
for (auto it : rename_map)
|
2014-03-20 13:00:42 +00:00
|
|
|
|
{
|
2014-07-14 14:07:47 +00:00
|
|
|
|
String name = it.second.empty() ? it.first : it.second;
|
|
|
|
|
Poco::File(path + name).renameTo(path + name + ".tmp2");
|
|
|
|
|
}
|
|
|
|
|
|
2014-07-14 15:49:03 +00:00
|
|
|
|
/// 2) Переместим на их место новые и обновим метаданные в оперативке.
|
2014-07-14 14:07:47 +00:00
|
|
|
|
for (auto it : rename_map)
|
|
|
|
|
{
|
|
|
|
|
if (!it.second.empty())
|
2014-03-27 17:30:04 +00:00
|
|
|
|
{
|
2014-07-11 13:34:12 +00:00
|
|
|
|
Poco::File(path + it.first).renameTo(path + it.second);
|
2014-03-27 17:30:04 +00:00
|
|
|
|
}
|
2014-03-20 13:00:42 +00:00
|
|
|
|
}
|
2014-07-14 15:49:03 +00:00
|
|
|
|
|
|
|
|
|
DataPart & mutable_part = const_cast<DataPart &>(*data_part);
|
|
|
|
|
mutable_part.checksums = new_checksums;
|
|
|
|
|
mutable_part.columns = new_columns;
|
2014-07-14 14:07:47 +00:00
|
|
|
|
|
|
|
|
|
/// 3) Удалим старые файлы.
|
|
|
|
|
for (auto it : rename_map)
|
|
|
|
|
{
|
|
|
|
|
String name = it.second.empty() ? it.first : it.second;
|
|
|
|
|
Poco::File(path + name + ".tmp2").remove();
|
|
|
|
|
}
|
|
|
|
|
|
2014-07-15 11:12:58 +00:00
|
|
|
|
mutable_part.size_in_bytes = MergeTreeData::DataPart::calcTotalSize(path);
|
|
|
|
|
|
2014-07-22 13:49:52 +00:00
|
|
|
|
/// TODO: можно не сбрасывать кеши при добавлении столбца.
|
|
|
|
|
data_part->storage.context.resetCaches();
|
|
|
|
|
|
2014-07-14 14:07:47 +00:00
|
|
|
|
clear();
|
2014-07-11 12:47:45 +00:00
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
/// Если что-то пошло не так, не будем удалять временные файлы в деструкторе.
|
2014-07-14 14:07:47 +00:00
|
|
|
|
clear();
|
2014-07-11 12:47:45 +00:00
|
|
|
|
throw;
|
2014-03-20 13:00:42 +00:00
|
|
|
|
}
|
2014-03-09 17:36:01 +00:00
|
|
|
|
}
|
2014-03-20 13:00:42 +00:00
|
|
|
|
|
2014-07-11 12:47:45 +00:00
|
|
|
|
/** Rolls back an ALTER that was prepared but not committed.
  *
  * If the transaction still refers to a part (commit() clears it), logs a warning and deletes
  * the new files, i.e. every rename_map key that has a non-empty target name.
  * Errors are logged and never propagated, because this is a destructor.
  */
MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
{
	try
	{
		if (!data_part)
			return;

		LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->name);

		String path = data_part->storage.full_path + data_part->name + "/";
		/// Iterate by reference: each entry holds two strings that need not be copied.
		for (const auto & from_to : rename_map)
		{
			if (!from_to.second.empty())
			{
				try
				{
					Poco::File file(path + from_to.first);
					if (file.exists())
						file.remove();
				}
				catch (const Poco::Exception & e)
				{
					/// Best effort: keep removing the remaining files.
					LOG_WARNING(data_part->storage.log, "Can't remove " << path + from_to.first << ": " << e.displayText());
				}
			}
		}
	}
	catch (...)
	{
		tryLogCurrentException(__PRETTY_FUNCTION__);
	}
}
|
|
|
|
|
|
|
|
|
|
|
2014-09-29 20:26:46 +00:00
|
|
|
|
/** Renames a temporary part into its final name and adds it to the set of parts.
  *
  * Same as renameTempPartAndReplace, except that a newly added part is not expected to cover
  * existing parts. If it does, they are still deactivated by renameTempPartAndReplace, and an
  * error is logged.
  */
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, Increment * increment, Transaction * out_transaction)
{
	auto removed = renameTempPartAndReplace(part, increment, out_transaction);
	if (!removed.empty())
	{
		/// Note: no stray unary '+' before the string literal here.
		LOG_ERROR(log, "Added part " << part->name << " covers " << toString(removed.size())
			<< " existing part(s) (including " << removed[0]->name << ")");
	}
}
|
|
|
|
|
|
2014-07-01 15:58:25 +00:00
|
|
|
|
/** Gives a temporary part its final name (optionally taking its block number from `increment`),
  * renames its directory, and makes it active.
  *
  * Every active part that the new part contains is deactivated (its remove_time is set and its
  * column sizes are subtracted). If an existing active part already covers the new part, the new
  * part is not activated and a warning is logged. In both cases the part is registered in
  * all_data_parts.
  *
  * Returns the deactivated parts in ascending order.
  * Throws DUPLICATE_DATA_PART if an active part with the final name already exists, and
  * LOGICAL_ERROR if out_transaction is already in use.
  *
  * NOTE(review): out_transaction->added_parts / removed_parts appear to describe what a rollback
  * would do (re-add the replaced parts, drop the new one). Confirm against Transaction.
  */
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
	MutableDataPartPtr & part, Increment * increment, Transaction * out_transaction)
{
	if (out_transaction && out_transaction->data)
		throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid", ErrorCodes::LOGICAL_ERROR);

	LOG_TRACE(log, "Renaming " << part->name << ".");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	const String temp_name = part->name;
	const String temp_path = getFullPath() + temp_name + "/";

	/** For StorageMergeTree it matters that the part number is obtained atomically with adding
	  * the part to the set. Otherwise there is a race: a pair of parts whose number ranges
	  * contain a part that has not been added yet could be merged.
	  */
	if (increment)
		part->left = part->right = increment->get(false);

	const String final_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level);

	/// Look the part up under its final identity, then restore the temporary one.
	part->is_temp = false;
	part->name = final_name;
	const bool already_exists = data_parts.count(part);
	part->name = temp_name;
	part->is_temp = true;

	if (already_exists)
		throw Exception("Part " + final_name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);

	const String final_path = getFullPath() + final_name + "/";

	/// Rename the part directory on disk.
	Poco::File(temp_path).renameTo(final_path);

	part->is_temp = false;
	part->name = final_name;

	DataPartsVector replaced;
	bool covered_by_existing = false;	/// Whether some existing part already covers `part`.

	/// Deactivates the part at `pos`, records it in `replaced`, and returns the iterator after it.
	auto deactivate = [&](DataParts::iterator pos)
	{
		replaced.push_back(*pos);
		(*pos)->remove_time = time(0);
		removePartContributionToColumnSizes(*pos);
		data_parts.erase(pos++);
		return pos;
	};

	/// Parts contained in `part` are adjacent in data_parts, around the place where `part` would be inserted.
	DataParts::iterator it = data_parts.lower_bound(part);

	/// Scan to the left.
	while (it != data_parts.begin())
	{
		--it;
		if (part->contains(**it))
		{
			/// The returned iterator points past the erased element; the next --it steps back over it.
			it = deactivate(it);
			continue;
		}

		if ((*it)->contains(*part))
			covered_by_existing = true;
		++it;
		break;
	}

	/// The left scan collected parts in descending order; make it ascending.
	std::reverse(replaced.begin(), replaced.end());

	/// Scan to the right.
	while (it != data_parts.end())
	{
		if (!part->contains(**it))
		{
			if ((*it)->name == part->name || (*it)->contains(*part))
				covered_by_existing = true;
			break;
		}
		it = deactivate(it);
	}

	if (covered_by_existing)
	{
		LOG_WARNING(log, "Obsolete part " + part->name + " added");
	}
	else
	{
		data_parts.insert(part);
		addPartContributionToColumnSizes(part);
	}

	all_data_parts.insert(part);

	if (out_transaction)
	{
		out_transaction->data = this;
		out_transaction->added_parts = replaced;
		out_transaction->removed_parts = DataPartsVector(1, part);
	}

	return replaced;
}
|
|
|
|
|
|
2014-07-07 10:23:24 +00:00
|
|
|
|
void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout)
|
2014-07-01 15:58:25 +00:00
|
|
|
|
{
|
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
|
|
|
|
|
|
|
|
|
|
for (const DataPartPtr & part : remove)
|
|
|
|
|
{
|
2014-07-07 10:23:24 +00:00
|
|
|
|
part->remove_time = clear_without_timeout ? 0 : time(0);
|
2014-09-19 11:44:29 +00:00
|
|
|
|
removePartContributionToColumnSizes(part);
|
2014-07-01 15:58:25 +00:00
|
|
|
|
data_parts.erase(part);
|
|
|
|
|
}
|
|
|
|
|
for (const DataPartPtr & part : add)
|
|
|
|
|
{
|
|
|
|
|
data_parts.insert(part);
|
2014-09-19 11:44:29 +00:00
|
|
|
|
addPartContributionToColumnSizes(part);
|
2014-07-01 15:58:25 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-09-29 20:26:46 +00:00
|
|
|
|
void MergeTreeData::attachPart(const DataPartPtr & part)
|
2014-08-08 08:28:13 +00:00
|
|
|
|
{
|
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
|
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
|
|
|
|
|
|
|
|
|
|
if (!all_data_parts.insert(part).second)
|
|
|
|
|
throw Exception("Part " + part->name + " is already attached", ErrorCodes::DUPLICATE_DATA_PART);
|
|
|
|
|
data_parts.insert(part);
|
|
|
|
|
}
|
|
|
|
|
|
2014-09-29 20:26:46 +00:00
|
|
|
|
void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & prefix, bool restore_covered, bool move_to_detached)
|
2014-04-02 07:59:43 +00:00
|
|
|
|
{
|
2014-07-28 09:46:28 +00:00
|
|
|
|
LOG_INFO(log, "Renaming " << part->name << " to " << prefix << part->name << " and detaching it.");
|
|
|
|
|
|
2014-04-02 07:59:43 +00:00
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
|
2014-04-09 15:52:47 +00:00
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
|
2014-07-28 09:46:28 +00:00
|
|
|
|
|
2014-04-09 16:32:32 +00:00
|
|
|
|
if (!all_data_parts.erase(part))
|
2014-04-02 07:59:43 +00:00
|
|
|
|
throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
|
2014-07-28 09:46:28 +00:00
|
|
|
|
|
2014-09-19 11:44:29 +00:00
|
|
|
|
removePartContributionToColumnSizes(part);
|
2014-04-09 16:32:32 +00:00
|
|
|
|
data_parts.erase(part);
|
2014-08-08 08:28:13 +00:00
|
|
|
|
if (move_to_detached || !prefix.empty())
|
2014-12-24 20:02:04 +00:00
|
|
|
|
part->renameAddPrefix(move_to_detached, prefix);
|
2014-07-28 09:46:28 +00:00
|
|
|
|
|
|
|
|
|
if (restore_covered)
|
|
|
|
|
{
|
|
|
|
|
auto it = all_data_parts.lower_bound(part);
|
|
|
|
|
Strings restored;
|
|
|
|
|
bool error = false;
|
|
|
|
|
|
|
|
|
|
UInt64 pos = part->left;
|
|
|
|
|
|
|
|
|
|
if (it != all_data_parts.begin())
|
|
|
|
|
{
|
|
|
|
|
--it;
|
|
|
|
|
if (part->contains(**it))
|
|
|
|
|
{
|
|
|
|
|
if ((*it)->left != part->left)
|
|
|
|
|
error = true;
|
|
|
|
|
data_parts.insert(*it);
|
2014-09-19 11:44:29 +00:00
|
|
|
|
addPartContributionToColumnSizes(*it);
|
2014-07-28 09:46:28 +00:00
|
|
|
|
pos = (*it)->right + 1;
|
|
|
|
|
restored.push_back((*it)->name);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
error = true;
|
|
|
|
|
++it;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
error = true;
|
|
|
|
|
|
|
|
|
|
for (; it != all_data_parts.end() && part->contains(**it); ++it)
|
|
|
|
|
{
|
|
|
|
|
if ((*it)->left < pos)
|
|
|
|
|
continue;
|
|
|
|
|
if ((*it)->left > pos)
|
|
|
|
|
error = true;
|
|
|
|
|
data_parts.insert(*it);
|
2014-09-19 11:44:29 +00:00
|
|
|
|
addPartContributionToColumnSizes(*it);
|
2014-07-28 09:46:28 +00:00
|
|
|
|
pos = (*it)->right + 1;
|
|
|
|
|
restored.push_back((*it)->name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pos != part->right + 1)
|
|
|
|
|
error = true;
|
|
|
|
|
|
|
|
|
|
for (const String & name : restored)
|
|
|
|
|
{
|
|
|
|
|
LOG_INFO(log, "Activated part " << name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (error)
|
|
|
|
|
LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete. There might or might not be a data loss.");
|
|
|
|
|
}
|
2014-03-13 17:44:00 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-09-29 20:26:46 +00:00
|
|
|
|
void MergeTreeData::detachPartInPlace(const DataPartPtr & part)
|
2014-08-08 08:28:13 +00:00
|
|
|
|
{
|
|
|
|
|
renameAndDetachPart(part, "", false, false);
|
|
|
|
|
}
|
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
/// Returns a copy of the set of active parts, taken under data_parts_mutex.
MergeTreeData::DataParts MergeTreeData::getDataParts()
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	DataParts snapshot = data_parts;
	return snapshot;
}
|
|
|
|
|
|
2014-09-19 11:44:29 +00:00
|
|
|
|
/// Returns the active parts as a vector, in the iteration order of data_parts, copied under data_parts_mutex.
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector()
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	DataPartsVector snapshot(data_parts.begin(), data_parts.end());
	return snapshot;
}
|
|
|
|
|
|
2014-04-09 16:32:32 +00:00
|
|
|
|
/// Returns a copy of all known parts (active or not), taken under all_data_parts_mutex.
MergeTreeData::DataParts MergeTreeData::getAllDataParts()
{
	Poco::ScopedLock<Poco::FastMutex> lock(all_data_parts_mutex);
	DataParts snapshot = all_data_parts;
	return snapshot;
}
|
|
|
|
|
|
2014-05-16 15:55:57 +00:00
|
|
|
|
size_t MergeTreeData::getMaxPartsCountForMonth()
|
2014-04-11 16:56:49 +00:00
|
|
|
|
{
|
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
|
|
|
|
|
|
2014-05-16 15:55:57 +00:00
|
|
|
|
size_t res = 0;
|
|
|
|
|
size_t cur_count = 0;
|
|
|
|
|
DayNum_t cur_month = DayNum_t(0);
|
|
|
|
|
|
|
|
|
|
for (const auto & part : data_parts)
|
|
|
|
|
{
|
|
|
|
|
if (part->left_month == cur_month)
|
|
|
|
|
{
|
|
|
|
|
++cur_count;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
cur_month = part->left_month;
|
|
|
|
|
cur_count = 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
res = std::max(res, cur_count);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return res;
|
2014-04-11 16:56:49 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-09-03 02:32:23 +00:00
|
|
|
|
/** Throttles inserts when a month has too many active parts.
  *
  * Beyond settings.parts_to_delay_insert, the delay is
  * insert_delay_step ^ (excess parts) milliseconds. If that exceeds DBMS_MAX_DELAY_OF_INSERT
  * seconds, the insert is rejected with TOO_MUCH_PARTS. Otherwise the call sleeps for the delay,
  * or waits on `until` (if given) for at most that long.
  */
void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
{
	const size_t parts_count = getMaxPartsCountForMonth();
	if (parts_count <= settings.parts_to_delay_insert)
		return;

	/// Computed in milliseconds, then converted to seconds.
	const double delay = std::pow(settings.insert_delay_step, parts_count - settings.parts_to_delay_insert) / 1000;

	if (delay > DBMS_MAX_DELAY_OF_INSERT)
	{
		ProfileEvents::increment(ProfileEvents::RejectedInserts);
		throw Exception("Too much parts. Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MUCH_PARTS);
	}

	ProfileEvents::increment(ProfileEvents::DelayedInserts);
	ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay * 1000);

	LOG_INFO(log, "Delaying inserting block by "
		<< std::fixed << std::setprecision(4) << delay << " sec. because there are " << parts_count << " parts");

	/// Waiting on the event lets the delay be cut short when the event is signalled.
	if (until != nullptr)
		until->tryWait(delay * 1000);
	else
		std::this_thread::sleep_for(std::chrono::duration<double>(delay));
}
|
|
|
|
|
|
2014-07-25 11:38:46 +00:00
|
|
|
|
/** Returns the active part that is named `part_name` or that contains the part with that name.
  * Returns nullptr if there is no such part.
  */
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
{
	MutableDataPartPtr probe = std::make_shared<DataPart>(*this);
	ActiveDataPartSet::parsePartName(part_name, *probe);

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	/// A part can only be covered by the previous or the next element of data_parts.
	DataParts::iterator candidate = data_parts.lower_bound(probe);

	if (candidate != data_parts.end()
		&& ((*candidate)->name == part_name || (*candidate)->contains(*probe)))
		return *candidate;

	if (candidate == data_parts.begin())
		return nullptr;

	--candidate;
	if ((*candidate)->contains(*probe))
		return *candidate;

	return nullptr;
}
|
|
|
|
|
|
2014-07-25 11:38:46 +00:00
|
|
|
|
/** Looks up a part by exact name among all known parts (active or not).
  * Returns nullptr if there is none.
  */
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name)
{
	MutableDataPartPtr probe = std::make_shared<DataPart>(*this);
	ActiveDataPartSet::parsePartName(part_name, *probe);

	Poco::ScopedLock<Poco::FastMutex> lock(all_data_parts_mutex);

	DataParts::iterator found = all_data_parts.lower_bound(probe);
	const bool exact_match = found != all_data_parts.end() && (*found)->name == part_name;
	if (!exact_match)
		return nullptr;

	return *found;
}
|
|
|
|
|
|
2014-08-08 08:28:13 +00:00
|
|
|
|
/** Loads the part located at full_path + relative_path and repairs its metadata files.
  *
  * - columns.txt is always deleted and regenerated by loadColumns().
  * - If the part has no checksums, they are computed by MergeTreePartChecker (which also checks
  *   the data) and written to checksums.txt through a temporary file that is then renamed into place.
  */
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path)
{
	MutableDataPartPtr part = std::make_shared<DataPart>(*this);
	part->name = relative_path;
	ActiveDataPartSet::parsePartName(Poco::Path(relative_path).getFileName(), *part);

	const String part_path = full_path + relative_path;

	/// Earlier versions wrote the list of columns incorrectly. Delete it and recreate it.
	Poco::File columns_file(part_path + "/columns.txt");
	if (columns_file.exists())
		columns_file.remove();

	part->loadColumns(false);
	part->loadChecksums(false);
	part->loadIndex();
	part->checkNotBroken(false);

	part->modification_time = Poco::File(part_path).getLastModified().epochTime();

	/// If there is no checksums file, compute the checksums and write them. This also checks the data.
	if (part->checksums.empty())
	{
		MergeTreePartChecker::Settings settings;
		settings.setIndexGranularity(index_granularity);
		settings.setRequireColumnFiles(true);
		MergeTreePartChecker::checkDataPart(part_path, settings, context.getDataTypeFactory(), &part->checksums);

		{
			WriteBufferFromFile out(part_path + "/checksums.txt.tmp", 4096);
			part->checksums.write(out);
			/// Flush explicitly so that a write error is raised here, before the file is renamed
			/// into place, rather than depending on the flush performed by the destructor.
			out.next();
		}

		Poco::File(part_path + "/checksums.txt.tmp").renameTo(part_path + "/checksums.txt");
	}

	return part;
}
|
|
|
|
|
|
2014-03-27 11:29:40 +00:00
|
|
|
|
|
2014-04-14 13:08:26 +00:00
|
|
|
|
void MergeTreeData::DataPart::Checksums::Checksum::checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const
|
|
|
|
|
{
|
|
|
|
|
if (is_compressed && have_uncompressed)
|
|
|
|
|
{
|
|
|
|
|
if (!rhs.is_compressed)
|
|
|
|
|
throw Exception("No uncompressed checksum for file " + name, ErrorCodes::CHECKSUM_DOESNT_MATCH);
|
|
|
|
|
if (rhs.uncompressed_size != uncompressed_size)
|
|
|
|
|
throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
|
|
|
|
|
if (rhs.uncompressed_hash != uncompressed_hash)
|
|
|
|
|
throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (rhs.file_size != file_size)
|
|
|
|
|
throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
|
|
|
|
|
if (rhs.file_hash != file_hash)
|
|
|
|
|
throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void MergeTreeData::DataPart::Checksums::Checksum::checkSize(const String & path) const
|
|
|
|
|
{
|
|
|
|
|
Poco::File file(path);
|
|
|
|
|
if (!file.exists())
|
|
|
|
|
throw Exception(path + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
|
|
|
|
|
size_t size = file.getSize();
|
|
|
|
|
if (size != file_size)
|
|
|
|
|
throw Exception(path + " has unexpected size: " + DB::toString(size) + " instead of " + DB::toString(file_size),
|
|
|
|
|
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void MergeTreeData::DataPart::Checksums::checkEqual(const Checksums & rhs, bool have_uncompressed) const
|
2014-03-27 11:29:40 +00:00
|
|
|
|
{
|
2014-03-27 17:30:04 +00:00
|
|
|
|
for (const auto & it : rhs.files)
|
2014-03-27 11:29:40 +00:00
|
|
|
|
{
|
|
|
|
|
const String & name = it.first;
|
|
|
|
|
|
2014-03-27 17:30:04 +00:00
|
|
|
|
if (!files.count(name))
|
2014-03-27 11:29:40 +00:00
|
|
|
|
throw Exception("Unexpected file " + name + " in data part", ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART);
|
|
|
|
|
}
|
|
|
|
|
|
2014-03-27 17:30:04 +00:00
|
|
|
|
for (const auto & it : files)
|
2014-03-27 11:29:40 +00:00
|
|
|
|
{
|
|
|
|
|
const String & name = it.first;
|
|
|
|
|
|
2014-03-27 17:30:04 +00:00
|
|
|
|
auto jt = rhs.files.find(name);
|
|
|
|
|
if (jt == rhs.files.end())
|
2014-03-27 11:29:40 +00:00
|
|
|
|
throw Exception("No file " + name + " in data part", ErrorCodes::NO_FILE_IN_DATA_PART);
|
|
|
|
|
|
2014-04-14 13:08:26 +00:00
|
|
|
|
it.second.checkEqual(jt->second, have_uncompressed, name);
|
|
|
|
|
}
|
|
|
|
|
}
|
2014-04-14 13:13:20 +00:00
|
|
|
|
|
2014-04-14 13:08:26 +00:00
|
|
|
|
void MergeTreeData::DataPart::Checksums::checkSizes(const String & path) const
|
|
|
|
|
{
|
|
|
|
|
for (const auto & it : files)
|
|
|
|
|
{
|
|
|
|
|
const String & name = it.first;
|
|
|
|
|
it.second.checkSize(path + name);
|
2014-03-27 11:29:40 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-12-17 18:37:23 +00:00
|
|
|
|
/** Clears the current contents, reads the "checksums format version: N" header, and dispatches
  * to the reader for that version.
  *
  * For version 1, returns false without reading anything past the header.
  * Throws UNKNOWN_FORMAT for versions outside 1..4.
  */
bool MergeTreeData::DataPart::Checksums::read(ReadBuffer & in)
{
	files.clear();

	DB::assertString("checksums format version: ", in);
	int format_version;
	DB::readText(format_version, in);
	DB::assertString("\n", in);

	switch (format_version)
	{
		case 1:
			return false;
		case 2:
			return read_v2(in);
		case 3:
			return read_v3(in);
		case 4:
			return read_v4(in);
		default:
			throw Exception("Bad checksums format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT);
	}
}
|
|
|
|
|
|
2014-12-17 18:37:23 +00:00
|
|
|
|
/** Reads the text body of format version 2.
  *
  * The body starts with "<count> files:\n". Then, for each file: its name, size, hash, and a
  * compression flag. Compressed files additionally carry their uncompressed size and hash.
  */
bool MergeTreeData::DataPart::Checksums::read_v2(ReadBuffer & in)
{
	size_t count;
	DB::readText(count, in);
	DB::assertString(" files:\n", in);

	for (size_t remaining = count; remaining > 0; --remaining)
	{
		String name;
		Checksum sum;

		DB::readString(name, in);

		DB::assertString("\n\tsize: ", in);
		DB::readText(sum.file_size, in);

		DB::assertString("\n\thash: ", in);
		DB::readText(sum.file_hash.first, in);
		DB::assertString(" ", in);
		DB::readText(sum.file_hash.second, in);

		DB::assertString("\n\tcompressed: ", in);
		DB::readText(sum.is_compressed, in);

		if (sum.is_compressed)
		{
			DB::assertString("\n\tuncompressed size: ", in);
			DB::readText(sum.uncompressed_size, in);

			DB::assertString("\n\tuncompressed hash: ", in);
			DB::readText(sum.uncompressed_hash.first, in);
			DB::assertString(" ", in);
			DB::readText(sum.uncompressed_hash.second, in);
		}

		DB::assertString("\n", in);

		files.emplace(std::move(name), sum);
	}

	return true;
}
|
|
|
|
|
|
2014-12-17 18:37:23 +00:00
|
|
|
|
/** Reads the binary body of format version 3.
  *
  * The body is a varint file count. Then, for each file: name, varint size, hash, and a
  * compression flag. Compressed files additionally carry a varint uncompressed size and an
  * uncompressed hash.
  */
bool MergeTreeData::DataPart::Checksums::read_v3(ReadBuffer & in)
{
	size_t count;
	DB::readVarUInt(count, in);

	for (size_t remaining = count; remaining > 0; --remaining)
	{
		String name;
		Checksum sum;

		DB::readBinary(name, in);
		DB::readVarUInt(sum.file_size, in);
		DB::readBinary(sum.file_hash, in);
		DB::readBinary(sum.is_compressed, in);

		if (sum.is_compressed)
		{
			DB::readVarUInt(sum.uncompressed_size, in);
			DB::readBinary(sum.uncompressed_hash, in);
		}

		files.emplace(std::move(name), sum);
	}

	return true;
}
|
|
|
|
|
|
2014-12-17 18:37:23 +00:00
|
|
|
|
/// Format version 4 is the version 3 binary body, read through a CompressedReadBuffer.
bool MergeTreeData::DataPart::Checksums::read_v4(ReadBuffer & from)
{
	CompressedReadBuffer decompressed(from);
	return read_v3(decompressed);
}
|
|
|
|
|
|
|
|
|
|
/** Writes the checksums in format version 4.
  *
  * The output is a plain-text header line followed by an LZ4-compressed binary body: a varint
  * file count, then for each file its name, varint size, hash, and compression flag. Compressed
  * files additionally carry a varint uncompressed size and an uncompressed hash.
  */
void MergeTreeData::DataPart::Checksums::write(WriteBuffer & to) const
{
	DB::writeString("checksums format version: 4\n", to);

	DB::CompressedWriteBuffer body{to, CompressionMethod::LZ4, 1 << 16};

	DB::writeVarUInt(files.size(), body);

	for (const auto & entry : files)
	{
		const Checksum & checksum = entry.second;

		DB::writeBinary(entry.first, body);
		DB::writeVarUInt(checksum.file_size, body);
		DB::writeBinary(checksum.file_hash, body);
		DB::writeBinary(checksum.is_compressed, body);

		if (!checksum.is_compressed)
			continue;

		DB::writeVarUInt(checksum.uncompressed_size, body);
		DB::writeBinary(checksum.uncompressed_hash, body);
	}
}
|
|
|
|
|
|
2014-09-19 11:44:29 +00:00
|
|
|
|
void MergeTreeData::calculateColumnSizes()
|
|
|
|
|
{
|
|
|
|
|
column_sizes.clear();
|
|
|
|
|
|
|
|
|
|
for (const auto & part : data_parts)
|
|
|
|
|
addPartContributionToColumnSizes(part);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
|
|
|
|
|
{
|
|
|
|
|
const auto & files = part->checksums.files;
|
|
|
|
|
|
|
|
|
|
for (const auto & column : *columns)
|
|
|
|
|
{
|
|
|
|
|
const auto escaped_name = escapeForFileName(column.name);
|
|
|
|
|
const auto bin_file_name = escaped_name + ".bin";
|
|
|
|
|
const auto mrk_file_name = escaped_name + ".mrk";
|
|
|
|
|
|
|
|
|
|
auto & column_size = column_sizes[column.name];
|
|
|
|
|
|
|
|
|
|
if (files.count(bin_file_name)) column_size += files.find(bin_file_name)->second.file_size;
|
|
|
|
|
if (files.count(mrk_file_name)) column_size += files.find(mrk_file_name)->second.file_size;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part)
|
|
|
|
|
{
|
|
|
|
|
const auto & files = part->checksums.files;
|
|
|
|
|
|
|
|
|
|
for (const auto & column : *columns)
|
|
|
|
|
{
|
|
|
|
|
const auto escaped_name = escapeForFileName(column.name);
|
|
|
|
|
const auto bin_file_name = escaped_name + ".bin";
|
|
|
|
|
const auto mrk_file_name = escaped_name + ".mrk";
|
|
|
|
|
|
|
|
|
|
auto & column_size = column_sizes[column.name];
|
|
|
|
|
|
|
|
|
|
if (files.count(bin_file_name)) column_size -= files.find(bin_file_name)->second.file_size;
|
|
|
|
|
if (files.count(mrk_file_name)) column_size -= files.find(mrk_file_name)->second.file_size;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-10-03 17:57:01 +00:00
|
|
|
|
|
2014-11-11 04:11:07 +00:00
|
|
|
|
void MergeTreeData::freezePartition(const std::string & prefix)
|
|
|
|
|
{
|
|
|
|
|
LOG_DEBUG(log, "Freezing parts with prefix " + prefix);
|
|
|
|
|
|
|
|
|
|
String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString();
|
|
|
|
|
String shadow_path = clickhouse_path + "shadow/";
|
|
|
|
|
Poco::File(shadow_path).createDirectories();
|
|
|
|
|
String backup_path = shadow_path + toString(Increment(shadow_path + "increment.txt").get(true)) + "/";
|
|
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Snapshot will be placed at " + backup_path);
|
|
|
|
|
|
|
|
|
|
size_t parts_processed = 0;
|
|
|
|
|
Poco::DirectoryIterator end;
|
|
|
|
|
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
|
|
|
|
|
{
|
|
|
|
|
if (0 == it.name().compare(0, prefix.size(), prefix))
|
|
|
|
|
{
|
|
|
|
|
LOG_DEBUG(log, "Freezing part " + it.name());
|
|
|
|
|
|
|
|
|
|
String part_absolute_path = it.path().absolute().toString();
|
|
|
|
|
if (0 != part_absolute_path.compare(0, clickhouse_path.size(), clickhouse_path))
|
|
|
|
|
throw Exception("Part path " + part_absolute_path + " is not inside " + clickhouse_path, ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
|
|
String backup_part_absolute_path = part_absolute_path;
|
|
|
|
|
backup_part_absolute_path.replace(0, clickhouse_path.size(), backup_path);
|
|
|
|
|
localBackup(part_absolute_path, backup_part_absolute_path);
|
|
|
|
|
++parts_processed;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "Freezed " << parts_processed << " parts");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2014-10-03 17:57:01 +00:00
|
|
|
|
static std::pair<String, DayNum_t> getMonthNameAndDayNum(const Field & partition)
|
|
|
|
|
{
|
|
|
|
|
String month_name = partition.getType() == Field::Types::UInt64
|
|
|
|
|
? toString(partition.get<UInt64>())
|
|
|
|
|
: partition.safeGet<String>();
|
|
|
|
|
|
|
|
|
|
if (month_name.size() != 6 || !std::all_of(month_name.begin(), month_name.end(), isdigit))
|
|
|
|
|
throw Exception("Invalid partition format: " + month_name + ". Partition should consist of 6 digits: YYYYMM",
|
|
|
|
|
ErrorCodes::INVALID_PARTITION_NAME);
|
|
|
|
|
|
2014-11-06 06:32:23 +00:00
|
|
|
|
DayNum_t date = DateLUT::instance().YYYYMMDDToDayNum(parse<UInt32>(month_name + "01"));
|
2014-10-03 17:57:01 +00:00
|
|
|
|
|
|
|
|
|
/// Не можем просто сравнить date с нулем, потому что 0 тоже валидный DayNum.
|
2014-11-06 06:32:23 +00:00
|
|
|
|
if (month_name != toString(DateLUT::instance().toNumYYYYMMDD(date) / 100))
|
2014-10-03 17:57:01 +00:00
|
|
|
|
throw Exception("Invalid partition format: " + month_name + " doesn't look like month.",
|
|
|
|
|
ErrorCodes::INVALID_PARTITION_NAME);
|
|
|
|
|
|
|
|
|
|
return std::make_pair(month_name, date);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Returns the "YYYYMM" month name of a partition identifier; throws INVALID_PARTITION_NAME if it is invalid.
String MergeTreeData::getMonthName(const Field & partition)
{
	auto parsed = getMonthNameAndDayNum(partition);
	return std::move(parsed.first);
}
|
|
|
|
|
|
|
|
|
|
/// Returns the DayNum of the first day of a partition's month; throws INVALID_PARTITION_NAME if it is invalid.
DayNum_t MergeTreeData::getMonthDayNum(const Field & partition)
{
	const auto parsed = getMonthNameAndDayNum(partition);
	return parsed.second;
}
|
|
|
|
|
|
2014-03-09 17:36:01 +00:00
|
|
|
|
}
|