use hard links to copy columns that didn't change [#CLICKHOUSE-13]

This commit is contained in:
Alexey Zatelepin 2018-08-28 16:48:16 +03:00
parent dc2a4c21e9
commit e9493b3a5f
5 changed files with 215 additions and 62 deletions

View File

@ -0,0 +1,36 @@
#include <Common/createHardLink.h>
#include <Common/Exception.h>
#include <errno.h>
#include <unistd.h>
#include <sys/stat.h>
namespace DB
{
void createHardLink(const String & source_path, const String & destination_path)
{
if (0 != link(source_path.c_str(), destination_path.c_str()))
{
if (errno == EEXIST)
{
auto link_errno = errno;
struct stat source_descr;
struct stat destination_descr;
if (0 != lstat(source_path.c_str(), &source_descr))
DB::throwFromErrno("Cannot stat " + source_path);
if (0 != lstat(destination_path.c_str(), &destination_descr))
DB::throwFromErrno("Cannot stat " + destination_path);
if (source_descr.st_ino != destination_descr.st_ino)
DB::throwFromErrno("Destination file " + destination_path + " is already exist and have different inode.", 0, link_errno);
}
else
DB::throwFromErrno("Cannot link " + source_path + " to " + destination_path);
}
}
}

View File

@ -0,0 +1,12 @@
#pragma once
#include <Core/Types.h>
namespace DB
{
/// Create a hard link `destination_path` pointing to `source_path`.
/// If the destination already exists, check that it has the same inode (and throw if they are different).
void createHardLink(const String & source_path, const String & destination_path);
}

View File

