added indices to mutationsInterpreter

This commit is contained in:
Nikita Vasilev 2019-04-17 20:07:07 +03:00
parent 20f0b17cf4
commit 8e8c77a46b
3 changed files with 56 additions and 8 deletions

View File

@ -173,6 +173,7 @@ void MutationsInterpreter::prepare(bool dry_run)
throw Exception("Empty mutation commands list", ErrorCodes::LOGICAL_ERROR);
const ColumnsDescription & columns_desc = storage->getColumns();
const IndicesDescription & indices_desc = storage->getIndicesDescription();
NamesAndTypesList all_columns = columns_desc.getAllPhysical();
NameSet updated_columns;
@ -182,9 +183,10 @@ void MutationsInterpreter::prepare(bool dry_run)
updated_columns.insert(kv.first);
}
/// We need to know which columns affect which MATERIALIZED columns to recalculate them if dependencies
/// are updated.
/// We need to know which columns affect which MATERIALIZED columns and data skipping indices
/// to recalculate them if dependencies are updated.
std::unordered_map<String, Names> column_to_affected_materialized;
NameSet affected_indices_columns;
if (!updated_columns.empty())
{
for (const auto & column : columns_desc)
@ -201,6 +203,22 @@ void MutationsInterpreter::prepare(bool dry_run)
}
}
}
for (const auto & index : indices_desc.indices)
{
auto query = index->expr->clone();
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
ExpressionAnalyzer analyzer(query, syntax_result, context);
const auto required_columns = analyzer.getRequiredSourceColumns();
for (const String & dependency : required_columns)
{
if (updated_columns.count(dependency))
{
affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns));
break;
}
}
}
validateUpdateColumns(storage, updated_columns, column_to_affected_materialized);
}
@ -263,6 +281,20 @@ void MutationsInterpreter::prepare(bool dry_run)
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
}
/// Special step to recalculate affected indices.
if (!affected_indices_columns.empty())
{
stages.emplace_back(context);
for (const auto & column : affected_indices_columns)
{
stages.back().column_to_updated.emplace(
column,
std::make_shared<ASTLiteral>(
columns_desc.getPhysical(column).type->getName()));
}
}
/// Next, for each stage calculate columns changed by this and previous stages.
for (size_t i = 0; i < stages.size(); ++i)
{

View File

@ -918,19 +918,33 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/// Checks if columns used in skipping indexes modified
std::set<MergeTreeIndexPtr> indices_to_recalc;
ASTPtr indices_recalc_expr_list = std::make_shared<ASTExpressionList>();
for (const auto & col : in_header.getNames())
{
for (const auto & index : data.skip_indices)
for (size_t i = 0; i < data.skip_indices.size(); ++i)
{
const auto & index = data.skip_indices[i];
const auto & index_cols = index->expr->getRequiredColumns();
auto it = find(cbegin(index_cols), cend(index_cols), col);
if (it != cend(index_cols))
indices_to_recalc.insert(index);
/*throw Exception("You can not modify columns used in index. Index name: '"
+ index->name
+ "' bad column: '" + *it + "'", ErrorCodes::ILLEGAL_COLUMN);*/
if (it != cend(index_cols) && indices_to_recalc.insert(index).second)
{
ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(
storage_from_source_part->getIndicesDescription().indices[i]->expr->clone());
for (const auto & expr : expr_list->children)
indices_recalc_expr_list->children.push_back(expr->clone());
}
}
}
if (!indices_to_recalc.empty())
{
auto indices_recalc_syntax = SyntaxAnalyzer(context, {}).analyze(
indices_recalc_expr_list, in_header.getNamesAndTypesList());
auto indices_recalc_expr = ExpressionAnalyzer(
indices_recalc_expr_list,
indices_recalc_syntax, context).getActions(false);
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, indices_recalc_expr));
}
NameSet files_to_skip = {"checksums.txt", "columns.txt"};
for (const auto & entry : in_header)

View File

@ -37,6 +37,8 @@ public:
return part->storage.mayBenefitFromIndexForIn(left_in_operand);
}
const IndicesDescription & getIndicesDescription() const override { return part->storage.getIndicesDescription(); }
protected:
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(part_->storage.getColumns()), part(part_)