ClickHouse/dbms/src/Storages/MergeTree/MergeTreeData.cpp

1630 lines
52 KiB
C++
Raw Normal View History

#include <DB/Storages/MergeTree/MergeTreeData.h>
2014-03-09 17:36:01 +00:00
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
2014-08-08 08:28:13 +00:00
#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
2014-03-14 07:05:43 +00:00
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
2014-03-27 17:30:04 +00:00
#include <DB/DataStreams/copyData.h>
2014-07-09 13:39:19 +00:00
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
2016-03-25 11:48:45 +00:00
#include <DB/IO/HexWriteBuffer.h>
2014-09-23 19:47:25 +00:00
#include <DB/DataTypes/DataTypeDate.h>
2016-04-15 19:37:19 +00:00
#include <DB/DataTypes/DataTypeDateTime.h>
2015-04-01 11:44:04 +00:00
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeEnum.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNullable.h>
#include <DB/Common/localBackup.h>
2014-12-17 15:26:24 +00:00
#include <DB/Functions/FunctionFactory.h>
2015-04-16 06:12:35 +00:00
#include <Poco/DirectoryIterator.h>
2015-10-05 01:11:12 +00:00
#include <DB/Common/Increment.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Common/StringUtils.h>
#include <DB/IO/Operators.h>
2014-06-10 14:24:33 +00:00
#include <algorithm>
#include <iomanip>
#include <thread>
2014-03-09 17:36:01 +00:00
/// Profile event identifiers referenced by this translation unit.
/// They are only declared here (`extern`); their definitions live outside this file.
namespace ProfileEvents
{
extern const Event RejectedInserts;
extern const Event DelayedInserts;
extern const Event DelayedInsertsMilliseconds;
}
/// Metric identifier referenced by this translation unit.
/// Declared `extern` here; the definition lives outside this file.
namespace CurrentMetrics
{
extern const Metric DelayedInserts;
}
2014-03-09 17:36:01 +00:00
namespace DB
{
2016-11-20 12:43:20 +00:00
/// Error code declared `extern` for use in this file (see the MEMORY_LIMIT_EXCEEDED check in loadDataParts).
/// NOTE(review): other ErrorCodes::* constants used below are not declared here — presumably they come from included headers; confirm.
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
2014-03-09 17:36:01 +00:00
/** Constructs the data layer of a MergeTree-family table.
  *
  * Steps performed, in order:
  *  - verifies that the date column is declared (among ordinary or materialized columns) and has type Date;
  *  - validates the merging parameters against the ordinary columns;
  *  - creates the data directory and its "detached" subdirectory;
  *  - initializes the primary key, which may be absent only in Unsorted mode.
  *
  * Throws Exception with BAD_TYPE_OF_FIELD, NO_SUCH_COLUMN_IN_TABLE or BAD_ARGUMENTS
  * when the corresponding validation fails.
  */
MergeTreeData::MergeTreeData(
    const String & full_path_, NamesAndTypesListPtr columns_,
    const NamesAndTypesList & materialized_columns_,
    const NamesAndTypesList & alias_columns_,
    const ColumnDefaults & column_defaults_,
    Context & context_,
    ASTPtr & primary_expr_ast_,
    const String & date_column_name_, const ASTPtr & sampling_expression_,
    size_t index_granularity_,
    const MergingParams & merging_params_,
    const MergeTreeSettings & settings_,
    const String & log_name_,
    bool require_part_metadata_,
    BrokenPartCallback broken_part_callback_)
    : ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
    date_column_name(date_column_name_), sampling_expression(sampling_expression_),
    index_granularity(index_granularity_),
    merging_params(merging_params_),
    settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr),
    require_part_metadata(require_part_metadata_),
    full_path(full_path_), columns(columns_),
    broken_part_callback(broken_part_callback_),
    log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
{
    /// Looks for the date column in the given list.
    /// Returns false if it is absent, true if it is present with type Date, and throws if its type is wrong.
    const auto has_valid_date_column = [this] (const NamesAndTypesList & candidates)
    {
        const auto found = std::find_if(candidates.begin(), candidates.end(),
            [this] (const NameAndTypePair & candidate) { return candidate.name == date_column_name; });

        if (found == candidates.end())
            return false;

        if (!typeid_cast<const DataTypeDate *>(found->type.get()))
            throw Exception("Date column (" + date_column_name + ") for storage of MergeTree family must have type Date."
                " Provided column of type " + found->type->getName() + "."
                " You may have separate column with type " + found->type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);

        return true;
    };

    /// Materialized columns are consulted only when the ordinary columns do not contain the date column.
    const bool date_column_declared = has_valid_date_column(*columns) || has_valid_date_column(materialized_columns);
    if (!date_column_declared)
        throw Exception{
            "Date column (" + date_column_name + ") does not exist in table declaration.",
            ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};

    merging_params.check(*columns);

    /// Create the directories if they do not exist yet.
    Poco::File(full_path).createDirectories();
    Poco::File(full_path + "detached").createDirectory();

    if (!primary_expr_ast)
    {
        if (merging_params.mode != MergingParams::Unsorted)
            throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
    }
    else
        initPrimaryKey();
}
void MergeTreeData::initPrimaryKey()
{
/// Initialize description of sorting.
sort_descr.clear();
sort_descr.reserve(primary_expr_ast->children.size());
for (const ASTPtr & ast : primary_expr_ast->children)
2014-03-09 17:36:01 +00:00
{
String name = ast->getColumnName();
sort_descr.emplace_back(name, 1);
}
2014-03-09 17:36:01 +00:00
primary_expr = ExpressionAnalyzer(primary_expr_ast, context, nullptr, getColumnsList()).getActions(false);
2014-03-09 17:36:01 +00:00
ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, nullptr, getColumnsList()).getActions(true);
primary_key_sample = projected_expr->getSampleBlock();
size_t primary_key_size = primary_key_sample.columns();
/// A primary key cannot contain constants. It is meaningless.
/// (And also couldn't work because primary key is serialized with method of IDataType that doesn't support constants).
/// Also a primary key must not contain any nullable column.
for (size_t i = 0; i < primary_key_size; ++i)
{
const auto & element = primary_key_sample.unsafeGetByPosition(i);
const ColumnPtr & column = element.column;
if (column && column->isConst())
throw Exception{"Primary key cannot contain constants", ErrorCodes::ILLEGAL_COLUMN};
if (element.type->isNullable())
throw Exception{"Primary key cannot contain nullable columns", ErrorCodes::ILLEGAL_COLUMN};
}
primary_key_data_types.resize(primary_key_size);
for (size_t i = 0; i < primary_key_size; ++i)
primary_key_data_types[i] = primary_key_sample.unsafeGetByPosition(i).type;
2014-03-13 12:48:07 +00:00
}
2014-03-09 17:36:01 +00:00
2016-04-15 17:42:51 +00:00
void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) const
{
/// Проверяем, что столбец sign_column, если нужен, существует, и имеет тип Int8.
if (mode == MergingParams::Collapsing)
{
if (sign_column.empty())
throw Exception("Logical error: Sign column for storage CollapsingMergeTree is empty", ErrorCodes::LOGICAL_ERROR);
for (const auto & column : columns)
{
if (column.name == sign_column)
{
if (!typeid_cast<const DataTypeInt8 *>(column.type.get()))
throw Exception("Sign column (" + sign_column + ")"
" for storage CollapsingMergeTree must have type Int8."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
break;
}
}
}
else if (!sign_column.empty())
throw Exception("Sign column for MergeTree cannot be specified in all modes except Collapsing.", ErrorCodes::LOGICAL_ERROR);
/// Если заданы columns_to_sum, проверяем, что такие столбцы существуют.
if (!columns_to_sum.empty())
{
if (mode != MergingParams::Summing)
throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.",
ErrorCodes::LOGICAL_ERROR);
for (const auto & column_to_sum : columns_to_sum)
if (columns.end() == std::find_if(columns.begin(), columns.end(),
[&](const NameAndTypePair & name_and_type) { return column_to_sum == name_and_type.name; }))
throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.");
}
/// Проверяем, что столбец version_column, если допустим, имеет тип целого беззнакового числа.
if (!version_column.empty())
{
if (mode != MergingParams::Replacing)
throw Exception("Version column for MergeTree cannot be specified in all modes except Replacing.",
ErrorCodes::LOGICAL_ERROR);
for (const auto & column : columns)
{
if (column.name == version_column)
{
if (!typeid_cast<const DataTypeUInt8 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt16 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt32 *>(column.type.get())
2016-04-15 19:37:19 +00:00
&& !typeid_cast<const DataTypeUInt64 *>(column.type.get())
&& !typeid_cast<const DataTypeDate *>(column.type.get())
&& !typeid_cast<const DataTypeDateTime *>(column.type.get()))
2016-04-15 17:42:51 +00:00
throw Exception("Version column (" + version_column + ")"
2016-04-15 19:37:19 +00:00
" for storage ReplacingMergeTree must have type of UInt family or Date or DateTime."
2016-04-15 17:42:51 +00:00
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
break;
}
}
}
2016-04-24 09:44:47 +00:00
/// TODO Проверки для Graphite
}
/** Returns the textual name of the merging mode (an empty string for Ordinary).
  * Throws Exception(LOGICAL_ERROR) if `mode` holds a value not listed below.
  */
String MergeTreeData::MergingParams::getModeName() const
{
    const char * mode_name = nullptr;

    switch (mode)
    {
        case Ordinary:      mode_name = "";             break;
        case Collapsing:    mode_name = "Collapsing";   break;
        case Summing:       mode_name = "Summing";      break;
        case Aggregating:   mode_name = "Aggregating";  break;
        case Unsorted:      mode_name = "Unsorted";     break;
        case Replacing:     mode_name = "Replacing";    break;
        case Graphite:      mode_name = "Graphite";     break;
        default:            break;
    }

    /// No case matched: the mode value is outside the known set.
    if (!mode_name)
        throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);

    return mode_name;
}
2015-08-17 21:09:36 +00:00
/// Returns the largest `right` value among all known data parts, or 0 if there are none
/// (or if every value is non-positive). Takes all_data_parts_mutex for the duration of the scan.
Int64 MergeTreeData::getMaxDataPartIndex()
{
    std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);

    Int64 largest_right = 0;
    for (const auto & part : all_data_parts)
    {
        if (largest_right < part->right)
            largest_right = part->right;
    }

    return largest_right;
}
2014-08-13 08:07:52 +00:00
void MergeTreeData::loadDataParts(bool skip_sanity_checks)
2014-03-09 17:36:01 +00:00
{
LOG_DEBUG(log, "Loading data parts");
2016-01-29 02:22:43 +00:00
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
2014-03-09 17:36:01 +00:00
data_parts.clear();
all_data_parts.clear();
2014-03-09 17:36:01 +00:00
Strings part_file_names;
2014-03-09 17:36:01 +00:00
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
2014-04-09 15:52:47 +00:00
{
/// Пропускаем временные директории старше суток.
if (startsWith(it.name(), "tmp_"))
2014-03-09 17:36:01 +00:00
continue;
part_file_names.push_back(it.name());
2014-03-09 17:36:01 +00:00
}
2014-04-18 11:05:30 +00:00
DataPartsVector broken_parts_to_remove;
2014-08-13 08:07:52 +00:00
DataPartsVector broken_parts_to_detach;
size_t suspicious_broken_parts = 0;
2014-04-18 11:05:30 +00:00
2014-03-09 17:36:01 +00:00
Poco::RegularExpression::MatchVec matches;
2014-04-09 15:52:47 +00:00
for (const String & file_name : part_file_names)
2014-03-09 17:36:01 +00:00
{
2014-08-08 08:28:13 +00:00
if (!ActiveDataPartSet::isPartDirectory(file_name, &matches))
2014-03-09 17:36:01 +00:00
continue;
2014-03-14 17:19:38 +00:00
MutableDataPartPtr part = std::make_shared<DataPart>(*this);
ActiveDataPartSet::parsePartName(file_name, *part, &matches);
2014-03-09 17:36:01 +00:00
part->name = file_name;
2014-04-18 11:05:30 +00:00
bool broken = false;
try
{
2014-08-08 08:28:13 +00:00
part->loadColumns(require_part_metadata);
part->loadChecksums(require_part_metadata);
2014-07-09 13:39:19 +00:00
part->loadIndex();
2014-08-08 08:28:13 +00:00
part->checkNotBroken(require_part_metadata);
2014-04-18 11:05:30 +00:00
}
catch (const Exception & e)
{
/** Если не хватает памяти для загрузки куска - не нужно считать его битым.
* На самом деле, похожих ситуаций может быть ещё много.
* Но это не страшно, так как ниже есть защита от слишком большого количества кусков для удаления.
*/
if (e.code() == ErrorCodes::MEMORY_LIMIT_EXCEEDED)
throw;
broken = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2014-04-18 11:05:30 +00:00
catch (...)
{
broken = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2014-04-09 15:52:47 +00:00
/// Игнорируем и, возможно, удаляем битые куски, которые могут образовываться после грубого перезапуска сервера.
2014-04-18 11:05:30 +00:00
if (broken)
2014-03-09 17:36:01 +00:00
{
if (part->level == 0)
{
/// Восстановить куски нулевого уровня невозможно.
LOG_ERROR(log, "Considering to remove broken part " << full_path + file_name << " because it's impossible to repair.");
2014-04-18 11:05:30 +00:00
broken_parts_to_remove.push_back(part);
2014-03-09 17:36:01 +00:00
}
else
{
2014-04-09 15:52:47 +00:00
/// Посмотрим, сколько кусков покрыты битым. Если хотя бы два, предполагаем, что битый кусок образован их
/// слиянием, и мы ничего не потеряем, если его удалим.
int contained_parts = 0;
LOG_ERROR(log, "Part " << full_path + file_name << " is broken. Looking for parts to replace it.");
2014-08-13 08:07:52 +00:00
++suspicious_broken_parts;
2014-04-09 15:52:47 +00:00
for (const String & contained_name : part_file_names)
{
if (contained_name == file_name)
continue;
2014-08-08 08:28:13 +00:00
if (!ActiveDataPartSet::isPartDirectory(contained_name, &matches))
2014-04-09 15:52:47 +00:00
continue;
DataPart contained_part(*this);
ActiveDataPartSet::parsePartName(contained_name, contained_part, &matches);
2014-04-09 15:52:47 +00:00
if (part->contains(contained_part))
{
LOG_ERROR(log, "Found part " << full_path + contained_name);
++contained_parts;
}
}
if (contained_parts >= 2)
{
LOG_ERROR(log, "Considering to remove broken part " << full_path + file_name << " because it covers at least 2 other parts");
2014-04-18 11:05:30 +00:00
broken_parts_to_remove.push_back(part);
2014-04-09 15:52:47 +00:00
}
else
{
2014-08-13 08:07:52 +00:00
LOG_ERROR(log, "Detaching broken part " << full_path + file_name
2014-04-09 15:52:47 +00:00
<< " because it covers less than 2 parts. You need to resolve this manually");
2014-08-13 08:07:52 +00:00
broken_parts_to_detach.push_back(part);
2014-04-09 15:52:47 +00:00
}
2014-03-09 17:36:01 +00:00
}
continue;
}
part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();
data_parts.insert(part);
}
2015-07-16 21:03:53 +00:00
if (suspicious_broken_parts > settings.max_suspicious_broken_parts && !skip_sanity_checks)
2014-08-13 08:07:52 +00:00
throw Exception("Suspiciously many (" + toString(suspicious_broken_parts) + ") broken parts to remove.",
2014-04-18 11:05:30 +00:00
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
for (const auto & part : broken_parts_to_remove)
part->remove();
2014-08-13 08:07:52 +00:00
for (const auto & part : broken_parts_to_detach)
part->renameAddPrefix(true, "");
2014-04-18 11:05:30 +00:00
2014-03-09 17:36:01 +00:00
all_data_parts = data_parts;
/** Удаляем из набора актуальных кусков куски, которые содержатся в другом куске (которые были склеены),
* но по каким-то причинам остались лежать в файловой системе.
* Удаление файлов будет произведено потом в методе clearOldParts.
*/
if (data_parts.size() >= 2)
{
DataParts::iterator prev_jt = data_parts.begin();
DataParts::iterator curr_jt = prev_jt;
++curr_jt;
while (curr_jt != data_parts.end())
{
/// Куски данных за разные месяцы рассматривать не будем
2015-08-17 21:09:36 +00:00
if ((*curr_jt)->month != (*prev_jt)->month)
2014-03-09 17:36:01 +00:00
{
++prev_jt;
++curr_jt;
continue;
}
if ((*curr_jt)->contains(**prev_jt))
{
(*prev_jt)->remove_time = (*prev_jt)->modification_time;
2014-03-09 17:36:01 +00:00
data_parts.erase(prev_jt);
prev_jt = curr_jt;
++curr_jt;
}
else if ((*prev_jt)->contains(**curr_jt))
{
(*curr_jt)->remove_time = (*curr_jt)->modification_time;
2014-03-09 17:36:01 +00:00
data_parts.erase(curr_jt++);
}
else
{
++prev_jt;
++curr_jt;
}
}
}
2014-09-19 11:44:29 +00:00
calculateColumnSizes();
2014-03-09 17:36:01 +00:00
LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
}
/** Tells whether a part directory is old.
  * It is old if its modification time,
  * and also the modification time of every file inside it
  * (only files at one level of nesting are considered),
  * is less than threshold.
  */
static bool isOldPartDirectory(Poco::File & directory, time_t threshold)
{
    const auto modified_recently = [threshold] (const Poco::File & file)
    {
        return file.getLastModified().epochTime() >= threshold;
    };

    /// The directory itself is checked first, so that its contents are not listed needlessly.
    if (modified_recently(directory))
        return false;

    Poco::DirectoryIterator entries_end;
    for (Poco::DirectoryIterator entry(directory); entry != entries_end; ++entry)
    {
        if (modified_recently(*entry))
            return false;
    }

    return true;
}
void MergeTreeData::clearOldTemporaryDirectories()
2014-03-09 17:36:01 +00:00
{
2016-04-18 21:38:06 +00:00
/// Если метод уже вызван из другого потока, то можно ничего не делать.
std::unique_lock<std::mutex> lock(clear_old_temporary_directories_mutex, std::defer_lock);
if (!lock.try_lock())
return;
time_t current_time = time(0);
2014-04-18 10:05:38 +00:00
/// Удаляем временные директории старше суток.
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it{full_path}; it != end; ++it)
2014-04-18 10:05:38 +00:00
{
if (startsWith(it.name(), "tmp_"))
2014-04-18 10:05:38 +00:00
{
Poco::File tmp_dir(full_path + it.name());
2014-04-18 10:05:38 +00:00
2016-04-18 21:38:06 +00:00
try
{
if (tmp_dir.isDirectory() && isOldPartDirectory(tmp_dir, current_time - settings.temporary_directories_lifetime))
2016-04-18 21:38:06 +00:00
{
LOG_WARNING(log, "Removing temporary directory " << full_path << it.name());
Poco::File(full_path + it.name()).remove(true);
}
}
catch (const Poco::FileNotFoundException &)
2014-04-18 10:05:38 +00:00
{
2016-04-18 21:38:06 +00:00
/// Ничего не делаем, если файл уже удалён.
2014-04-18 10:05:38 +00:00
}
}
}
}
/** Extracts from all_data_parts the parts that are no longer referenced from anywhere else and whose
  * remove_time lies more than settings.old_parts_lifetime in the past, and returns them.
  * Returns an empty vector if another thread is already executing this method.
  */
MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
{
    DataPartsVector old_parts;

    /// If the method is already being executed by another thread, there is nothing to do.
    std::unique_lock<std::mutex> grab_lock(grab_old_parts_mutex, std::try_to_lock);
    if (!grab_lock.owns_lock())
        return old_parts;

    time_t now = time(0);

    {
        std::lock_guard<std::mutex> parts_lock(all_data_parts_mutex);

        DataParts::iterator it = all_data_parts.begin();
        while (it != all_data_parts.end())
        {
            const bool is_expired =
                it->unique()    /// After this the reference count cannot increase.
                && (*it)->remove_time < now
                && now - (*it)->remove_time > settings.old_parts_lifetime;

            if (!is_expired)
            {
                ++it;
                continue;
            }

            old_parts.push_back(*it);
            all_data_parts.erase(it++);
        }
    }

    if (!old_parts.empty())
    {
        LOG_TRACE(log, "Found " << old_parts.size() << " old parts to remove.");
    }

    return old_parts;
}
2014-07-25 11:15:11 +00:00
/// Inserts the given parts into all_data_parts, holding all_data_parts_mutex while doing so.
void MergeTreeData::addOldParts(const MergeTreeData::DataPartsVector & parts)
{
    std::lock_guard<std::mutex> parts_lock(all_data_parts_mutex);

    all_data_parts.insert(std::begin(parts), std::end(parts));
}
void MergeTreeData::clearOldParts()
{
auto parts_to_remove = grabOldParts();
2014-09-29 20:26:46 +00:00
for (const DataPartPtr & part : parts_to_remove)
2014-07-25 11:15:11 +00:00
{
LOG_DEBUG(log, "Removing part " << part->name);
part->remove();
}
}
2014-07-28 14:33:30 +00:00
void MergeTreeData::setPath(const String & new_full_path, bool move_data)
2014-03-09 17:36:01 +00:00
{
2014-07-28 14:33:30 +00:00
if (move_data)
{
2015-04-13 13:54:49 +00:00
if (Poco::File{new_full_path}.exists())
throw Exception{
"Target path already exists: " + new_full_path,
/// @todo existing target can also be a file, not directory
ErrorCodes::DIRECTORY_ALREADY_EXISTS
};
2014-07-28 14:33:30 +00:00
Poco::File(full_path).renameTo(new_full_path);
/// Если данные перемещать не нужно, значит их переместил кто-то другой. Расчитываем, что он еще и сбросил кеши.
context.resetCaches();
}
2014-03-13 19:14:25 +00:00
2014-07-28 14:33:30 +00:00
full_path = new_full_path;
2014-03-09 17:36:01 +00:00
}
2014-03-13 12:48:07 +00:00
void MergeTreeData::dropAllData()
2014-03-09 17:36:01 +00:00
{
LOG_TRACE(log, "dropAllData: waiting for locks.");
2016-01-29 02:22:43 +00:00
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
LOG_TRACE(log, "dropAllData: removing data from memory.");
2014-03-09 17:36:01 +00:00
data_parts.clear();
all_data_parts.clear();
2014-09-19 11:44:29 +00:00
column_sizes.clear();
2014-03-09 17:36:01 +00:00
context.resetCaches();
2014-03-13 19:14:25 +00:00
LOG_TRACE(log, "dropAllData: removing data from filesystem.");
2014-03-09 17:36:01 +00:00
Poco::File(full_path).remove(true);
LOG_TRACE(log, "dropAllData: done.");
2014-03-09 17:36:01 +00:00
}
2014-07-11 12:47:45 +00:00
void MergeTreeData::checkAlter(const AlterCommands & params)
2014-03-09 17:36:01 +00:00
{
2014-07-11 12:47:45 +00:00
/// Проверим, что указанные преобразования можно совершить над списком столбцов без учета типов.
auto new_columns = *columns;
auto new_materialized_columns = materialized_columns;
auto new_alias_columns = alias_columns;
auto new_column_defaults = column_defaults;
params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
2014-07-09 13:39:19 +00:00
2014-07-11 12:47:45 +00:00
/// Список столбцов, которые нельзя трогать.
/// sampling_expression можно не учитывать, потому что он обязан содержаться в первичном ключе.
2015-03-13 21:31:23 +00:00
Names keys;
if (primary_expr)
keys = primary_expr->getRequiredColumns();
2016-05-13 21:08:19 +00:00
if (!merging_params.sign_column.empty())
keys.push_back(merging_params.sign_column);
2015-03-13 21:31:23 +00:00
2014-07-11 12:47:45 +00:00
std::sort(keys.begin(), keys.end());
2014-03-09 17:36:01 +00:00
2014-07-11 12:47:45 +00:00
for (const AlterCommand & command : params)
{
2014-07-11 12:47:45 +00:00
if (std::binary_search(keys.begin(), keys.end(), command.column_name))
throw Exception("trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
}
2014-07-11 12:47:45 +00:00
/// Проверим, что преобразования типов возможны.
ExpressionActionsPtr unused_expression;
NameToNameMap unused_map;
bool unused_bool;
/// augment plain columns with materialized columns for convert expression creation
new_columns.insert(std::end(new_columns),
std::begin(new_materialized_columns), std::end(new_materialized_columns));
2016-05-13 21:08:19 +00:00
createConvertExpression(nullptr, getColumnsList(), new_columns, unused_expression, unused_map, unused_bool);
2014-03-09 17:36:01 +00:00
}
2014-09-29 20:26:46 +00:00
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
2014-03-09 17:36:01 +00:00
{
2014-07-11 12:47:45 +00:00
out_expression = nullptr;
out_rename_map = {};
out_force_update_metadata = false;
2014-03-09 17:36:01 +00:00
using NameToType = std::map<String, DataTypePtr>;
2014-07-11 12:47:45 +00:00
NameToType new_types;
for (const NameAndTypePair & column : new_columns)
new_types.emplace(column.name, column.type);
2014-07-09 13:39:19 +00:00
2014-07-11 12:47:45 +00:00
/// Сколько столбцов сейчас в каждой вложенной структуре. Столбцы не из вложенных структур сюда тоже попадут и не помешают.
std::map<String, size_t> nested_table_counts;
2014-07-11 12:47:45 +00:00
for (const NameAndTypePair & column : old_columns)
++nested_table_counts[DataTypeNested::extractNestedTableName(column.name)];
/// For every column that need to be converted: source column name, column name of calculated expression for conversion.
std::vector<std::pair<String, String>> conversions;
2014-07-11 12:47:45 +00:00
for (const NameAndTypePair & column : old_columns)
2014-03-20 13:00:42 +00:00
{
2014-07-11 12:47:45 +00:00
if (!new_types.count(column.name))
{
bool is_nullable = column.type.get()->isNullable();
2014-07-11 12:47:45 +00:00
if (!part || part->hasColumnFiles(column.name))
{
/// Столбец нужно удалить.
const IDataType * observed_type;
if (is_nullable)
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(*column.type);
observed_type = nullable_type.getNestedType().get();
}
else
observed_type = column.type.get();
2014-07-11 12:47:45 +00:00
String escaped_column = escapeForFileName(column.name);
out_rename_map[escaped_column + ".bin"] = "";
out_rename_map[escaped_column + ".mrk"] = "";
if (is_nullable)
{
out_rename_map[escaped_column + ".null"] = "";
out_rename_map[escaped_column + ".null_mrk"] = "";
}
2014-07-11 12:47:45 +00:00
/// Если это массив или последний столбец вложенной структуры, нужно удалить файлы с размерами.
if (typeid_cast<const DataTypeArray *>(observed_type))
2014-07-11 12:47:45 +00:00
{
String nested_table = DataTypeNested::extractNestedTableName(column.name);
/// Если это был последний столбец, относящийся к этим файлам .size0, удалим файлы.
if (!--nested_table_counts[nested_table])
{
String escaped_nested_table = escapeForFileName(nested_table);
out_rename_map[escaped_nested_table + ".size0.bin"] = "";
out_rename_map[escaped_nested_table + ".size0.mrk"] = "";
}
}
}
}
2014-07-11 12:47:45 +00:00
else
{
2015-04-01 11:44:04 +00:00
const auto new_type = new_types[column.name].get();
const String new_type_name = new_type->getName();
2016-03-07 08:10:52 +00:00
const auto old_type = column.type.get();
2014-07-11 12:47:45 +00:00
if (new_type_name != old_type->getName() && (!part || part->hasColumnFiles(column.name)))
2014-07-11 12:47:45 +00:00
{
bool is_nullable = new_type->isNullable();
const IDataType * observed_type;
if (is_nullable)
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(*new_type);
observed_type = nullable_type.getNestedType().get();
}
else
observed_type = new_type;
// When ALTERing between Enums with same underlying type, don't modify columns, just update columns.txt.
if (part)
{
if ((typeid_cast<const DataTypeEnum8 *>(observed_type) && typeid_cast<const DataTypeEnum8 *>(old_type))
|| (typeid_cast<const DataTypeEnum16 *>(observed_type) && typeid_cast<const DataTypeEnum16 *>(old_type)))
{
out_force_update_metadata = true;
continue;
}
/// Same for Arrays of Enums
const DataTypeArray * arr_from = typeid_cast<const DataTypeArray *>(old_type);
const DataTypeArray * arr_to = typeid_cast<const DataTypeArray *>(observed_type);
if (arr_from && arr_to
&& ((typeid_cast<const DataTypeEnum8 *>(arr_to->getNestedType().get())
&& typeid_cast<const DataTypeEnum8 *>(arr_from->getNestedType().get()))
|| (typeid_cast<const DataTypeEnum16 *>(arr_to->getNestedType().get())
&& typeid_cast<const DataTypeEnum16 *>(arr_from->getNestedType().get()))))
{
out_force_update_metadata = true;
continue;
}
}
2014-07-11 12:47:45 +00:00
/// Need to modify column type.
2014-07-11 12:47:45 +00:00
if (!out_expression)
2016-01-13 00:32:59 +00:00
out_expression = std::make_shared<ExpressionActions>(NamesAndTypesList(), context.getSettingsRef());
2014-03-09 17:36:01 +00:00
out_expression->addInput(ColumnWithTypeAndName(nullptr, column.type, column.name));
2014-07-11 12:47:45 +00:00
Names out_names;
2015-04-01 11:44:04 +00:00
/// @todo invent the name more safely
const auto new_type_name_column = '#' + new_type_name + "_column";
out_expression->add(ExpressionAction::addColumn(
{ std::make_shared<ColumnConstString>(1, new_type_name), std::make_shared<DataTypeString>(), new_type_name_column }));
2015-04-01 11:44:04 +00:00
const FunctionPtr & function = FunctionFactory::instance().get("CAST", context);
out_expression->add(ExpressionAction::applyFunction(
function, Names{column.name, new_type_name_column}), out_names);
2015-04-01 11:44:04 +00:00
out_expression->add(ExpressionAction::removeColumn(new_type_name_column));
2014-07-11 12:47:45 +00:00
out_expression->add(ExpressionAction::removeColumn(column.name));
conversions.emplace_back(column.name, out_names.at(0));
2014-07-11 12:47:45 +00:00
}
}
2014-03-20 13:00:42 +00:00
}
if (!conversions.empty())
{
/// Give proper names for temporary columns with conversion results.
NamesWithAliases projection;
projection.reserve(conversions.size());
for (const auto & source_and_expression : conversions)
{
String converting_column_name = source_and_expression.first + " converting";
projection.emplace_back(source_and_expression.second, converting_column_name);
const String escaped_converted_column = escapeForFileName(converting_column_name);
const String escaped_source_column = escapeForFileName(source_and_expression.first);
/// After conversion, we need to rename temporary files into original.
out_rename_map[escaped_converted_column + ".bin"] = escaped_source_column + ".bin";
out_rename_map[escaped_converted_column + ".mrk"] = escaped_source_column + ".mrk";
const IDataType * new_type = new_types[source_and_expression.first].get();
/// NOTE Sizes of arrays are not updated during conversion.
/// Information on how to update the null map if it is a nullable column.
if (new_type->isNullable())
{
out_rename_map[escaped_converted_column + ".null"] = escaped_source_column + ".null";
out_rename_map[escaped_converted_column + ".null_mrk"] = escaped_source_column + ".null_mrk";
}
2014-07-11 12:47:45 +00:00
}
out_expression->add(ExpressionAction::project(projection));
}
if (part && !out_rename_map.empty())
{
std::string message;
{
WriteBufferFromString out(message);
out << "Will rename ";
bool first = true;
for (const auto & from_to : out_rename_map)
{
if (!first)
out << ", ";
first = false;
out << from_to.first << " to " << from_to.second;
}
out << " in part " << part->name;
}
LOG_DEBUG(log, message);
2014-03-20 13:00:42 +00:00
}
}
2014-03-09 17:36:01 +00:00
2014-09-29 20:26:46 +00:00
MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
2016-05-13 21:08:19 +00:00
const DataPartPtr & part,
const NamesAndTypesList & new_columns,
const ASTPtr & new_primary_key,
2016-05-13 21:08:19 +00:00
bool skip_sanity_checks)
2014-03-20 13:00:42 +00:00
{
2014-07-11 12:47:45 +00:00
ExpressionActionsPtr expression;
2014-07-17 13:41:47 +00:00
AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part)); /// Блокирует изменение куска.
bool force_update_metadata;
createConvertExpression(part, part->columns, new_columns, expression, transaction->rename_map, force_update_metadata);
2014-07-09 13:39:19 +00:00
size_t num_files_to_modify = transaction->rename_map.size();
size_t num_files_to_remove = 0;
for (const auto & from_to : transaction->rename_map)
if (from_to.second.empty())
++num_files_to_remove;
if (!skip_sanity_checks
&& (num_files_to_modify > settings.max_files_to_modify_in_alter_columns
|| num_files_to_remove > settings.max_files_to_remove_in_alter_columns))
{
transaction->clear();
std::stringstream exception_message;
exception_message << "Suspiciously many (" << transaction->rename_map.size()
<< ") files (";
bool first = true;
for (const auto & from_to : transaction->rename_map)
{
if (!first)
exception_message << ", ";
exception_message << "from '" << from_to.first << "' to '" << from_to.second << "'";
first = false;
}
exception_message << ") need to be modified in part " << part->name << " of table at " << full_path << ". Aborting just in case. "
<< " If it is not an error, you could increase merge_tree/max_files_to_modify_in_alter_columns parameter in configuration file.";
throw Exception(exception_message.str(), ErrorCodes::TABLE_DIFFERS_TOO_MUCH);
}
DataPart::Checksums add_checksums;
/// Обновление первичного ключа, если нужно.
size_t new_primary_key_file_size{};
uint128 new_primary_key_hash{};
if (new_primary_key.get() != primary_expr_ast.get())
{
ExpressionActionsPtr new_primary_expr = ExpressionAnalyzer(new_primary_key, context, nullptr, new_columns).getActions(true);
Block new_primary_key_sample = new_primary_expr->getSampleBlock();
size_t new_key_size = new_primary_key_sample.columns();
Columns new_index(new_key_size);
/// Копируем существующие столбцы первичного ключа. Новые заполняем значениями по-умолчанию.
/// NOTE Не поддерживаются вычислимые значения по-умолчанию.
ssize_t prev_position_of_existing_column = -1;
for (size_t i = 0; i < new_key_size; ++i)
{
const String & column_name = new_primary_key_sample.getByPosition(i).name;
if (primary_key_sample.has(column_name))
{
ssize_t position_of_existing_column = primary_key_sample.getPositionByName(column_name);
if (position_of_existing_column < prev_position_of_existing_column)
throw Exception("Permuting of columns of primary key is not supported", ErrorCodes::BAD_ARGUMENTS);
new_index[i] = part->index.at(position_of_existing_column);
prev_position_of_existing_column = position_of_existing_column;
}
else
{
const IDataType & type = *new_primary_key_sample.getByPosition(i).type;
new_index[i] = type.createConstColumn(part->size, type.getDefault())->convertToFullColumnIfConst();
}
}
if (prev_position_of_existing_column == -1)
throw Exception("No common columns while modifying primary key", ErrorCodes::BAD_ARGUMENTS);
String index_tmp_path = full_path + part->name + "/primary.idx.tmp";
WriteBufferFromFile index_file(index_tmp_path);
HashingWriteBuffer index_stream(index_file);
for (size_t i = 0, size = part->size; i < size; ++i)
for (size_t j = 0; j < new_key_size; ++j)
new_primary_key_sample.unsafeGetByPosition(j).type.get()->serializeBinary(*new_index[j].get(), i, index_stream);
transaction->rename_map["primary.idx.tmp"] = "primary.idx";
index_stream.next();
new_primary_key_file_size = index_stream.count();
new_primary_key_hash = index_stream.getHash();
2014-07-17 09:38:31 +00:00
}
if (transaction->rename_map.empty() && !force_update_metadata)
2014-03-20 13:00:42 +00:00
{
2014-07-14 14:07:47 +00:00
transaction->clear();
2014-07-17 09:38:31 +00:00
return nullptr;
2014-03-20 13:00:42 +00:00
}
2014-03-09 17:36:01 +00:00
2014-07-11 12:47:45 +00:00
/// Применим выражение и запишем результат во временные файлы.
if (expression)
2014-03-20 13:00:42 +00:00
{
MarkRanges ranges(1, MarkRange(0, part->size));
BlockInputStreamPtr part_in = std::make_shared<MergeTreeBlockInputStream>(full_path + part->name + '/',
DEFAULT_MERGE_BLOCK_SIZE, expression->getRequiredColumns(), *this, part, ranges,
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
2014-07-11 12:47:45 +00:00
ExpressionBlockInputStream in(part_in, expression);
2016-12-10 06:10:29 +00:00
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, CompressionMethod::LZ4, false);
2014-03-27 17:30:04 +00:00
in.readPrefix();
2014-03-20 13:00:42 +00:00
out.writePrefix();
2014-03-09 17:36:01 +00:00
2014-07-11 12:47:45 +00:00
while (Block b = in.read())
out.write(b);
2014-03-27 17:30:04 +00:00
2014-07-11 12:47:45 +00:00
in.readSuffix();
add_checksums = out.writeSuffixAndGetChecksums();
}
2014-03-27 17:30:04 +00:00
2014-07-14 11:45:34 +00:00
/// Обновим контрольные суммы.
2014-07-11 12:47:45 +00:00
DataPart::Checksums new_checksums = part->checksums;
for (auto it : transaction->rename_map)
{
if (it.second == "")
new_checksums.files.erase(it.first);
else
2016-10-19 18:18:02 +00:00
new_checksums.files[it.second] = add_checksums.files[it.first];
}
if (new_primary_key_file_size)
{
new_checksums.files["primary.idx"].file_size = new_primary_key_file_size;
new_checksums.files["primary.idx"].file_hash = new_primary_key_hash;
2014-03-09 17:36:01 +00:00
}
2014-03-20 13:00:42 +00:00
2014-07-11 12:47:45 +00:00
/// Запишем обновленные контрольные суммы во временный файл
if (!part->checksums.empty())
2014-03-09 17:36:01 +00:00
{
2014-07-14 14:07:47 +00:00
transaction->new_checksums = new_checksums;
2014-07-11 12:47:45 +00:00
WriteBufferFromFile checksums_file(full_path + part->name + "/checksums.txt.tmp", 4096);
new_checksums.write(checksums_file);
2014-07-11 12:47:45 +00:00
transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
2014-03-09 17:36:01 +00:00
}
2014-03-20 13:00:42 +00:00
2014-07-14 14:07:47 +00:00
/// Запишем обновленный список столбцов во временный файл.
{
2014-07-15 09:56:17 +00:00
transaction->new_columns = new_columns.filter(part->columns.getNames());
2014-07-14 14:07:47 +00:00
WriteBufferFromFile columns_file(full_path + part->name + "/columns.txt.tmp", 4096);
transaction->new_columns.writeText(columns_file);
transaction->rename_map["columns.txt.tmp"] = "columns.txt";
}
2014-07-11 12:47:45 +00:00
return transaction;
}
2014-03-20 13:00:42 +00:00
2014-07-11 12:47:45 +00:00
void MergeTreeData::AlterDataPartTransaction::commit()
{
if (!data_part)
return;
try
2014-03-09 17:36:01 +00:00
{
2014-07-14 14:07:47 +00:00
Poco::ScopedWriteRWLock lock(data_part->columns_lock);
2014-07-11 12:47:45 +00:00
String path = data_part->storage.full_path + data_part->name + "/";
2014-07-14 14:07:47 +00:00
/// NOTE: checking that a file exists before renaming or deleting it
/// is justified by the fact that, when converting an ordinary column
/// to a nullable column, new files are created which did not exist
/// before, i.e. they do not have older versions.
2014-07-14 14:07:47 +00:00
/// 1) Переименуем старые файлы.
2014-07-11 12:47:45 +00:00
for (auto it : rename_map)
2014-03-20 13:00:42 +00:00
{
2014-07-14 14:07:47 +00:00
String name = it.second.empty() ? it.first : it.second;
Poco::File file{path + name};
if (file.exists())
file.renameTo(path + name + ".tmp2");
2014-07-14 14:07:47 +00:00
}
/// 2) Переместим на их место новые и обновим метаданные в оперативке.
2014-07-14 14:07:47 +00:00
for (auto it : rename_map)
{
if (!it.second.empty())
Poco::File{path + it.first}.renameTo(path + it.second);
2014-03-20 13:00:42 +00:00
}
DataPart & mutable_part = const_cast<DataPart &>(*data_part);
mutable_part.checksums = new_checksums;
mutable_part.columns = new_columns;
2014-07-14 14:07:47 +00:00
/// 3) Удалим старые файлы.
for (auto it : rename_map)
{
String name = it.second.empty() ? it.first : it.second;
Poco::File file{path + name + ".tmp2"};
if (file.exists())
file.remove();
2014-07-14 14:07:47 +00:00
}
2014-07-15 11:12:58 +00:00
mutable_part.size_in_bytes = MergeTreeData::DataPart::calcTotalSize(path);
2014-07-22 13:49:52 +00:00
/// TODO: можно не сбрасывать кеши при добавлении столбца.
data_part->storage.context.resetCaches();
2014-07-14 14:07:47 +00:00
clear();
2014-07-11 12:47:45 +00:00
}
catch (...)
{
/// Если что-то пошло не так, не будем удалять временные файлы в деструкторе.
2014-07-14 14:07:47 +00:00
clear();
2014-07-11 12:47:45 +00:00
throw;
2014-03-20 13:00:42 +00:00
}
2014-03-09 17:36:01 +00:00
}
2014-03-20 13:00:42 +00:00
2014-07-11 12:47:45 +00:00
/// Aborts the ALTER when data_part is still set (presumably meaning that neither commit() nor clear()
/// has run): removes the temporary files listed in rename_map.
/// Exceptions do not leave the destructor; they are logged instead.
MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
{
    try
    {
        if (!data_part)
            return;

        LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->name);

        const String part_dir = data_part->storage.full_path + data_part->name + "/";

        for (const auto & from_to : rename_map)
        {
            /// An empty target name marks a file to be removed; no temporary file exists for it.
            if (from_to.second.empty())
                continue;

            try
            {
                Poco::File tmp_file(part_dir + from_to.first);
                if (tmp_file.exists())
                    tmp_file.remove();
            }
            catch (Poco::Exception & e)
            {
                /// Failing to remove one file must not prevent the attempt to remove the others.
                LOG_WARNING(data_part->storage.log, "Can't remove " << part_dir + from_to.first << ": " << e.displayText());
            }
        }
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
2015-06-02 20:22:53 +00:00
/// Same as renameTempPartAndReplace, but the new part is not expected to cover any existing part:
/// throws LOGICAL_ERROR if anything was replaced.
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
{
    const auto covered_parts = renameTempPartAndReplace(part, increment, out_transaction);

    if (covered_parts.empty())
        return;

    throw Exception("Added part " + part->name + " covers " + toString(covered_parts.size())
        + " existing part(s) (including " + covered_parts.front()->name + ")", ErrorCodes::LOGICAL_ERROR);
}
2014-07-01 15:58:25 +00:00
/// Renames a temporary part to its final name and adds it to the set of parts,
/// taking out of data_parts every part that the new one contains.
/// If `increment` is given, a new block number is taken from it and used for both ends of the part's range.
/// If `out_transaction` is given, it is filled with the parts to add and to remove on rollback.
/// Returns the replaced parts in ascending order.
/// Throws LOGICAL_ERROR if `out_transaction` is already bound to a table,
/// and DUPLICATE_DATA_PART if data_parts already holds the part under its final name.
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
    MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
{
    if (out_transaction && out_transaction->data)
        throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid", ErrorCodes::LOGICAL_ERROR);

    LOG_TRACE(log, "Renaming " << part->name << ".");

    const String temporary_name = part->name;
    const String temporary_path = getFullPath() + temporary_name + "/";

    DataPartsVector covered;

    {
        std::lock_guard<std::mutex> lock(data_parts_mutex);

        /** It is important that obtaining new block number and adding that block to parts set is done atomically.
          * Otherwise there is race condition - merge of blocks could happen in interval that doesn't yet contain new part.
          */
        if (increment)
            part->left = part->right = increment->get();

        const String final_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level);

        /// Give the part its final identity only for the duration of the lookup, then put the temporary one back.
        part->is_temp = false;
        part->name = final_name;
        const bool already_exists = data_parts.count(part);
        part->name = temporary_name;
        part->is_temp = true;

        if (already_exists)
            throw Exception("Part " + final_name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);

        const String final_path = getFullPath() + final_name + "/";

        /// Rename the part on disk.
        Poco::File(temporary_path).renameTo(final_path);

        part->is_temp = false;
        part->name = final_name;

        /// Whether `part` itself is covered by some existing part.
        bool is_covered = false;

        /// Records the part at `pos` as replaced, takes it out of data_parts
        /// and returns the iterator to the element that followed it.
        auto retire = [&](DataParts::iterator pos) -> DataParts::iterator
        {
            covered.push_back(*pos);
            (*pos)->remove_time = time(0);
            removePartContributionToColumnSizes(*pos);
            return data_parts.erase(pos);
        };

        /// Parts contained in `part` are located contiguously in data_parts,
        /// around the place where `part` itself would be inserted.
        DataParts::iterator it = data_parts.lower_bound(part);

        /// Go to the left.
        while (it != data_parts.begin())
        {
            --it;

            if (part->contains(**it))
            {
                /// After the removal `it` points at the element to the right of the removed one,
                /// so the next step to the left starts from the same place.
                it = retire(it);
                continue;
            }

            if ((*it)->contains(*part))
                is_covered = true;
            ++it;
            break;
        }

        /// The left walk collected the parts right to left; they are needed in ascending order.
        std::reverse(covered.begin(), covered.end());

        /// Go to the right.
        while (it != data_parts.end())
        {
            if (part->contains(**it))
            {
                it = retire(it);
                continue;
            }

            if ((*it)->name == part->name || (*it)->contains(*part))
                is_covered = true;
            break;
        }

        if (is_covered)
        {
            LOG_WARNING(log, "Obsolete part " << part->name << " added");
            part->remove_time = time(0);
        }
        else
        {
            data_parts.insert(part);
            addPartContributionToColumnSizes(part);
        }

        {
            std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
            all_data_parts.insert(part);
        }
    }

    if (out_transaction)
    {
        out_transaction->data = this;
        out_transaction->parts_to_add_on_rollback = covered;
        out_transaction->parts_to_remove_on_rollback = DataPartsVector(1, part);
    }

    return covered;
}
2014-07-07 10:23:24 +00:00
void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout)
2014-07-01 15:58:25 +00:00
{
2016-01-29 02:22:43 +00:00
std::lock_guard<std::mutex> lock(data_parts_mutex);
2014-07-01 15:58:25 +00:00
for (const DataPartPtr & part : remove)
{
2014-07-07 10:23:24 +00:00
part->remove_time = clear_without_timeout ? 0 : time(0);
if (data_parts.erase(part))
removePartContributionToColumnSizes(part);
2014-07-01 15:58:25 +00:00
}
2015-09-16 04:18:16 +00:00
2014-07-01 15:58:25 +00:00
for (const DataPartPtr & part : add)
{
if (data_parts.insert(part).second)
addPartContributionToColumnSizes(part);
2014-07-01 15:58:25 +00:00
}
}
2014-09-29 20:26:46 +00:00
void MergeTreeData::attachPart(const DataPartPtr & part)
2014-08-08 08:28:13 +00:00
{
2016-01-29 02:22:43 +00:00
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
2014-08-08 08:28:13 +00:00
if (!all_data_parts.insert(part).second)
throw Exception("Part " + part->name + " is already attached", ErrorCodes::DUPLICATE_DATA_PART);
2015-06-02 20:22:53 +00:00
2014-08-08 08:28:13 +00:00
data_parts.insert(part);
2015-06-02 20:22:53 +00:00
addPartContributionToColumnSizes(part);
2014-08-08 08:28:13 +00:00
}
2014-09-29 20:26:46 +00:00
void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & prefix, bool restore_covered, bool move_to_detached)
2014-04-02 07:59:43 +00:00
{
2014-07-28 09:46:28 +00:00
LOG_INFO(log, "Renaming " << part->name << " to " << prefix << part->name << " and detaching it.");
2016-01-29 02:22:43 +00:00
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
2014-07-28 09:46:28 +00:00
2014-04-09 16:32:32 +00:00
if (!all_data_parts.erase(part))
2014-04-02 07:59:43 +00:00
throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
2014-07-28 09:46:28 +00:00
2014-09-19 11:44:29 +00:00
removePartContributionToColumnSizes(part);
2014-04-09 16:32:32 +00:00
data_parts.erase(part);
2014-08-08 08:28:13 +00:00
if (move_to_detached || !prefix.empty())
part->renameAddPrefix(move_to_detached, prefix);
2014-07-28 09:46:28 +00:00
if (restore_covered)
{
auto it = all_data_parts.lower_bound(part);
Strings restored;
bool error = false;
2015-08-17 21:09:36 +00:00
Int64 pos = part->left;
2014-07-28 09:46:28 +00:00
if (it != all_data_parts.begin())
{
--it;
if (part->contains(**it))
{
if ((*it)->left != part->left)
error = true;
data_parts.insert(*it);
2014-09-19 11:44:29 +00:00
addPartContributionToColumnSizes(*it);
2014-07-28 09:46:28 +00:00
pos = (*it)->right + 1;
restored.push_back((*it)->name);
}
else
error = true;
++it;
}
else
error = true;
for (; it != all_data_parts.end() && part->contains(**it); ++it)
{
if ((*it)->left < pos)
continue;
if ((*it)->left > pos)
error = true;
data_parts.insert(*it);
2014-09-19 11:44:29 +00:00
addPartContributionToColumnSizes(*it);
2014-07-28 09:46:28 +00:00
pos = (*it)->right + 1;
restored.push_back((*it)->name);
}
if (pos != part->right + 1)
error = true;
for (const String & name : restored)
{
LOG_INFO(log, "Activated part " << name);
}
if (error)
LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete. There might or might not be a data loss.");
}
2014-03-13 17:44:00 +00:00
}
2014-09-29 20:26:46 +00:00
void MergeTreeData::detachPartInPlace(const DataPartPtr & part)
2014-08-08 08:28:13 +00:00
{
renameAndDetachPart(part, "", false, false);
}
/// Returns a copy of data_parts made under data_parts_mutex.
MergeTreeData::DataParts MergeTreeData::getDataParts() const
{
    std::lock_guard<std::mutex> lock(data_parts_mutex);

    DataParts snapshot(data_parts);
    return snapshot;
}
/// Returns the contents of data_parts as a vector, in the set's iteration order.
/// The copy is made under data_parts_mutex.
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const
{
    std::lock_guard<std::mutex> lock(data_parts_mutex);

    DataPartsVector parts(data_parts.begin(), data_parts.end());
    return parts;
}
size_t MergeTreeData::getTotalActiveSizeInBytes() const
{
2016-01-29 02:22:43 +00:00
std::lock_guard<std::mutex> lock(data_parts_mutex);
size_t res = 0;
for (auto & part : data_parts)
res += part->size_in_bytes;
return res;
}
/// Returns a copy of all_data_parts made under all_data_parts_mutex.
MergeTreeData::DataParts MergeTreeData::getAllDataParts() const
{
    std::lock_guard<std::mutex> lock(all_data_parts_mutex);

    DataParts snapshot(all_data_parts);
    return snapshot;
}
size_t MergeTreeData::getMaxPartsCountForMonth() const
2014-04-11 16:56:49 +00:00
{
2016-01-29 02:22:43 +00:00
std::lock_guard<std::mutex> lock(data_parts_mutex);
2014-04-11 16:56:49 +00:00
2014-05-16 15:55:57 +00:00
size_t res = 0;
size_t cur_count = 0;
DayNum_t cur_month = DayNum_t(0);
for (const auto & part : data_parts)
{
2015-08-17 21:09:36 +00:00
if (part->month == cur_month)
2014-05-16 15:55:57 +00:00
{
++cur_count;
}
else
{
2015-08-17 21:09:36 +00:00
cur_month = part->month;
2014-05-16 15:55:57 +00:00
cur_count = 1;
}
res = std::max(res, cur_count);
}
return res;
2014-04-11 16:56:49 +00:00
}
/// Returns { left block number of the first part of the given month in all_data_parts, true },
/// or { 0, false } if there is no part for that month.
std::pair<Int64, bool> MergeTreeData::getMinBlockNumberForMonth(DayNum_t month) const
{
    std::lock_guard<std::mutex> lock(all_data_parts_mutex);

    /// The search could be done better than by a linear scan.
    for (auto it = all_data_parts.begin(); it != all_data_parts.end(); ++it)
    {
        const auto & part = *it;
        if (!(part->month == month))
            continue;

        /// The first match is taken as the minimum: the original comment says the blocks in data_parts
        /// are ordered by month and left.
        /// NOTE(review): the loop walks all_data_parts, presumably ordered the same way - confirm.
        return std::pair<Int64, bool>(part->left, true);
    }

    return std::pair<Int64, bool>(0, false);
}
/// Returns true if some part in data_parts belongs to the given month
/// and its [left, right] block range includes `block_number`.
bool MergeTreeData::hasBlockNumberInMonth(Int64 block_number, DayNum_t month) const
{
    std::lock_guard<std::mutex> lock(data_parts_mutex);

    /// The search could be done better than by a linear scan.
    for (const auto & part : data_parts)
    {
        if (part->month == month)
        {
            const bool in_range = part->left <= block_number && part->right >= block_number;
            if (in_range)
                return true;
        }
        else if (part->month > month)
        {
            /// Stop at the first part of a later month.
            break;
        }
    }

    return false;
}
/// Slows down or rejects an insert depending on getMaxPartsCountForMonth().
/// Below settings.parts_to_delay_insert nothing happens; at settings.parts_to_throw_insert or above
/// TOO_MUCH_PARTS is thrown; in between the call waits, the longer the closer the count is to the upper limit.
/// If `until` is given, the wait is done with until->tryWait; otherwise the thread just sleeps.
void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
{
    const size_t parts_count = getMaxPartsCountForMonth();
    const auto delay_threshold = settings.parts_to_delay_insert;
    const auto throw_threshold = settings.parts_to_throw_insert;

    if (parts_count < delay_threshold)
        return;

    if (parts_count >= throw_threshold)
    {
        ProfileEvents::increment(ProfileEvents::RejectedInserts);
        throw Exception("Too much parts. Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MUCH_PARTS);
    }

    /// Here delay_threshold <= parts_count < throw_threshold, so max_k is always > 0.
    const size_t max_k = throw_threshold - delay_threshold;
    /// From 1 to max_k.
    const size_t k = 1 + parts_count - delay_threshold;

    /// The delay is max_delay_to_insert raised to the power k / max_k.
    const double delay_sec = ::pow(settings.max_delay_to_insert, static_cast<double>(k) / max_k);
    const double delay_ms = delay_sec * 1000;

    ProfileEvents::increment(ProfileEvents::DelayedInserts);
    ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_ms);

    CurrentMetrics::Increment metric_increment(CurrentMetrics::DelayedInserts);

    LOG_INFO(log, "Delaying inserting block by "
        << std::fixed << std::setprecision(4) << delay_sec << " sec. because there are " << parts_count << " parts");

    if (!until)
        std::this_thread::sleep_for(std::chrono::duration<double>(delay_sec));
    else
        until->tryWait(delay_ms);
}
2014-07-25 11:38:46 +00:00
/// Returns the part from data_parts that has the given name or contains the part with that name;
/// nullptr if there is none.
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
{
    /// A stand-in part built from the name, used only for the lookup and the contains() checks.
    MutableDataPartPtr probe = std::make_shared<DataPart>(*this);
    ActiveDataPartSet::parsePartName(part_name, *probe);

    std::lock_guard<std::mutex> lock(data_parts_mutex);

    /// The part can be covered only by the previous or the next one in data_parts.
    const DataParts::iterator next = data_parts.lower_bound(probe);

    if (next != data_parts.end()
        && ((*next)->name == part_name || (*next)->contains(*probe)))
        return *next;

    if (next != data_parts.begin())
    {
        DataParts::iterator prev = next;
        --prev;
        if ((*prev)->contains(*probe))
            return *prev;
    }

    return nullptr;
}
2014-07-25 11:38:46 +00:00
/// Returns the part from all_data_parts with exactly the given name, or nullptr if there is none.
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name)
{
    /// A stand-in part built from the name, used only as the lookup key.
    MutableDataPartPtr probe = std::make_shared<DataPart>(*this);
    ActiveDataPartSet::parsePartName(part_name, *probe);

    std::lock_guard<std::mutex> lock(all_data_parts_mutex);

    const DataParts::iterator found = all_data_parts.lower_bound(probe);
    const bool exact_match = found != all_data_parts.end() && (*found)->name == part_name;

    if (!exact_match)
        return nullptr;

    return *found;
}
2016-01-28 01:00:27 +00:00
/// Returns the part stored for shard `shard_no` in per_shard_data_parts if it has the given name,
/// nullptr otherwise. The shard entry is looked up with at().
MergeTreeData::DataPartPtr MergeTreeData::getShardedPartIfExists(const String & part_name, size_t shard_no)
{
    const MutableDataPartPtr & shard_part = per_shard_data_parts.at(shard_no);

    if (!(shard_part->name == part_name))
        return nullptr;

    return shard_part;
}
2014-08-08 08:28:13 +00:00
/// Creates a part object for the directory at `relative_path` (relative to full_path) and loads its
/// columns, checksums and index. An existing columns.txt is deleted first, and checksums.txt is
/// written if the part had no checksums. Returns the loaded part.
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path)
{
    const String part_dir = full_path + relative_path;

    MutableDataPartPtr part = std::make_shared<DataPart>(*this);
    part->name = relative_path;
    ActiveDataPartSet::parsePartName(Poco::Path(relative_path).getFileName(), *part);

    /// Earlier the list of columns was written incorrectly. Delete it and create it anew.
    Poco::File columns_file(part_dir + "/columns.txt");
    if (columns_file.exists())
        columns_file.remove();

    part->loadColumns(false);
    part->loadChecksums(false);
    part->loadIndex();
    part->checkNotBroken(false);

    part->modification_time = Poco::File(part_dir).getLastModified().epochTime();

    /// If there is no file with checksums, calculate the checksums and write them. Check the data along the way.
    if (part->checksums.empty())
    {
        MergeTreePartChecker::Settings checker_settings;
        checker_settings.setIndexGranularity(index_granularity);
        checker_settings.setRequireColumnFiles(true);
        MergeTreePartChecker::checkDataPart(part_dir, checker_settings, primary_key_data_types, &part->checksums);

        const String tmp_checksums_path = part_dir + "/checksums.txt.tmp";

        /// The buffer lives in its own scope so that it is destroyed before the file is renamed.
        {
            WriteBufferFromFile out(tmp_checksums_path, 4096);
            part->checksums.write(out);
        }

        Poco::File(tmp_checksums_path).renameTo(part_dir + "/checksums.txt");
    }

    return part;
}
2014-09-19 11:44:29 +00:00
/// Recomputes column_sizes from scratch as the sum of the contributions of all parts in data_parts.
void MergeTreeData::calculateColumnSizes()
{
    column_sizes.clear();

    for (auto it = data_parts.begin(); it != data_parts.end(); ++it)
        addPartContributionToColumnSizes(*it);
}
/// Adds to column_sizes, for every column of the table, the sizes of the part's ".bin" and ".mrk"
/// files of that column as recorded in the part's checksums. Files missing from the checksums
/// contribute nothing.
void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
{
    const auto & files = part->checksums.files;

    for (const auto & column : *columns)
    {
        const auto escaped_name = escapeForFileName(column.name);

        /// Note: this creates a zero entry for the column if there was none.
        auto & column_size = column_sizes[column.name];

        /// A single find() per file instead of count() followed by find().
        const auto bin_file = files.find(escaped_name + ".bin");
        if (bin_file != files.end())
            column_size += bin_file->second.file_size;

        const auto mrk_file = files.find(escaped_name + ".mrk");
        if (mrk_file != files.end())
            column_size += mrk_file->second.file_size;
    }
}
/// Subtracts from column_sizes, for every column of the table, the sizes of the part's ".bin" and ".mrk"
/// files of that column as recorded in the part's checksums. Files missing from the checksums
/// are ignored.
void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part)
{
    const auto & files = part->checksums.files;

    for (const auto & column : *columns)
    {
        const auto escaped_name = escapeForFileName(column.name);

        /// Note: this creates a zero entry for the column if there was none.
        auto & column_size = column_sizes[column.name];

        /// A single find() per file instead of count() followed by find().
        const auto bin_file = files.find(escaped_name + ".bin");
        if (bin_file != files.end())
            column_size -= bin_file->second.file_size;

        const auto mrk_file = files.find(escaped_name + ".mrk");
        if (mrk_file != files.end())
            column_size -= mrk_file->second.file_size;
    }
}
2014-10-03 17:57:01 +00:00
void MergeTreeData::freezePartition(const std::string & prefix, const String & with_name)
{
LOG_DEBUG(log, "Freezing parts with prefix " + prefix);
String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString();
String shadow_path = clickhouse_path + "shadow/";
Poco::File(shadow_path).createDirectories();
String backup_path = shadow_path
+ (!with_name.empty()
? escapeForFileName(with_name)
: toString(Increment(shadow_path + "increment.txt").get(true)))
+ "/";
LOG_DEBUG(log, "Snapshot will be placed at " + backup_path);
size_t parts_processed = 0;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
if (startsWith(it.name(), prefix))
{
2015-09-20 11:56:14 +00:00
LOG_DEBUG(log, "Freezing part " << it.name());
String part_absolute_path = it.path().absolute().toString();
if (!startsWith(part_absolute_path, clickhouse_path))
throw Exception("Part path " + part_absolute_path + " is not inside " + clickhouse_path, ErrorCodes::LOGICAL_ERROR);
String backup_part_absolute_path = part_absolute_path;
backup_part_absolute_path.replace(0, clickhouse_path.size(), backup_path);
localBackup(part_absolute_path, backup_part_absolute_path);
++parts_processed;
}
}
LOG_DEBUG(log, "Freezed " << parts_processed << " parts");
}
2016-01-28 01:00:27 +00:00
size_t MergeTreeData::getPartitionSize(const std::string & partition_name) const
{
size_t size = 0;
Poco::DirectoryIterator end;
Poco::DirectoryIterator end2;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
const auto filename = it.name();
if (!ActiveDataPartSet::isPartDirectory(filename))
continue;
if (!startsWith(filename, partition_name))
2016-01-28 01:00:27 +00:00
continue;
const auto part_path = it.path().absolute().toString();
for (Poco::DirectoryIterator it2(part_path); it2 != end2; ++it2)
{
const auto part_file_path = it2.path().absolute().toString();
size += Poco::File(part_file_path).getSize();
}
}
return size;
}
2014-10-03 17:57:01 +00:00
static std::pair<String, DayNum_t> getMonthNameAndDayNum(const Field & partition)
{
String month_name = partition.getType() == Field::Types::UInt64
? toString(partition.get<UInt64>())
: partition.safeGet<String>();
if (month_name.size() != 6 || !std::all_of(month_name.begin(), month_name.end(), isdigit))
throw Exception("Invalid partition format: " + month_name + ". Partition should consist of 6 digits: YYYYMM",
ErrorCodes::INVALID_PARTITION_NAME);
DayNum_t date = DateLUT::instance().YYYYMMDDToDayNum(parse<UInt32>(month_name + "01"));
2014-10-03 17:57:01 +00:00
/// Не можем просто сравнить date с нулем, потому что 0 тоже валидный DayNum.
if (month_name != toString(DateLUT::instance().toNumYYYYMMDD(date) / 100))
2014-10-03 17:57:01 +00:00
throw Exception("Invalid partition format: " + month_name + " doesn't look like month.",
ErrorCodes::INVALID_PARTITION_NAME);
return std::make_pair(month_name, date);
}
/// Returns the month name (YYYYMM) for a partition identifier; see getMonthNameAndDayNum for the accepted forms.
String MergeTreeData::getMonthName(const Field & partition)
{
    const auto name_and_day = getMonthNameAndDayNum(partition);
    return name_and_day.first;
}
2016-01-28 01:00:27 +00:00
/// Returns the month name for a day number: its YYYYMMDD representation without the last two digits.
String MergeTreeData::getMonthName(DayNum_t month)
{
    const auto & date_lut = DateLUT::instance();
    const auto yyyymm = date_lut.toNumYYYYMMDD(month) / 100;
    return toString(yyyymm);
}
2014-10-03 17:57:01 +00:00
/// Returns the DayNum for a partition identifier; see getMonthNameAndDayNum for the accepted forms.
DayNum_t MergeTreeData::getMonthDayNum(const Field & partition)
{
    const auto name_and_day = getMonthNameAndDayNum(partition);
    return name_and_day.second;
}
2016-01-28 01:00:27 +00:00
/// Converts a month name to the DayNum obtained by parsing the name with "01" appended as YYYYMMDD.
/// Throws INVALID_PARTITION_NAME if converting that date back does not give the same month name.
DayNum_t MergeTreeData::getMonthFromName(const String & month_name)
{
    const auto & date_lut = DateLUT::instance();

    const DayNum_t first_day = date_lut.YYYYMMDDToDayNum(parse<UInt32>(month_name + "01"));

    /// We cannot simply compare the date with zero, because 0 is a valid DayNum too.
    const bool round_trips = month_name == toString(date_lut.toNumYYYYMMDD(first_day) / 100);
    if (!round_trips)
        throw Exception("Invalid partition format: " + month_name + " doesn't look like month.",
            ErrorCodes::INVALID_PARTITION_NAME);

    return first_day;
}
/// Converts the first six characters of a part prefix (the "YYYYMM" part) with getMonthFromName.
DayNum_t MergeTreeData::getMonthFromPartPrefix(const String & part_prefix)
{
    /// Length of "YYYYMM" without the terminating zero.
    const size_t month_name_length = sizeof("YYYYMM") - 1;
    const String month_name = part_prefix.substr(0, month_name_length);
    return getMonthFromName(month_name);
}
2014-03-09 17:36:01 +00:00
}