mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge 5231d00616
into 44b4bd38b9
This commit is contained in:
commit
06e93c4a15
@ -756,12 +756,11 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
|
||||
TemporaryPart temp_part;
|
||||
const auto & metadata_snapshot = projection.metadata;
|
||||
|
||||
MergeTreeDataPartType part_type;
|
||||
/// Size of part would not be greater than block.bytes() + epsilon
|
||||
size_t expected_size = block.bytes();
|
||||
// just check if there is enough space on parent volume
|
||||
MergeTreeData::reserveSpace(expected_size, parent_part->getDataPartStorage());
|
||||
part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type;
|
||||
MergeTreeDataPartType part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type;
|
||||
|
||||
auto new_data_part = parent_part->getProjectionPartBuilder(part_name, is_temp).withPartType(part_type).build();
|
||||
auto projection_part_storage = new_data_part->getDataPartStoragePtr();
|
||||
|
@ -1008,7 +1008,6 @@ void finalizeMutatedPart(
|
||||
|
||||
new_data_part->default_codec = codec;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
struct MutationContext
|
||||
@ -1023,7 +1022,9 @@ struct MutationContext
|
||||
|
||||
FutureMergedMutatedPartPtr future_part;
|
||||
MergeTreeData::DataPartPtr source_part;
|
||||
MergeTreeData::DataPartPtr projection_part;
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
StorageMetadataPtr projection_metadata_snapshot;
|
||||
|
||||
MutationCommandsConstPtr commands;
|
||||
time_t time_of_mutation;
|
||||
@ -1040,6 +1041,11 @@ struct MutationContext
|
||||
ProgressCallback progress_callback;
|
||||
Block updated_header;
|
||||
|
||||
/// source here is projection part instead of table part
|
||||
QueryPipelineBuilder projection_mutating_pipeline_builder;
|
||||
QueryPipeline projection_mutating_pipeline;
|
||||
std::unique_ptr<PullingPipelineExecutor> projection_mutating_executor;
|
||||
|
||||
std::unique_ptr<MutationsInterpreter> interpreter;
|
||||
UInt64 watch_prev_elapsed = 0;
|
||||
std::unique_ptr<MergeStageProgress> stage_progress;
|
||||
@ -1056,8 +1062,14 @@ struct MutationContext
|
||||
MergeTreeData::MutableDataPartPtr new_data_part;
|
||||
IMergedBlockOutputStreamPtr out;
|
||||
|
||||
MergeTreeData::MutableDataPartPtr new_projection_part;
|
||||
IMergedBlockOutputStreamPtr projection_out;
|
||||
|
||||
String mrk_extension;
|
||||
|
||||
/// Used in lightweight delete to bit mask the projection if possible,
|
||||
/// preference is higher than rebuild as more performant.
|
||||
std::vector<ProjectionDescriptionRawPtr> projections_to_mask;
|
||||
std::vector<ProjectionDescriptionRawPtr> projections_to_build;
|
||||
IMergeTreeDataPart::MinMaxIndexPtr minmax_idx;
|
||||
|
||||
@ -1151,6 +1163,14 @@ public:
|
||||
if (iterateThroughAllProjections())
|
||||
return true;
|
||||
|
||||
state = State::NEED_MASK_PROJECTION_PARTS;
|
||||
return true;
|
||||
}
|
||||
case State::NEED_MASK_PROJECTION_PARTS:
|
||||
{
|
||||
if (iterateThroughAllProjectionsToMask())
|
||||
return true;
|
||||
|
||||
state = State::SUCCESS;
|
||||
return true;
|
||||
}
|
||||
@ -1170,6 +1190,7 @@ private:
|
||||
void finalizeTempProjections();
|
||||
bool iterateThroughAllProjections();
|
||||
void constructTaskForProjectionPartsMerge();
|
||||
bool iterateThroughAllProjectionsToMask();
|
||||
void finalize();
|
||||
|
||||
enum class State : uint8_t
|
||||
@ -1177,7 +1198,7 @@ private:
|
||||
NEED_PREPARE,
|
||||
NEED_MUTATE_ORIGINAL_PART,
|
||||
NEED_MERGE_PROJECTION_PARTS,
|
||||
|
||||
NEED_MASK_PROJECTION_PARTS,
|
||||
SUCCESS
|
||||
};
|
||||
|
||||
@ -1343,6 +1364,14 @@ bool PartMergerWriter::iterateThroughAllProjections()
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PartMergerWriter::iterateThroughAllProjectionsToMask()
|
||||
{
|
||||
for (size_t i = 0, size = ctx->projections_to_mask.size(); i < size; ++i)
|
||||
{
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void PartMergerWriter::finalize()
|
||||
{
|
||||
if (ctx->count_lightweight_deleted_rows)
|
||||
@ -1660,6 +1689,30 @@ private:
|
||||
ctx->mutating_pipeline.disableProfileEventUpdate();
|
||||
ctx->mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->mutating_pipeline);
|
||||
|
||||
chassert (ctx->projection_mutating_pipeline_builder.initialized());
|
||||
builder = std::make_unique<QueryPipelineBuilder>(std::move(ctx->projection_mutating_pipeline_builder));
|
||||
|
||||
const auto & projections_name_and_part = ctx->source_part->getProjectionParts();
|
||||
auto part_name = fmt::format("{}", projections_name_and_part.begin()->first);
|
||||
auto new_data_part = ctx->new_data_part->getProjectionPartBuilder(part_name, true).withPartType(ctx->projection_part->getType()).build();
|
||||
|
||||
ctx->projection_out = std::make_shared<MergedBlockOutputStream>(
|
||||
new_data_part,
|
||||
ctx->projection_metadata_snapshot,
|
||||
ctx->projection_metadata_snapshot->getColumns().getAllPhysical(),
|
||||
MergeTreeIndices{},
|
||||
ColumnsStatistics{},
|
||||
ctx->data->getContext()->chooseCompressionCodec(0, 0),
|
||||
Tx::PrehistoricTID,
|
||||
/*reset_columns=*/ false,
|
||||
/*save_marks_in_cache=*/ false,
|
||||
/*blocks_are_granules_size=*/ false,
|
||||
ctx->data->getContext()->getWriteSettings());
|
||||
|
||||
ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
|
||||
ctx->projection_mutating_pipeline.disableProfileEventUpdate();
|
||||
ctx->projection_mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->projection_mutating_pipeline);
|
||||
|
||||
part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx);
|
||||
}
|
||||
|
||||
@ -1895,6 +1948,26 @@ private:
|
||||
|
||||
ctx->projections_to_build = std::vector<ProjectionDescriptionRawPtr>{ctx->projections_to_recalc.begin(), ctx->projections_to_recalc.end()};
|
||||
|
||||
chassert (ctx->projection_mutating_pipeline_builder.initialized());
|
||||
builder = std::make_unique<QueryPipelineBuilder>(std::move(ctx->projection_mutating_pipeline_builder));
|
||||
|
||||
// ctx->projection_out = std::make_shared<MergedColumnOnlyOutputStream>(
|
||||
// ctx->new_projection_part,
|
||||
// ctx->metadata_snapshot,
|
||||
// ctx->updated_header.getNamesAndTypesList(),
|
||||
// ctx->compression_codec,
|
||||
// std::vector<MergeTreeIndexPtr>(),
|
||||
// std::vector<ColumnStatisticsPartPtr>(),
|
||||
// nullptr,
|
||||
// /*save_marks_in_cache=*/ false,
|
||||
// ctx->projection_part->index_granularity,
|
||||
// &ctx->projection_part->index_granularity_info
|
||||
// );
|
||||
|
||||
ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
|
||||
ctx->projection_mutating_pipeline.disableProfileEventUpdate();
|
||||
ctx->projection_mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->projection_mutating_pipeline);
|
||||
|
||||
part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx);
|
||||
}
|
||||
}
|
||||
@ -2308,6 +2381,32 @@ bool MutateTask::prepare()
|
||||
ctx->updated_header = ctx->interpreter->getUpdatedHeader();
|
||||
ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress);
|
||||
|
||||
const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin());
|
||||
ctx->projection_metadata_snapshot = proj_desc.metadata;
|
||||
const auto & projections_name_and_part = ctx->source_part->getProjectionParts();
|
||||
ctx->projection_part = projections_name_and_part.begin()->second;
|
||||
|
||||
MutationCommands projection_commands;
|
||||
|
||||
MutationHelpers::splitAndModifyMutationCommands(
|
||||
ctx->projection_part,
|
||||
proj_desc.metadata,
|
||||
alter_conversions,
|
||||
ctx->commands_for_part,
|
||||
projection_commands,
|
||||
ctx->for_file_renames,
|
||||
false,
|
||||
ctx->log);
|
||||
|
||||
chassert(!projection_commands.empty());
|
||||
|
||||
auto projection_interpreter = std::make_unique<MutationsInterpreter>(
|
||||
*ctx->data, ctx->projection_part, alter_conversions,
|
||||
proj_desc.metadata, projection_commands,
|
||||
proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings);
|
||||
|
||||
ctx->projection_mutating_pipeline_builder = projection_interpreter->execute();
|
||||
|
||||
lightweight_delete_mode = ctx->updated_header.has(RowExistsColumn::name);
|
||||
/// If under the condition of lightweight delete mode with rebuild option, add projections again here as we can only know
|
||||
/// the condition as early as from here.
|
||||
|
@ -334,6 +334,17 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ProjectionDescription::mask(const Block & block[[maybe_unused]], ContextPtr context[[maybe_unused]]) const
|
||||
{
|
||||
// auto mut_context = Context::createCopy(context);
|
||||
// query_ast_copy = query_ast->clone();
|
||||
// auto builder = InterpreterAlterQuery(query_ast_copy, mut_context,
|
||||
// Pipe(std::make_shared<SourceFromSingleChunk>(block)))
|
||||
// .buildQueryPipeline();
|
||||
// auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
|
||||
// PullingPipelineExecutor executor(pipeline);
|
||||
// executor.pull(ret);
|
||||
}
|
||||
|
||||
String ProjectionsDescription::toString() const
|
||||
{
|
||||
|
@ -93,6 +93,8 @@ struct ProjectionDescription
|
||||
|
||||
Block calculate(const Block & block, ContextPtr context) const;
|
||||
|
||||
void mask(const Block & block, ContextPtr context) const;
|
||||
|
||||
String getDirectoryName() const { return name + ".proj"; }
|
||||
};
|
||||
|
||||
|
29
tests/queries/0_stateless/03261_projection_mask.sql
Normal file
29
tests/queries/0_stateless/03261_projection_mask.sql
Normal file
@ -0,0 +1,29 @@
|
||||
-- compact
|
||||
DROP TABLE IF EXISTS users;
|
||||
|
||||
CREATE TABLE users (
|
||||
uid Int16,
|
||||
name String,
|
||||
age Int16,
|
||||
projection p1 (select age, count() group by age),
|
||||
) ENGINE = MergeTree order by uid
|
||||
SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 10485760;
|
||||
|
||||
INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34);
|
||||
|
||||
DELETE FROM users WHERE age = 34;
|
||||
|
||||
-- wide
|
||||
DROP TABLE IF EXISTS users;
|
||||
|
||||
CREATE TABLE users (
|
||||
uid Int16,
|
||||
name String,
|
||||
age Int16,
|
||||
projection p1 (select age, count() group by age),
|
||||
) ENGINE = MergeTree order by uid
|
||||
SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 0;
|
||||
|
||||
INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34);
|
||||
|
||||
DELETE FROM users WHERE age = 34;
|
Loading…
Reference in New Issue
Block a user