For mutations that rewrite only some columns, skip applying the deleted mask when reading those columns. Also add unit test cases.

This commit is contained in:
jianmei zhang 2022-06-23 21:02:22 +08:00
parent 2c74e9b866
commit 9d27af7ee2
12 changed files with 129 additions and 31 deletions

View File

@ -360,6 +360,9 @@ private:
inline static ContextPtr global_context_instance;
/// When true, readers built from this context do not apply the deleted rows mask.
/// Defaults to false, i.e. the mask is applied as usual.
bool skip_deleted_mask = false;
public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for a query context.
OpenTelemetryTraceContext query_trace_context;
@ -912,6 +915,9 @@ public:
bool isInternalQuery() const { return is_internal_query; }
void setInternalQuery(bool internal) { is_internal_query = internal; }
/// Returns true if reads performed with this context should not apply the deleted rows mask.
bool skipDeletedMask() const { return skip_deleted_mask; }
/// Sets whether reads performed with this context skip applying the deleted rows mask.
void setSkipDeletedMask(bool skip) { skip_deleted_mask = skip; }
ActionLocksManagerPtr getActionLocksManager();
enum class ApplicationType

View File

@ -1041,7 +1041,18 @@ QueryPipelineBuilder MutationsInterpreter::execute()
throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
if (!select_interpreter)
select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, metadata_snapshot, select_limits);
{
/// Skip applying the deleted mask for MutateSomePartColumns cases when the part has lightweight deletes.
if (!is_lightweight && skip_deleted_mask)
{
auto context_for_reading = Context::createCopy(context);
context_for_reading->setSkipDeletedMask(skip_deleted_mask);
select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context_for_reading, storage, metadata_snapshot, select_limits);
}
else
select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, metadata_snapshot, select_limits);
}
QueryPlan plan;
select_interpreter->buildQueryPlan(plan);

View File

@ -79,6 +79,8 @@ public:
MutationKind::MutationKindEnum getMutationKind() const { return mutation_kind.mutation_kind; }
/// Requests that the internal SELECT built by execute() reads without applying the deleted rows mask.
/// execute() honours the flag only when the mutation itself is not a lightweight one.
void SetSkipDeletedMask(bool skip) { skip_deleted_mask = skip; }
private:
ASTPtr prepare(bool dry_run);
ASTPtr prepareLightweightDelete(bool dry_run);
@ -103,6 +105,8 @@ private:
/// True for lightweight delete.
bool is_lightweight = false;
/// True when only some columns of a part are mutated (MutateSomePartColumns) and that part has
/// lightweight-deleted rows; the internal SELECT then reads without applying the deleted rows mask.
bool skip_deleted_mask = false;
ASTPtr mutation_ast;

View File

@ -62,6 +62,7 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings(
.save_marks_in_cache = true,
.checksum_on_read = settings.checksum_on_read,
.read_in_order = query_info.input_order_info != nullptr,
.skip_deleted_mask = context->skipDeletedMask(),
};
}

View File

@ -62,6 +62,8 @@ public:
MergeTreeData::DataPartPtr data_part;
bool needReadDeletedMask() { return !settings.skip_deleted_mask && data_part->hasLightweightDelete(); }
protected:
/// Returns actual column type in part, which can differ from table metadata.
NameAndTypePair getColumnFromPart(const NameAndTypePair & required_column) const;

View File

@ -23,6 +23,8 @@ struct MergeTreeReaderSettings
bool checksum_on_read = true;
/// True if we read in order of sorting key.
bool read_in_order = false;
/// Do not apply the deleted rows mask. Set for the internal SELECT issued when mutating only some columns of a part.
bool skip_deleted_mask = false;
};
struct MergeTreeWriterSettings

View File

