dbms: added FREEZE PARTITION functionality [#METR-13441].

This commit is contained in:
Alexey Milovidov 2014-11-11 07:11:07 +03:00
parent a800008359
commit 9a9036f217
15 changed files with 199 additions and 1 deletions

View File

@ -0,0 +1,16 @@
#pragma once
#include <Poco/Path.h>
/** Создаёт локальный (в той же точке монтирования) бэкап (снэпшот) директории.
*
* В указанной destination-директории создаёт hard link-и на все файлы source-директории
* и во всех вложенных директориях, с сохранением (созданием) всех относительных путей;
* а также делает chown, снимая разрешение на запись.
*
* Это защищает данные от случайного удаления или модификации,
* и предназначено для использования как простое средство защиты от человеческой или программной ошибки,
* но не от аппаратного сбоя.
*/
void localBackup(Poco::Path source_path, Poco::Path destination_path);

View File

@ -31,6 +31,7 @@ private:
DROP_PARTITION,
ATTACH_PARTITION,
FETCH_PARTITION,
FREEZE_PARTITION,
};
Type type;
@ -57,12 +58,17 @@ private:
{
return {FETCH_PARTITION, partition, false, false, false, from};
}
static PartitionCommand freezePartition(const Field & partition)
{
return {FREEZE_PARTITION, partition};
}
};
typedef std::vector<PartitionCommand> PartitionCommands;
ASTPtr query_ptr;
Context context;
static void parseAlter(const ASTAlterQuery::ParameterContainer & params, const DataTypeFactory & data_type_factory,

View File

@ -25,6 +25,7 @@ public:
DROP_PARTITION,
ATTACH_PARTITION,
FETCH_PARTITION,
FREEZE_PARTITION,
NO_TYPE
};

View File

