Fix more tests.

This commit is contained in:
Nikolai Kochetov 2022-12-29 17:52:31 +00:00
parent 2dacdee58b
commit 3c02e208c8
3 changed files with 128 additions and 35 deletions

View File

@ -32,6 +32,7 @@
#include <Storages/LightweightDeleteDescription.h>
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Processors/Sources/ThrowingExceptionSource.h>
namespace DB
@ -958,16 +959,106 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
}
}
/// Splits the list of columns requested by a mutation into the columns that are passed
/// on to the part reader and the ones that are appended afterwards as constants.
///
/// Two names are handled specially:
///  * LightweightDeleteDescription::FILTER_COLUMN - only when the part does not contain it;
///    it is then replaced by a constant column built from the value 1.
///  * "_partition_id" - always replaced by a constant built from part->info.partition_id.
///
/// Each constant remembers the index it had in the requested list, so addVirtuals()
/// can insert it back at the same place in the header.
struct VirtualColumns
{
    struct ColumnAndPosition
    {
        ColumnWithTypeAndName column;
        /// Index of this column in the originally requested list.
        size_t position;
    };

    using Columns = std::vector<ColumnAndPosition>;

    /// Constant columns to add after reading. Filled in ascending order of `position`.
    Columns virtuals;
    /// Requested columns without the ones moved to `virtuals`; relative order is preserved.
    Names columns_to_read;

    explicit VirtualColumns(Names required_columns, const MergeTreeData::DataPartPtr & part) : columns_to_read(std::move(required_columns))
    {
        for (size_t i = 0; i < columns_to_read.size(); ++i)
        {
            if (columns_to_read[i] == LightweightDeleteDescription::FILTER_COLUMN.name)
            {
                /// The mask becomes a constant only if the part has no such column;
                /// otherwise the name stays in columns_to_read.
                LoadedMergeTreeDataPartInfoForReader part_info_reader(part);
                if (!part_info_reader.getColumns().contains(LightweightDeleteDescription::FILTER_COLUMN.name))
                {
                    ColumnWithTypeAndName mask_column;
                    mask_column.type = LightweightDeleteDescription::FILTER_COLUMN.type;
                    mask_column.column = mask_column.type->createColumnConst(0, 1);
                    /// The name is moved out; the emptied slot is dropped from columns_to_read below.
                    mask_column.name = std::move(columns_to_read[i]);

                    virtuals.emplace_back(ColumnAndPosition{.column = std::move(mask_column), .position = i});
                }
            }
            else if (columns_to_read[i] == "_partition_id")
            {
                ColumnWithTypeAndName column;
                column.type = std::make_shared<DataTypeString>();
                column.column = column.type->createColumnConst(0, part->info.partition_id);
                column.name = std::move(columns_to_read[i]);

                virtuals.emplace_back(ColumnAndPosition{.column = std::move(column), .position = i});
            }
        }

        if (!virtuals.empty())
        {
            /// Rebuild the list without the positions taken by virtual columns.
            /// `virtuals` is sorted by position, so one forward cursor is enough.
            Names columns_no_virtuals;
            columns_no_virtuals.reserve(columns_to_read.size());
            size_t next_virtual = 0;
            for (size_t i = 0; i < columns_to_read.size(); ++i)
            {
                if (next_virtual < virtuals.size() && i == virtuals[next_virtual].position)
                    ++next_virtual;
                else
                    columns_no_virtuals.emplace_back(std::move(columns_to_read[i]));
            }

            columns_to_read.swap(columns_no_virtuals);
        }
    }