@ -1,22 +1,21 @@
#include "localBackup.h"
#include <sys/stat.h>
#include <string>
#include <iostream>
#include <Common/localBackup.h>
#include <Common/createHardLink.h>
#include <Common/Exception.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <Common/Exception.h>
#include <port/unistd.h>
#include <string>
#include <iostream>
#include <errno.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_DEEP_RECURSION;
extern const int DIRECTORY_ALREADY_EXISTS;
}
}
static void localBackupImpl(const Poco::Path & source_path, const Poco::Path & destination_path, size_t level,
@ -41,33 +40,7 @@ static void localBackupImpl(const Poco::Path & source_path, const Poco::Path & d
{
dir_it->setReadOnly();
std::string source_str = source.toString();
std::string destination_str = destination.toString();
/** We are trying to create a hard link.
* If it already exists, we check that source and destination point to the same inode.
*/
if (0 != link(source_str.c_str(), destination_str.c_str()))
{
if (errno == EEXIST)
{
auto link_errno = errno;
struct stat source_descr;
struct stat destination_descr;
if (0 != lstat(source_str.c_str(), &source_descr))
DB::throwFromErrno("Cannot stat " + source_str);
if (0 != lstat(destination_str.c_str(), &destination_descr))
DB::throwFromErrno("Cannot stat " + destination_str);
if (source_descr.st_ino != destination_descr.st_ino)
DB::throwFromErrno("Destination file " + destination_str + " is already exist and have different inode.", 0, link_errno);
}
else
DB::throwFromErrno("Cannot link " + source_str + " to " + destination_str);
}
createHardLink(source.toString(), destination.toString());
}
else
{
@ -120,3 +93,5 @@ void localBackup(const Poco::Path & source_path, const Poco::Path & destination_
break;
}
}
}

View File

@ -4,6 +4,9 @@
#include <optional>
namespace DB
{
/** Creates a local (at the same mount point) backup (snapshot) directory.
*
* In the specified destination directory, it creates a hard links on all source-directory files
@ -19,3 +22,4 @@
*/
void localBackup(const Poco::Path & source_path, const Poco::Path & destination_path, std::optional<size_t> max_level = {});
}

View File

@ -20,6 +20,7 @@
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <DataStreams/ApplyingMutationsBlockInputStream.h>
#include <DataStreams/FilterColumnsBlockInputStream.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <IO/CompressedWriteBuffer.h>
@ -30,8 +31,10 @@
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>
#include <Common/localBackup.h>
#include <Common/createHardLink.h>
#include <Poco/File.h>
#include <Poco/DirectoryIterator.h>
#include <cmath>
#include <numeric>
@ -57,6 +60,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ABORTED;
extern const int UNKNOWN_MUTATION_COMMAND;
}
@ -881,14 +885,57 @@ static bool isStorageTouchedByMutation(
}
static BlockInputStreamPtr createInputStreamWithMutatedData(
const StoragePtr & storage, std::vector<MutationCommand> commands, const Context & context)
const StoragePtr & storage, std::vector<MutationCommand> commands, const Context & context, NameSet * output_columns)
{
NameSet input_columns;
{
ASTPtr expressions = std::make_shared<ASTExpressionList>();
for (const auto & command : commands)
{
if (command.type == MutationCommand::DELETE)
{
for (const auto & column : storage->getColumns().getAllPhysical())
{
input_columns.insert(column.name);
if (output_columns)
output_columns->insert(column.name);
}
break;
}
else if (command.type == MutationCommand::UPDATE)
{
if (command.predicate)
expressions->children.push_back(command.predicate);
for (const auto & pair : command.column_to_update_expression)
{
const String & column_name = pair.first;
const ASTPtr & update_expr_ast = pair.second;
input_columns.insert(column_name);
if (output_columns)
output_columns->insert(column_name);
expressions->children.push_back(update_expr_ast);
}
}
else
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
}
for (const auto & column : ExpressionAnalyzer(expressions, context, storage).getRequiredSourceColumns())
input_columns.insert(column);
}
auto select = std::make_shared<ASTSelectQuery>();
select->select_expression_list = std::make_shared<ASTExpressionList>();
select->children.push_back(select->select_expression_list);
for (const auto & column : storage->getColumns().getAllPhysical())
select->select_expression_list->children.push_back(std::make_shared<ASTIdentifier>(column.name));
for (const auto & column_name : input_columns)
select->select_expression_list->children.push_back(std::make_shared<ASTIdentifier>(column_name));
/// For all commands that are in front of the list and are DELETE commands, we can push them down
/// to the SELECT statement and remove them from commands.
@ -946,8 +993,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
const std::vector<MutationCommand> & commands,
const Context & context)
{
if (actions_blocker.isCancelled())
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
auto check_cancelled = [&]()
{
if (actions_blocker.isCancelled())
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
return true;
};
check_cancelled();
if (future_part.parts.size() != 1)
throw Exception("Trying to mutate " + toString(future_part.parts.size()) + " parts, not one. "
@ -986,38 +1040,110 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
source_part->bytes_on_disk,
static_cast<double>(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes());
auto in = createInputStreamWithMutatedData(storage_from_source_part, commands, context_for_reading);
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
if (data.hasPrimaryKey())
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, data.getPrimaryExpression()));
NameSet output_columns;
auto in = createInputStreamWithMutatedData(storage_from_source_part, commands, context_for_reading, &output_columns);
Poco::File(new_part_tmp_path).createDirectories();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
MergedBlockOutputStream out(data, new_part_tmp_path, all_columns, compression_settings);
MergeTreeDataPart::MinMaxIndex minmax_idx;
in->readPrefix();
out.writePrefix();
Block block;
while (!actions_blocker.isCancelled() && (block = in->read()))
if (output_columns.size() == all_columns.size())
{
minmax_idx.update(block, data.minmax_idx_columns);
out.write(block);
/// All columns are modified, proceed to write a new part from scratch.
if (data.hasPrimaryKey())
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, data.getPrimaryExpression()));
MergeTreeDataPart::MinMaxIndex minmax_idx;
MergedBlockOutputStream out(data, new_part_tmp_path, all_columns, compression_settings);
in->readPrefix();
out.writePrefix();
Block block;
while (check_cancelled() && (block = in->read()))
{
minmax_idx.update(block, data.minmax_idx_columns);
out.write(block);
}
new_data_part->partition.assign(source_part->partition);
new_data_part->minmax_idx = std::move(minmax_idx);
in->readSuffix();
out.writeSuffixAndFinalizePart(new_data_part);
}
else
{
/// We will modify only some of the columns. Other columns and key values can be copied as-is.
/// TODO: check that we modify only non-key columns in this case.
if (actions_blocker.isCancelled())
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
in = std::make_shared<FilterColumnsBlockInputStream>(
in, Names(output_columns.begin(), output_columns.end()), /* throw_if_column_not_found = */ true);
new_data_part->partition.assign(source_part->partition);
new_data_part->minmax_idx = std::move(minmax_idx);
NameSet files_to_skip = {"checksums.txt", "columns.txt"};
for (const auto & entry : in->getHeader())
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(entry.name, substream_path);
files_to_skip.insert(stream_name + ".bin");
files_to_skip.insert(stream_name + ".mrk");
};
in->readSuffix();
out.writeSuffixAndFinalizePart(new_data_part);
IDataType::SubstreamPath stream_path;
entry.type->enumerateStreams(callback, stream_path);
}
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it)
{
if (files_to_skip.count(dir_it.name()))
continue;
Poco::Path destination(new_part_tmp_path);
destination.append(dir_it.name());
createHardLink(dir_it.path().toString(), destination.toString());
}
MergedColumnOnlyOutputStream out(data, in->getHeader(), new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false);
in->readPrefix();
out.writePrefix();
Block block;
while (check_cancelled() && (block = in->read()))
out.write(block);
in->readSuffix();
auto changed_checksums = out.writeSuffixAndGetChecksums();
new_data_part->checksums = source_part->checksums;
new_data_part->checksums.add(std::move(changed_checksums));
{
/// Write file with checksums.
WriteBufferFromFile out(new_part_tmp_path + "checksums.txt", 4096);
new_data_part->checksums.write(out);
}
new_data_part->columns = all_columns;
{
/// Write a file with a description of columns.
WriteBufferFromFile out(new_part_tmp_path + "columns.txt", 4096);
all_columns.writeText(out);
}
new_data_part->rows_count = source_part->rows_count;
new_data_part->marks_count = source_part->marks_count;
new_data_part->index = source_part->index;
new_data_part->partition.assign(source_part->partition);
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);
new_data_part->bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->getFullPath());
}
return new_data_part;
}