@ -673,8 +673,9 @@ MergeTreeRangeReader::MergeTreeRangeReader(
sample_block.insert(ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name));
}
need_read_deleted_mask = merge_tree_reader->data_part->hasLightweightDelete();
deleted_rows_mask = merge_tree_reader->data_part->deleted_rows_mask;
need_read_deleted_mask = merge_tree_reader->needReadDeletedMask();
if (need_read_deleted_mask)
deleted_rows_mask = merge_tree_reader->data_part->deleted_rows_mask;
if (prewhere_info)
{

View File

@ -604,6 +604,7 @@ void finalizeMutatedPart(
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);
new_data_part->loadProjections(false, false);
new_data_part->loadDeletedRowMask();
new_data_part->setBytesOnDisk(new_data_part->data_part_storage->calculateTotalSizeOnDisk());
new_data_part->default_codec = codec;
new_data_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();
@ -1509,8 +1510,6 @@ private:
if (has_deleted_rows)
{
ctx->new_data_part->writeLightWeightDeletedMask(new_bitmap);
ctx->new_data_part->has_lightweight_delete = true;
ctx->new_data_part->deleted_rows_mask = new_bitmap;
}
}
@ -1646,6 +1645,8 @@ bool MutateTask::prepare()
ctx->stage_progress = std::make_unique<MergeStageProgress>(1.0);
bool need_mutate_all_columns = !isWidePart(ctx->source_part);
if (!ctx->for_interpreter.empty())
{
ctx->interpreter = std::make_unique<MutationsInterpreter>(
@ -1653,6 +1654,11 @@ bool MutateTask::prepare()
ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();
ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();
ctx->mutation_kind = ctx->interpreter->getMutationKind();
/// Skip applying the deleted mask when reading for MutateSomePartColumns.
need_mutate_all_columns = need_mutate_all_columns || (ctx->mutation_kind == MutationsInterpreter::MutationKind::MUTATE_OTHER && ctx->interpreter->isAffectingAllColumns());
if(!need_mutate_all_columns && ctx->source_part->hasLightweightDelete() && !ctx->is_lightweight_mutation)
ctx->interpreter->SetSkipDeletedMask(true);
ctx->mutating_pipeline_builder = ctx->interpreter->execute();
ctx->updated_header = ctx->interpreter->getUpdatedHeader();
ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress);
@ -1703,8 +1709,7 @@ bool MutateTask::prepare()
/// All columns from part are changed and may be some more that were missing before in part
/// TODO We can materialize compact part without copying data
if (!isWidePart(ctx->source_part)
|| (ctx->mutation_kind == MutationsInterpreter::MutationKind::MUTATE_OTHER && ctx->interpreter && ctx->interpreter->isAffectingAllColumns()))
if (need_mutate_all_columns)
{
task = std::make_unique<MutateAllPartColumnsTask>(ctx);
}

View File

@ -0,0 +1,31 @@
99
95
0
-----lightweight mutation type-----
1 DELETE WHERE (c % 5) = 1 1
1 DELETE WHERE c = 4 1
0 MATERIALIZE INDEX i_c 1
0 UPDATE b = -1 WHERE a < 3 1
0 DROP INDEX i_c 1
-----Check that select and merge with lightweight delete.-----
7
0 -1 0
2 -1 2
3 3 3
5 5 5
7 7 7
8 8 8
9 9 9
t_light 0 0_1_1_0_10 2
t_light 1 1_2_2_0_10 2
t_light 2 2_3_3_0_10 2
t_light 3 3_4_4_0_10 2
t_light 4 4_5_5_0_10 2
7
t_light 0 0_1_1_1_10 2
t_light 2 2_3_3_1_10 2
t_light 3 3_4_4_1_10 2
t_light 4 4_5_5_1_10 1
-----Test lightweight delete in multi blocks-----
1000 -2
1005 -2

View File

@ -0,0 +1,59 @@
-- Basic DELETE FROM checks on a MergeTree table created with min_bytes_for_wide_part=0.
-- Each DELETE is followed by a COUNT() whose value is pinned by the reference file.
DROP TABLE IF EXISTS merge_table_standard_delete;
CREATE TABLE merge_table_standard_delete(id Int32, name String) ENGINE = MergeTree order by id settings min_bytes_for_wide_part=0;
-- 100 rows: id = 0..99, name = toString(id).
INSERT INTO merge_table_standard_delete select number, toString(number) from numbers(100);
-- The setting is not reset below, so it also applies to the statements that follow in this script.
SET mutations_sync = 1;
-- Delete a single row by key column.
DELETE FROM merge_table_standard_delete WHERE id = 10;
SELECT COUNT() FROM merge_table_standard_delete;
-- Delete four rows by a non-key column.
DELETE FROM merge_table_standard_delete WHERE name IN ('1','2','3','4');
SELECT COUNT() FROM merge_table_standard_delete;
-- Delete everything that is left.
DELETE FROM merge_table_standard_delete WHERE 1;
SELECT COUNT() FROM merge_table_standard_delete;
DROP TABLE merge_table_standard_delete;
-- Mixes DELETE FROM with ALTER mutations (MATERIALIZE INDEX, UPDATE, DROP INDEX) on a table
-- partitioned by c % 5, then checks SELECT results and parts before and after OPTIMIZE FINAL.
drop table if exists t_light;
create table t_light(a int, b int, c int, index i_c(b) type minmax granularity 4) engine = MergeTree order by a partition by c % 5 settings min_bytes_for_wide_part=0;
-- 10 rows with a = b = c = 0..9, i.e. two rows per partition.
INSERT INTO t_light SELECT number, number, number FROM numbers(10);
SELECT '-----lightweight mutation type-----';
-- Two DELETE FROM statements followed by three ALTER mutations.
DELETE FROM t_light WHERE c%5=1;
DELETE FROM t_light WHERE c=4;
alter table t_light MATERIALIZE INDEX i_c;
alter table t_light update b=-1 where a<3;
alter table t_light drop index i_c;
-- Lists every mutation of the table together with its is_lightweight flag.
SELECT is_lightweight, command, is_done FROM system.mutations WHERE database = currentDatabase() AND table = 't_light';
SELECT '-----Check that select and merge with lightweight delete.-----';
-- Rows removed by the DELETEs must not be returned; the UPDATE must be visible on the remaining rows.
select count(*) from t_light;
select * from t_light order by a;
-- Active parts and their row counts before the merge.
select table, partition, name, rows from system.parts where database = currentDatabase() AND active and table ='t_light' order by name;
optimize table t_light final;
-- Same checks after the merge; parts with zero rows are filtered out of the listing.
select count(*) from t_light;
select table, partition, name, rows from system.parts where database = currentDatabase() AND active and table ='t_light' and rows > 0 order by name;
drop table t_light;
SELECT '-----Test lightweight delete in multi blocks-----';
-- Guard against a table left over from an aborted previous run, as the other tables in this test do.
DROP TABLE IF EXISTS t_large;
CREATE TABLE t_large(a UInt32, b int) ENGINE=MergeTree order BY a settings min_bytes_for_wide_part=0;
-- 100000 rows with a = b = 1..100000.
INSERT INTO t_large SELECT number + 1, number + 1 FROM numbers(100000);
-- DELETE FROM first, then an UPDATE of a single column on the same part, then an ALTER ... DELETE.
DELETE FROM t_large WHERE a = 50000;
ALTER TABLE t_large UPDATE b = -2 WHERE a between 1000 and 1005;
ALTER TABLE t_large DELETE WHERE a=1;
-- Rows a = 1 and a = 50000 were deleted; rows 1000 and 1005 must carry the updated value of b.
SELECT * FROM t_large WHERE a in (1,1000,1005,50000) order by a;
DROP TABLE t_large;

View File

@ -1,21 +0,0 @@
-- Basic DELETE FROM checks: each DELETE is followed by a COUNT() of the remaining rows.
DROP TABLE IF EXISTS merge_table_standard_delete;
CREATE TABLE merge_table_standard_delete(id Int32, name String) ENGINE = MergeTree order by id;
INSERT INTO merge_table_standard_delete select number, toString(number) from numbers(100);
SET mutations_sync = 1;
DELETE FROM merge_table_standard_delete WHERE id = 10;
SELECT COUNT() FROM merge_table_standard_delete;
DELETE FROM merge_table_standard_delete WHERE name IN ('1','2','3','4');
SELECT COUNT() FROM merge_table_standard_delete;
DELETE FROM merge_table_standard_delete WHERE 1;
SELECT COUNT() FROM merge_table_standard_delete;
DROP TABLE merge_table_standard_delete;