@ -12,6 +12,7 @@ namespace DB
* [MODIFY COLUMN col_modify type, ...]
* [DROP|DETACH|ATTACH [UNREPLICATED] PARTITION|PART partition, ...]
* [FETCH PARTITION partition FROM ...]
* [FREEZE PARTITION]
*/
class ParserAlterQuery : public IParserBase
{

View File

@ -226,6 +226,13 @@ public:
throw Exception("Method fetchPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Выполнить запрос FREEZE PARTITION. То есть, создать локальный бэкап (снэпшот) данных с помощью функции localBackup (см. localBackup.h)
*/
virtual void freezePartition(const Field & partition, const Settings & settings)
{
throw Exception("Method freezePartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Выполнить какую-либо фоновую работу. Например, объединение кусков в таблице типа MergeTree.
* Возвращает - была ли выполнена какая-либо работа.
*/

View File

@ -741,6 +741,11 @@ public:
/// Проверить, что кусок не сломан и посчитать для него чексуммы, если их нет.
MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path);
/** Сделать локальный бэкап (снэпшот) для кусков, начинающихся с указанного префикса.
* Бэкап создаётся в директории clickhouse_dir/shadow/i/, где i - инкрементное число.
*/
void freezePartition(const std::string & prefix);
size_t getColumnSize(const std::string & name) const
{
Poco::ScopedLock<Poco::FastMutex> lock{data_parts_mutex};

View File

@ -79,6 +79,7 @@ public:
void dropPartition(const Field & partition, bool detach, const Settings & settings) override;
void attachPartition(const Field & partition, bool unreplicated, bool part, const Settings & settings) override;
void freezePartition(const Field & partition, const Settings & settings) override;
void drop() override;

View File

@ -84,6 +84,7 @@ public:
void dropPartition(const Field & partition, bool detach, const Settings & settings) override;
void attachPartition(const Field & partition, bool unreplicated, bool part, const Settings & settings) override;
void fetchPartition(const Field & partition, const String & from, const Settings & settings) override;
void freezePartition(const Field & partition, const Settings & settings) override;
/** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper.
*/

View File

@ -0,0 +1,72 @@
#include <sys/stat.h>
#include <unistd.h>
#include <string>
#include <iostream>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
static void localBackupImpl(Poco::Path source_path, Poco::Path destination_path, size_t level)
{
if (level >= 1000)
throw DB::Exception("Too deep recursion");
std::cerr << source_path.toString() << ", " << destination_path.toString() << "\n";
Poco::File(destination_path).createDirectories();
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(source_path); dir_it != dir_end; ++dir_it)
{
Poco::Path source = dir_it.path();
Poco::Path destination = destination_path;
destination.append(dir_it.name());
if (!dir_it->isDirectory())
{
dir_it->setReadOnly();
std::string source_str = source.toString();
std::string destination_str = destination.toString();
/** Пытаемся создать hard link.
* Если он уже существует, то проверим, что source и destination указывают на один и тот же inode.
*/
if (0 != link(source_str.c_str(), destination_str.c_str()))
{
if (errno == EEXIST)
{
auto link_errno = errno;
struct stat source_descr;
struct stat destination_descr;
if (0 != lstat(source_str.c_str(), &source_descr))
DB::throwFromErrno("Cannot stat " + source_str);
if (0 != lstat(destination_str.c_str(), &destination_descr))
DB::throwFromErrno("Cannot stat " + destination_str);
if (source_descr.st_ino != destination_descr.st_ino)
DB::throwFromErrno("Destination file " + destination_str + " is already exist and have different inode.", 0, link_errno);
}
else
DB::throwFromErrno("Cannot link " + source_str + " to " + destination_str);
}
}
else
{
localBackupImpl(source, destination, level + 1);
}
}
}
void localBackup(Poco::Path source_path, Poco::Path destination_path)
{
localBackupImpl(source_path, destination_path, 0);
}

View File

@ -53,6 +53,10 @@ void InterpreterAlterQuery::execute()
table->fetchPartition(command.partition, command.from, context.getSettingsRef());
break;
case PartitionCommand::FREEZE_PARTITION:
table->freezePartition(command.partition, context.getSettingsRef());
break;
default:
throw Exception("Bad PartitionCommand::Type: " + toString(command.type), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
@ -122,6 +126,11 @@ void InterpreterAlterQuery::parseAlter(
const Field & partition = dynamic_cast<const ASTLiteral &>(*params.partition).value;
out_partition_commands.push_back(PartitionCommand::fetchPartition(partition, params.from));
}
else if (params.type == ASTAlterQuery::FREEZE_PARTITION)
{
const Field & partition = dynamic_cast<const ASTLiteral &>(*params.partition).value;
out_partition_commands.push_back(PartitionCommand::freezePartition(partition));
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -27,6 +27,7 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Expected & e
ParserString s_detach("DETACH", true, true);
ParserString s_attach("ATTACH", true, true);
ParserString s_fetch("FETCH", true, true);
ParserString s_freeze("FREEZE", true, true);
ParserString s_unreplicated("UNREPLICATED", true, true);
ParserString s_part("PART", true, true);
ParserString s_partition("PARTITION", true, true);
@ -191,6 +192,22 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Expected & e
params.from = typeid_cast<const ASTLiteral &>(*ast_from).value.get<const String &>();
params.type = ASTAlterQuery::FETCH_PARTITION;
}
else if (s_freeze.ignore(pos, end, expected))
{
ws.ignore(pos, end);
if (!s_partition.ignore(pos, end, expected))
return false;
ws.ignore(pos, end);
if (!parser_literal.parse(pos, end, params.partition, expected))
return false;
ws.ignore(pos, end);
params.type = ASTAlterQuery::FREEZE_PARTITION;
}
else if (s_modify.ignore(pos, end, expected))
{
ws.ignore(pos, end);

View File

@ -773,6 +773,11 @@ void formatAST(const ASTAlterQuery & ast, std::ostream & s, size_t indent, bo
s << (hilite ? hilite_keyword : "") << " FROM " << (hilite ? hilite_none : "")
<< mysqlxx::quote << p.from;
}
else if (p.type == ASTAlterQuery::FREEZE_PARTITION)
{
s << (hilite ? hilite_keyword : "") << indent_str << "FREEZE PARTITION " << (hilite ? hilite_none : "");
formatAST(*p.partition, s, indent, hilite, true);
}
else
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);

View File

@ -12,6 +12,7 @@
#include <DB/DataStreams/copyData.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/Common/localBackup.h>
#include <algorithm>
#include <iomanip>
@ -1153,6 +1154,40 @@ void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part
}
void MergeTreeData::freezePartition(const std::string & prefix)
{
LOG_DEBUG(log, "Freezing parts with prefix " + prefix);
String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString();
String shadow_path = clickhouse_path + "shadow/";
Poco::File(shadow_path).createDirectories();
String backup_path = shadow_path + toString(Increment(shadow_path + "increment.txt").get(true)) + "/";
LOG_DEBUG(log, "Snapshot will be placed at " + backup_path);
size_t parts_processed = 0;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
if (0 == it.name().compare(0, prefix.size(), prefix))
{
LOG_DEBUG(log, "Freezing part " + it.name());
String part_absolute_path = it.path().absolute().toString();
if (0 != part_absolute_path.compare(0, clickhouse_path.size(), clickhouse_path))
throw Exception("Part path " + part_absolute_path + " is not inside " + clickhouse_path, ErrorCodes::LOGICAL_ERROR);
String backup_part_absolute_path = part_absolute_path;
backup_part_absolute_path.replace(0, clickhouse_path.size(), backup_path);
localBackup(part_absolute_path, backup_part_absolute_path);
++parts_processed;
}
}
LOG_DEBUG(log, "Freezed " << parts_processed << " parts");
}
static std::pair<String, DayNum_t> getMonthNameAndDayNum(const Field & partition)
{
String month_name = partition.getType() == Field::Types::UInt64

View File

@ -306,4 +306,13 @@ void StorageMergeTree::attachPartition(const Field & field, bool unreplicated, b
context.resetCaches();
}
void StorageMergeTree::freezePartition(const Field & partition, const Settings & settings)
{
/// Префикс может быть произвольным. Не обязательно месяц - можно указать лишь год.
data.freezePartition(partition.getType() == Field::Types::UInt64
? toString(partition.get<UInt64>())
: partition.safeGet<String>());
}
}

View File

@ -2603,4 +2603,17 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
}
void StorageReplicatedMergeTree::freezePartition(const Field & partition, const Settings & settings)
{
/// Префикс может быть произвольным. Не обязательно месяц - можно указать лишь год.
String prefix = partition.getType() == Field::Types::UInt64
? toString(partition.get<UInt64>())
: partition.safeGet<String>();
data.freezePartition(prefix);
if (unreplicated_data)
unreplicated_data->freezePartition(prefix);
}
}