mutation executor for MergeTree [#CLICKHOUSE-3747]

This commit is contained in:
Alexey Zatelepin 2018-05-13 03:24:52 +03:00
parent 1147e052e8
commit fd81cc7f66
4 changed files with 158 additions and 0 deletions

View File

@ -0,0 +1,60 @@
#include <DataStreams/ApplyingMutationsBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/ExpressionAnalyzer.h>
namespace DB
{
/// Builds the chain of streams that applies `commands` to the data read from `input`.
///
/// Every DELETE command is turned into a FilterBlockInputStream with the condition
/// not(<command predicate>). The filters are stacked in the order of `commands`:
/// the first one reads from `input`, each next one reads from the previous filter,
/// and the last one is stored in `impl`, which this stream delegates to.
///
/// Throws LOGICAL_ERROR if `commands` is empty or contains a command of an unsupported type.
ApplyingMutationsBlockInputStream::ApplyingMutationsBlockInputStream(
    const BlockInputStreamPtr & input, const std::vector<MutationCommand> & commands, const Context & context)
{
    if (commands.empty())
        throw Exception("Empty mutation commands list. This is a bug.", ErrorCodes::LOGICAL_ERROR);

    children.push_back(input);

    for (const MutationCommand & cmd : commands)
    {
        /// Source of the current step: the original input for the first command,
        /// the previously built stream for all subsequent ones.
        BlockInputStreamPtr cur_input = input;
        if (impl)
            cur_input = impl;

        switch (cmd.type)
        {
            case MutationCommand::DELETE:
            {
                /// Rows matching the predicate must be removed, so the filter keeps not(predicate).
                auto predicate = std::make_shared<ASTFunction>();
                predicate->name = "not";
                predicate->arguments = std::make_shared<ASTExpressionList>();

                /// Use a private copy of the predicate: the expression analysis may rewrite the tree
                /// it is given, while the command (and its AST) is owned by the caller, passed here
                /// by const reference and may be reused for other parts.
                predicate->arguments->children.push_back(cmd.predicate->clone());
                predicate->children.push_back(predicate->arguments);

                auto predicate_expr = ExpressionAnalyzer(
                    predicate, context, nullptr, cur_input->getHeader().getNamesAndTypesList()).getActions(false);
                String col_name = predicate->getColumnName();

                impl = std::make_shared<FilterBlockInputStream>(cur_input, predicate_expr, col_name);
                break;
            }
            default:
                throw Exception("Unsupported mutation cmd type: " + toString(static_cast<int>(cmd.type)),
                    ErrorCodes::LOGICAL_ERROR);
        }
    }
}
/// The header of this stream is the header of the last stream in the mutation chain.
Block ApplyingMutationsBlockInputStream::getHeader() const
{
    Block header = impl->getHeader();
    return header;
}
/// Totals are taken from the last stream in the mutation chain.
Block ApplyingMutationsBlockInputStream::getTotals()
{
    Block totals = impl->getTotals();
    return totals;
}
/// Returns the next block produced by the last stream in the mutation chain.
Block ApplyingMutationsBlockInputStream::readImpl()
{
    Block next_block = impl->read();
    return next_block;
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/MutationCommands.h>
#include <Interpreters/Context.h>
namespace DB
{
/// A stream that applies a list of mutation commands to the blocks read from another stream.
/// The actual work is done by an internal stream (`impl`) built in the constructor;
/// header, totals and data are all obtained from it.
class ApplyingMutationsBlockInputStream : public IProfilingBlockInputStream
{
public:
    /// `input` is the stream to read source data from, `commands` is the list of mutations to apply,
    /// `context` is passed to the expression analysis of the command predicates.
    ApplyingMutationsBlockInputStream(
        const BlockInputStreamPtr & input, const std::vector<MutationCommand> & commands, const Context & context);

    String getName() const override
    {
        return "ApplyMutations";
    }

    Block getHeader() const override;
    Block getTotals() override;

private:
    Block readImpl() override;

    /// The stream that produces already mutated blocks.
    ProfilingBlockInputStreamPtr impl;
};
}

View File

@ -17,6 +17,7 @@
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <DataStreams/ApplyingMutationsBlockInputStream.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBufferFromFile.h>
#include <DataTypes/NestedUtils.h>
@ -797,6 +798,70 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
}
/// Applies `commands` to the single source part listed in `future_part` and writes the result
/// into a new temporary part located in the directory "tmp_mut_<future part name>".
///
/// All physical columns of the source part are read, passed through
/// ApplyingMutationsBlockInputStream and written with MergedBlockOutputStream; the min-max index
/// is recalculated from the written blocks, the partition is copied from the source part.
///
/// Throws ABORTED if the operation was cancelled (before the start or while reading),
/// LOGICAL_ERROR if `future_part` does not contain exactly one source part.
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart(
    const FuturePart & future_part,
    const std::vector<MutationCommand> & commands,
    const Context & context)
{
    if (actions_blocker.isCancelled())
        throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);

    const size_t source_parts_count = future_part.parts.size();
    if (source_parts_count != 1)
        throw Exception("Trying to mutate " + toString(source_parts_count) + " parts, not one. "
            "This is a bug.", ErrorCodes::LOGICAL_ERROR);

    const auto & source_part = future_part.parts[0];

    LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation);

    /// Prepare the resulting part and the temporary directory it will be written to.
    MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
        data, future_part.name, future_part.part_info);
    new_data_part->relative_path = "tmp_mut_" + future_part.name;
    new_data_part->is_temp = true;

    String new_part_tmp_path = new_data_part->getFullPath();
    Poco::File(new_part_tmp_path).createDirectories();

    /// Read the whole source part (a single range covering all of its marks) with all physical columns...
    NamesAndTypesList all_columns = data.getColumns().getAllPhysical();

    BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
        data, source_part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, all_columns.getNames(),
        MarkRanges(1, MarkRange(0, source_part->marks_count)),
        false, nullptr, String(), true, 0, DBMS_DEFAULT_BUFFER_SIZE, false);

    /// ... and apply the mutation commands on top of it.
    BlockInputStreamPtr mutated_stream = std::make_shared<ApplyingMutationsBlockInputStream>(
        source_stream, commands, context);

    /// Compression is chosen by the size of the source part and its share in the total active data size.
    const auto source_bytes = source_part->bytes_on_disk;
    auto compression_settings = context.chooseCompressionSettings(
        source_bytes,
        static_cast<double>(source_bytes) / data.getTotalActiveSizeInBytes());

    MergedBlockOutputStream part_writer(data, new_part_tmp_path, all_columns, compression_settings);

    MergeTreeDataPart::MinMaxIndex minmax_idx;

    mutated_stream->readPrefix();
    part_writer.writePrefix();

    /// Copy blocks until the stream is exhausted or the operation is cancelled.
    for (;;)
    {
        if (actions_blocker.isCancelled())
            break;

        Block block = mutated_stream->read();
        if (!block)
            break;

        minmax_idx.update(block, data.minmax_idx_columns);
        part_writer.write(block);
    }

    if (actions_blocker.isCancelled())
        throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);

    new_data_part->partition.assign(source_part->partition);
    new_data_part->minmax_idx = std::move(minmax_idx);

    mutated_stream->readSuffix();
    part_writer.writeSuffixAndFinalizePart(new_data_part);

    return new_data_part;
}
MergeTreeDataMergerMutator::MergeAlgorithm MergeTreeDataMergerMutator::chooseMergeAlgorithm(
const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound,
const NamesAndTypesList & gathering_columns, bool deduplicate) const

View File

@ -2,6 +2,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <Storages/MutationCommands.h>
#include <atomic>
#include <functional>
#include <Common/ActionBlocker.h>
@ -91,6 +92,11 @@ public:
MergeListEntry & merge_entry,
size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation, bool deduplication);
/// Applies the mutation `commands` to the source part of `future_part` and returns
/// the resulting new temporary part.
/// `future_part` must contain exactly one source part, otherwise LOGICAL_ERROR is thrown.
/// Throws ABORTED if the operation is cancelled.
MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart(
const FuturePart & future_part,
const std::vector<MutationCommand> & commands,
const Context & context);
MergeTreeData::DataPartPtr renameMergedTemporaryPart(
MergeTreeData::MutableDataPartPtr & new_data_part,
const MergeTreeData::DataPartsVector & parts,