    /// Appends an ExpressionStep to `plan` that inserts every collected constant column
    /// into the current header at its remembered position.
    /// The stored columns are moved into the DAG, so this is intended to be called once.
    void addVirtuals(QueryPlan & plan)
    {
        /// Nothing to add: do not extend the plan with a step that changes nothing.
        if (virtuals.empty())
            return;

        auto dag = std::make_unique<ActionsDAG>(plan.getCurrentDataStream().header.getColumnsWithTypeAndName());

        for (auto & column : virtuals)
        {
            const auto & adding_const = dag->addColumn(std::move(column.column));
            /// Positions are ascending, so earlier insertions have already shifted
            /// the outputs and each position is the final index in the header.
            auto & outputs = dag->getOutputs();
            outputs.insert(outputs.begin() + column.position, &adding_const);
        }

        auto step = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), std::move(dag));
        plan.addStep(std::move(step));
    }
};
void MutationsInterpreter::Source::read(
Stage & first_stage,
QueryPlan & plan,
const StorageMetadataPtr & snapshot_,
const ContextPtr & context_,
bool apply_deleted_mask_) const
bool apply_deleted_mask_,
bool can_execute_) const
{
auto required_columns = first_stage.expressions_chain.steps.front()->getRequiredColumns().getNames();
auto storage_snapshot = getStorageSnapshot(snapshot_, context_);
if (!can_execute_)
{
auto header = storage_snapshot->getSampleBlockForColumns(required_columns);
auto callback = []()
{
return DB::Exception(ErrorCodes::LOGICAL_ERROR, "Cannot execute a mutation because can_execute flag set to false");
};
Pipe pipe(std::make_shared<ThrowingExceptionSource>(header, callback));
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
plan.addStep(std::move(read_from_pipe));
return;
}
// for (const auto & name : required_columns)
// std::cerr << "====== Required column " + name << std::endl;
@ -991,42 +1082,13 @@ void MutationsInterpreter::Source::read(
filter = ActionsDAG::buildFilterActionsDAG(nodes, {}, context_);
}
size_t delete_mask_pos = required_columns.size();
for (size_t i = 0; i < delete_mask_pos; ++i)
if (required_columns[i] == LightweightDeleteDescription::FILTER_COLUMN.name)
delete_mask_pos = i;
bool add_default_delete_mask = false;
if (delete_mask_pos < required_columns.size())
{
LoadedMergeTreeDataPartInfoForReader part_info_reader(part);
if (!part_info_reader.getColumns().contains(LightweightDeleteDescription::FILTER_COLUMN.name))
{
required_columns.erase(required_columns.begin() + delete_mask_pos);
add_default_delete_mask = true;
}
}
VirtualColumns virtual_columns(std::move(required_columns), part);
createMergeTreeSequentialSource(
plan, *data, storage_snapshot, part, required_columns, apply_deleted_mask_, filter, context_,
plan, *data, storage_snapshot, part, std::move(virtual_columns.columns_to_read), apply_deleted_mask_, filter, context_,
&Poco::Logger::get("MutationsInterpreter"));
if (add_default_delete_mask)
{
auto dag = std::make_unique<ActionsDAG>(plan.getCurrentDataStream().header.getColumnsWithTypeAndName());
ColumnWithTypeAndName mask_column;
mask_column.type = LightweightDeleteDescription::FILTER_COLUMN.type;
mask_column.column = mask_column.type->createColumnConst(0, 1);
mask_column.name = LightweightDeleteDescription::FILTER_COLUMN.name;
const auto & adding_const = dag->addColumn(std::move(mask_column));
auto & outputs = dag->getOutputs();
outputs.insert(outputs.begin() + delete_mask_pos, &adding_const);
auto step = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), std::move(dag));
plan.addStep(std::move(step));
}
virtual_columns.addVirtuals(plan);
// std::cerr << "<<<<<<<<< " << plan.getCurrentDataStream().header.dumpStructure() << std::endl;
@ -1093,7 +1155,7 @@ void MutationsInterpreter::Source::read(
void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan)
{
source.read(first_stage, plan, metadata_snapshot, context, apply_deleted_mask);
source.read(first_stage, plan, metadata_snapshot, context, apply_deleted_mask, can_execute);
// const auto & steps = first_stage.expressions_chain.steps;
// const auto & names = first_stage.filter_column_names;

View File

@ -111,7 +111,8 @@ public:
QueryPlan & plan,
const StorageMetadataPtr & snapshot_,
const ContextPtr & context_,
bool apply_deleted_mask_) const;
bool apply_deleted_mask_,
bool can_execute_) const;
};
private:

View File

@ -0,0 +1,30 @@
#pragma once
#include <Processors/ISource.h>
namespace DB
{
/// A source that never returns data: generate() throws the exception
/// produced by the callback supplied at construction.
class ThrowingExceptionSource : public ISource
{
public:
    /// Creates the exception to throw. Invoked on every call of generate().
    using CallBack = std::function<Exception()>;

    /// `header` is forwarded to ISource; `callback_` is stored and used by generate().
    explicit ThrowingExceptionSource(Block header, CallBack callback_)
        : ISource(std::move(header)), callback(std::move(callback_))
    {
    }

    String getName() const override
    {
        return "ThrowingExceptionSource";
    }

protected:
    /// Never returns a chunk; always throws the result of `callback`.
    Chunk generate() override
    {
        throw callback();
    }

    CallBack callback;
};
}