Splitting mutate part to temporarty part

This commit is contained in:
alesapin 2020-03-18 14:36:18 +03:00
parent eb938f66e4
commit d00406294c
2 changed files with 55 additions and 37 deletions

View File

@ -1084,43 +1084,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
{
/// We will modify only some of the columns. Other columns and key values can be copied as-is.
/// TODO: check that we modify only non-key columns in this case.
/// Checks if columns used in skipping indexes modified.
std::set<MergeTreeIndexPtr> indices_to_recalc;
ASTPtr indices_recalc_expr_list = std::make_shared<ASTExpressionList>();
for (const auto & col : updated_header.getNames())
{
for (size_t i = 0; i < data.skip_indices.size(); ++i)
{
const auto & index = data.skip_indices[i];
const auto & index_cols = index->getColumnsRequiredForIndexCalc();
auto it = std::find(std::cbegin(index_cols), std::cend(index_cols), col);
if (it != std::cend(index_cols) && indices_to_recalc.insert(index).second)
{
ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(
storage_from_source_part->getIndices().indices[i]->expr->clone());
for (const auto & expr : expr_list->children)
indices_recalc_expr_list->children.push_back(expr->clone());
}
}
}
if (!indices_to_recalc.empty())
{
auto indices_recalc_syntax =
SyntaxAnalyzer(context).analyze(indices_recalc_expr_list, in->getHeader().getNamesAndTypesList());
auto indices_recalc_expr = ExpressionAnalyzer(
indices_recalc_expr_list,
indices_recalc_syntax, context).getActions(false);
/// We can update only one column, but some skip idx expression may depend on several
/// columns (c1 + c2 * c3). It works because this stream was created with help of
/// MutationsInterpreter which knows about skip indices and stream 'in' already has
/// all required columns.
/// TODO move this logic to single place.
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, indices_recalc_expr));
}
auto indices_to_recalc = getIndicesToRecalc(in, storage_from_source_part, updated_header.getNamesAndTypesList(), context);
NameSet files_to_skip = collectFilesToSkip(updated_header, indices_to_recalc, mrk_extension);
NameSet files_to_remove = collectFilesToRemove(source_part, for_file_renames, mrk_extension);
@ -1490,6 +1454,51 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
}
std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalc(
BlockInputStreamPtr & input_stream,
StoragePtr storage_from_source_part,
const NamesAndTypesList & updated_columns,
const Context & context) const
{
/// Checks if columns used in skipping indexes modified.
std::set<MergeTreeIndexPtr> indices_to_recalc;
ASTPtr indices_recalc_expr_list = std::make_shared<ASTExpressionList>();
for (const auto & col : updated_columns.getNames())
{
for (size_t i = 0; i < data.skip_indices.size(); ++i)
{
const auto & index = data.skip_indices[i];
const auto & index_cols = index->getColumnsRequiredForIndexCalc();
auto it = std::find(std::cbegin(index_cols), std::cend(index_cols), col);
if (it != std::cend(index_cols) && indices_to_recalc.insert(index).second)
{
ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(
storage_from_source_part->getIndices().indices[i]->expr->clone());
for (const auto & expr : expr_list->children)
indices_recalc_expr_list->children.push_back(expr->clone());
}
}
}
if (!indices_to_recalc.empty() && input_stream)
{
auto indices_recalc_syntax =
SyntaxAnalyzer(context).analyze(indices_recalc_expr_list, updated_columns);
auto indices_recalc_expr = ExpressionAnalyzer(
indices_recalc_expr_list,
indices_recalc_syntax, context).getActions(false);
/// We can update only one column, but some skip idx expression may depend on several
/// columns (c1 + c2 * c3). It works because this stream was created with help of
/// MutationsInterpreter which knows about skip indices and stream 'in' already has
/// all required columns.
/// TODO move this logic to single place.
input_stream = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(input_stream, indices_recalc_expr));
}
return indices_to_recalc;
}
bool MergeTreeDataMergerMutator::shouldExecuteTTL(const Names & columns, const MutationCommands & commands) const
{
if (!data.hasAnyTTL())
@ -1506,4 +1515,5 @@ bool MergeTreeDataMergerMutator::shouldExecuteTTL(const Names & columns, const M
return false;
}
}

View File

@ -162,6 +162,14 @@ private:
bool shouldExecuteTTL(const Names & columns, const MutationCommands & commands) const;
/// Return set of indices which should be recalculated during mutation also
/// wraps input stream into additional expression stream
std::set<MergeTreeIndexPtr> getIndicesToRecalc(
BlockInputStreamPtr & input_stream,
StoragePtr storage_from_source_part,
const NamesAndTypesList & updated_columns,
const Context & context) const;
public :
/** Is used to cancel all merges and mutations. On cancel() call all currently running actions will throw exception soon.