experiment

Nikita Vasilev 2019-08-05 21:06:05 +03:00
parent 4c53b90e66
commit 7b8c0f1750
3 changed files with 56 additions and 16 deletions

MutationsInterpreter.cpp

@@ -217,19 +217,20 @@ void MutationsInterpreter::prepare(bool dry_run)
     }
 
     /// First, break a sequence of commands into stages.
-    stages.emplace_back(context);
-
     for (const auto & command : commands)
     {
-        if (!stages.back().column_to_updated.empty())
-            stages.emplace_back(context);
-
         if (command.type == MutationCommand::DELETE)
         {
+            if (stages.empty() || !stages.back().column_to_updated.empty())
+                stages.emplace_back(context);
+
             auto negated_predicate = makeASTFunction("not", command.predicate->clone());
             stages.back().filters.push_back(negated_predicate);
         }
         else if (command.type == MutationCommand::UPDATE)
         {
+            if (stages.empty() || !stages.back().column_to_updated.empty())
+                stages.emplace_back(context);
+
             if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
                 stages.emplace_back(context);
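Note: the hunk above replaces the unconditional leading stage with lazy stage creation: a stage is opened only when none exists yet or when the current one already updates columns, since commands must apply in order. A minimal standalone sketch of that splitting rule, with stand-in Stage/Command types (the names below are illustrative, not ClickHouse's):

    #include <cassert>
    #include <map>
    #include <string>
    #include <vector>

    struct Command { bool is_delete; std::string column; };

    struct Stage
    {
        std::vector<std::string> filters;                     // DELETE predicates
        std::map<std::string, std::string> column_to_updated; // UPDATE expressions
    };

    std::vector<Stage> splitIntoStages(const std::vector<Command> & commands)
    {
        std::vector<Stage> stages;
        for (const auto & command : commands)
        {
            // Open a stage lazily: only if none exists, or the current one
            // already updates columns (commands must apply sequentially).
            if (stages.empty() || !stages.back().column_to_updated.empty())
                stages.emplace_back();

            if (command.is_delete)
                stages.back().filters.push_back("not(" + command.column + ")");
            else
            {
                // The first stage only filters, so updates go to a second stage.
                if (stages.size() == 1)
                    stages.emplace_back();
                stages.back().column_to_updated[command.column] = "expr";
            }
        }
        return stages;
    }

    int main()
    {
        // DELETE a; UPDATE b; DELETE c  ->  stage 0 filters a, stage 1 updates b,
        // stage 2 filters c (opened because stage 1 already had an update).
        auto stages = splitIntoStages({{true, "a"}, {false, "b"}, {true, "c"}});
        assert(stages.size() == 3);
    }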
@@ -291,15 +292,39 @@ void MutationsInterpreter::prepare(bool dry_run)
             throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
     }
 
-    /// Special step to recalculate affected indices.
     if (!affected_indices_columns.empty())
     {
+        if (!stages.empty())
+        {
+            std::vector<Stage> stages_copy;
+            /// Copy all filled stages except index calculation stage.
+            for (const auto &stage : stages)
+            {
+                stages_copy.emplace_back(context);
+                stages_copy.back().column_to_updated = stage.column_to_updated;
+                stages_copy.back().output_columns = stage.output_columns;
+                stages_copy.back().filters = stage.filters;
+            }
+
+            auto first_stage_header = prepareInterpreterSelect(/* dry_run = */ true)->getSampleBlock();
+            auto in = std::make_shared<NullBlockInputStream>(first_stage_header);
+            updated_header = std::make_unique<Block>(addStreamsForLaterStages(in)->getHeader());
+
+            std::swap(stages, stages_copy);
+        }
+
+        /// Special step to recalculate affected indices.
        stages.emplace_back(context);
        for (const auto & column : affected_indices_columns)
            stages.back().column_to_updated.emplace(
                column, std::make_shared<ASTIdentifier>(column));
     }
 
+    interpreter_select = prepareInterpreterSelect(dry_run);
+
+    is_prepared = true;
+}
+
+std::unique_ptr<InterpreterSelectQuery> MutationsInterpreter::prepareInterpreterSelect(bool dry_run)
+{
     NamesAndTypesList all_columns = storage->getColumns().getAllPhysical();
 
     /// Next, for each stage calculate columns changed by this and previous stages.
     for (size_t i = 0; i < stages.size(); ++i)
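Note: this hunk is the core of the experiment. Preparing the select appears to mutate the stages (hence the copy-and-swap), so before the service stage for index recalculation is appended, the code snapshots the user stages, prepares a dry-run pipeline over a header-only NullBlockInputStream, and records the resulting header in updated_header: the set of columns the mutation itself changes. A simplified model of that "capture the header before appending a service stage" idea, with stand-in types (headerOf plays the role of addStreamsForLaterStages(...)->getHeader()):

    #include <cassert>
    #include <set>
    #include <string>
    #include <vector>

    struct Stage { std::set<std::string> output_columns; };

    // Stand-in for building the pipeline over a null input and reading its
    // header: here the header is just the union of stage outputs.
    std::set<std::string> headerOf(const std::vector<Stage> & stages)
    {
        std::set<std::string> header;
        for (const auto & stage : stages)
            header.insert(stage.output_columns.begin(), stage.output_columns.end());
        return header;
    }

    int main()
    {
        std::vector<Stage> stages = {Stage{{"a"}}, Stage{{"b"}}};

        // Work on a copy, as the real code does, because preparing the
        // pipeline can mutate the stages it runs over.
        std::vector<Stage> stages_copy = stages;
        std::set<std::string> updated_header = headerOf(stages_copy);

        // Only now append the service stage that recalculates index columns.
        stages.push_back(Stage{{"idx_a"}});

        assert(updated_header.size() == 2);     // user-changed columns only
        assert(headerOf(stages).size() == 3);   // full pipeline incl. index column
    }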
@@ -363,7 +388,7 @@ void MutationsInterpreter::prepare(bool dry_run)
         for (const auto & kv : stage.column_to_updated)
         {
             actions_chain.getLastActions()->add(ExpressionAction::copyColumn(
-                kv.second->getColumnName(), kv.first, /* can_replace = */ true));
+                    kv.second->getColumnName(), kv.first, /* can_replace = */ true));
         }
     }
@@ -403,9 +428,7 @@ void MutationsInterpreter::prepare(bool dry_run)
         select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
     }
 
-    interpreter_select = std::make_unique<InterpreterSelectQuery>(select, context, storage, SelectQueryOptions().analyze(dry_run).ignoreLimits());
-
-    is_prepared = true;
+    return std::make_unique<InterpreterSelectQuery>(select, context, storage, SelectQueryOptions().analyze(dry_run).ignoreLimits());
 }
 
 BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(BlockInputStreamPtr in) const
@@ -451,7 +474,15 @@ BlockInputStreamPtr MutationsInterpreter::execute()
 {
     prepare(/* dry_run = */ false);
     BlockInputStreamPtr in = interpreter_select->execute().in;
-    return addStreamsForLaterStages(in);
+    auto result_stream = addStreamsForLaterStages(in);
+    if (!updated_header)
+        updated_header = std::make_unique<Block>(result_stream->getHeader());
+
+    return result_stream;
 }
 
+const Block & MutationsInterpreter::getUpdatedHeader() const
+{
+    return *updated_header;
+}
+
 }
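Note: when a mutation touches no indices, prepare() never fills updated_header, so execute() lazily caches the header of the final stream; getUpdatedHeader() then dereferences the cache unconditionally, which makes "call execute() (or hit the index path in prepare()) first" an implicit precondition of this experiment. A minimal sketch of that lazy-cache shape, with Block and Stream as stand-ins for the ClickHouse types:

    #include <cassert>
    #include <memory>
    #include <string>
    #include <vector>

    using Block = std::vector<std::string>;   // here: just a list of column names

    struct Stream
    {
        Block header;
        const Block & getHeader() const { return header; }
    };

    class Interpreter
    {
    public:
        Stream execute()
        {
            Stream result{{"a", "b"}};
            if (!updated_header)              // fill the cache on the first run
                updated_header = std::make_unique<Block>(result.getHeader());
            return result;
        }

        // Precondition: execute() (or an index stage during preparation)
        // must already have filled updated_header.
        const Block & getUpdatedHeader() const { return *updated_header; }

    private:
        std::unique_ptr<Block> updated_header;
    };

    int main()
    {
        Interpreter interpreter;
        interpreter.execute();
        assert(interpreter.getUpdatedHeader().size() == 2);
    }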

MutationsInterpreter.h

@@ -30,12 +30,18 @@ public:
     /// Return false if the data isn't going to be changed by mutations.
     bool isStorageTouchedByMutations() const;
 
-    /// The resulting stream will return blocks containing changed columns only.
+    /// The resulting stream will return blocks containing only the changed columns and the columns needed to recalculate indices.
     BlockInputStreamPtr execute();
 
+    /// Only changed columns.
+    const Block & getUpdatedHeader() const;
+
 private:
     void prepare(bool dry_run);
 
     struct Stage;
 
+    std::unique_ptr<InterpreterSelectQuery> prepareInterpreterSelect(bool dry_run);
     BlockInputStreamPtr addStreamsForLaterStages(BlockInputStreamPtr in) const;
 
 private:
@@ -78,6 +84,7 @@ private:
     };
 
     std::unique_ptr<InterpreterSelectQuery> interpreter_select;
+    std::unique_ptr<Block> updated_header;
     std::vector<Stage> stages;
 
     bool is_prepared = false; /// Has the sequence of stages been prepared.
 };

MergeTreeDataMergerMutator.cpp

@@ -949,6 +949,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
     Poco::File(new_part_tmp_path).createDirectories();
 
     auto in = mutations_interpreter.execute();
+    const auto & updated_header = mutations_interpreter.getUpdatedHeader();
+
     NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
 
     Block in_header = in->getHeader();
@@ -957,7 +959,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
     MergeStageProgress stage_progress(1.0);
     in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
 
-    if (in_header.columns() == all_columns.size())
+    if (updated_header.columns() == all_columns.size())
     {
         /// All columns are modified, proceed to write a new part from scratch.
         if (data.hasPrimaryKey() || data.hasSkipIndices())
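Note: this comparison is what the new header exists for. The stream's own header can also carry columns that are read back only to recalculate indices, so comparing it against the table's full column list could overcount what the mutation "modified"; updated_header restores the check to user-changed columns only. A stand-in sketch of the decision, under that reading:

    #include <cassert>
    #include <cstddef>

    enum class Strategy { RewriteWholePart, RewriteChangedColumnsOnly };

    // If every physical column is really modified, writing a fresh part is the
    // only option; otherwise untouched column files can be hard-linked.
    Strategy chooseStrategy(std::size_t updated_columns, std::size_t all_columns)
    {
        return updated_columns == all_columns ? Strategy::RewriteWholePart
                                              : Strategy::RewriteChangedColumnsOnly;
    }

    int main()
    {
        // 1 user-updated column out of 10: patch in place, even if the stream
        // also carries extra columns for index recalculation.
        assert(chooseStrategy(1, 10) == Strategy::RewriteChangedColumnsOnly);
        assert(chooseStrategy(10, 10) == Strategy::RewriteWholePart);
    }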
@@ -1025,7 +1027,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
         NameSet files_to_skip = {"checksums.txt", "columns.txt"};
         auto mrk_extension = data.settings.index_granularity_bytes ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension();
 
-        for (const auto & entry : in_header)
+        for (const auto & entry : updated_header)
         {
             IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
             {
@@ -1055,12 +1057,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
             createHardLink(dir_it.path().toString(), destination.toString());
         }
 
-        merge_entry->columns_written = all_columns.size() - in_header.columns();
+        merge_entry->columns_written = all_columns.size() - updated_header.columns();
 
         IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
 
         MergedColumnOnlyOutputStream out(
             data,
-            in_header,
+            updated_header,
             new_part_tmp_path,
             /* sync = */ false,
             compression_codec,
@@ -1099,7 +1101,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
         NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
         for (auto it = new_data_part->columns.begin(); it != new_data_part->columns.end();)
         {
-            if (source_columns_name_set.count(it->name) || in_header.has(it->name))
+            if (source_columns_name_set.count(it->name) || updated_header.has(it->name))
                 ++it;
             else
                 it = new_data_part->columns.erase(it);
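Note: the last hunk prunes the new part's column list with the same header: a column stays in the new part's metadata only if its files actually exist there, either hard-linked from the source part or rewritten by the mutation. A small standalone sketch of that erase loop, with stand-in containers (the names are illustrative, not ClickHouse's):

    #include <cassert>
    #include <list>
    #include <set>
    #include <string>

    struct Column { std::string name; };

    int main()
    {
        std::list<Column> part_columns = {{"a"}, {"b"}, {"stale"}};
        std::set<std::string> source_columns = {"a"};   // hard-linked, unchanged
        std::set<std::string> updated_columns = {"b"};  // rewritten by the mutation

        for (auto it = part_columns.begin(); it != part_columns.end();)
        {
            if (source_columns.count(it->name) || updated_columns.count(it->name))
                ++it;
            else
                it = part_columns.erase(it);   // neither linked nor rewritten
        }

        assert(part_columns.size() == 2);      // "a" and "b" remain
    }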