Compare commits

...

8 Commits

Author SHA1 Message Date
Shichao Jin
a4529a9767
Merge 5231d00616 into 4e56c026cd 2024-11-20 14:45:11 -05:00
jsc0218
5231d00616 remove unused and add compact projout 2024-11-20 03:11:36 +00:00
jsc0218
190651cf23 add test 2024-11-19 20:44:19 +00:00
jsc0218
0ee0ab9132 use existing projpart 2024-11-19 00:35:46 +00:00
jsc0218
8fa8d5ac0f add metadata for projpart and remove duplicated code 2024-11-19 00:08:14 +00:00
jsc0218
445879a2ac prepare to add meta for proj part 2024-11-18 02:09:50 +00:00
jsc0218
130a1151fe MutationInterpreter prepare for proj 2024-11-14 03:18:56 +00:00
jsc0218
af7e3640d1 preparaion 2024-11-11 01:52:19 +00:00
6 changed files with 144 additions and 4 deletions

View File

@ -756,12 +756,11 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
TemporaryPart temp_part; TemporaryPart temp_part;
const auto & metadata_snapshot = projection.metadata; const auto & metadata_snapshot = projection.metadata;
MergeTreeDataPartType part_type;
/// Size of part would not be greater than block.bytes() + epsilon /// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes(); size_t expected_size = block.bytes();
// just check if there is enough space on parent volume // just check if there is enough space on parent volume
MergeTreeData::reserveSpace(expected_size, parent_part->getDataPartStorage()); MergeTreeData::reserveSpace(expected_size, parent_part->getDataPartStorage());
part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type; MergeTreeDataPartType part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type;
auto new_data_part = parent_part->getProjectionPartBuilder(part_name, is_temp).withPartType(part_type).build(); auto new_data_part = parent_part->getProjectionPartBuilder(part_name, is_temp).withPartType(part_type).build();
auto projection_part_storage = new_data_part->getDataPartStoragePtr(); auto projection_part_storage = new_data_part->getDataPartStoragePtr();

View File

@ -1008,7 +1008,6 @@ void finalizeMutatedPart(
new_data_part->default_codec = codec; new_data_part->default_codec = codec;
} }
} }
struct MutationContext struct MutationContext
@ -1023,7 +1022,9 @@ struct MutationContext
FutureMergedMutatedPartPtr future_part; FutureMergedMutatedPartPtr future_part;
MergeTreeData::DataPartPtr source_part; MergeTreeData::DataPartPtr source_part;
MergeTreeData::DataPartPtr projection_part;
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
StorageMetadataPtr projection_metadata_snapshot;
MutationCommandsConstPtr commands; MutationCommandsConstPtr commands;
time_t time_of_mutation; time_t time_of_mutation;
@ -1040,6 +1041,11 @@ struct MutationContext
ProgressCallback progress_callback; ProgressCallback progress_callback;
Block updated_header; Block updated_header;
/// source here is projection part instead of table part
QueryPipelineBuilder projection_mutating_pipeline_builder;
QueryPipeline projection_mutating_pipeline;
std::unique_ptr<PullingPipelineExecutor> projection_mutating_executor;
std::unique_ptr<MutationsInterpreter> interpreter; std::unique_ptr<MutationsInterpreter> interpreter;
UInt64 watch_prev_elapsed = 0; UInt64 watch_prev_elapsed = 0;
std::unique_ptr<MergeStageProgress> stage_progress; std::unique_ptr<MergeStageProgress> stage_progress;
@ -1056,8 +1062,14 @@ struct MutationContext
MergeTreeData::MutableDataPartPtr new_data_part; MergeTreeData::MutableDataPartPtr new_data_part;
IMergedBlockOutputStreamPtr out; IMergedBlockOutputStreamPtr out;
MergeTreeData::MutableDataPartPtr new_projection_part;
IMergedBlockOutputStreamPtr projection_out;
String mrk_extension; String mrk_extension;
/// Used in lightweight delete to bit mask the projection if possible,
/// preference is higher than rebuild as more performant.
std::vector<ProjectionDescriptionRawPtr> projections_to_mask;
std::vector<ProjectionDescriptionRawPtr> projections_to_build; std::vector<ProjectionDescriptionRawPtr> projections_to_build;
IMergeTreeDataPart::MinMaxIndexPtr minmax_idx; IMergeTreeDataPart::MinMaxIndexPtr minmax_idx;
@ -1151,6 +1163,14 @@ public:
if (iterateThroughAllProjections()) if (iterateThroughAllProjections())
return true; return true;
state = State::NEED_MASK_PROJECTION_PARTS;
return true;
}
case State::NEED_MASK_PROJECTION_PARTS:
{
if (iterateThroughAllProjectionsToMask())
return true;
state = State::SUCCESS; state = State::SUCCESS;
return true; return true;
} }
@ -1170,6 +1190,7 @@ private:
void finalizeTempProjections(); void finalizeTempProjections();
bool iterateThroughAllProjections(); bool iterateThroughAllProjections();
void constructTaskForProjectionPartsMerge(); void constructTaskForProjectionPartsMerge();
bool iterateThroughAllProjectionsToMask();
void finalize(); void finalize();
enum class State : uint8_t enum class State : uint8_t
@ -1177,7 +1198,7 @@ private:
NEED_PREPARE, NEED_PREPARE,
NEED_MUTATE_ORIGINAL_PART, NEED_MUTATE_ORIGINAL_PART,
NEED_MERGE_PROJECTION_PARTS, NEED_MERGE_PROJECTION_PARTS,
NEED_MASK_PROJECTION_PARTS,
SUCCESS SUCCESS
}; };
@ -1343,6 +1364,14 @@ bool PartMergerWriter::iterateThroughAllProjections()
return true; return true;
} }
bool PartMergerWriter::iterateThroughAllProjectionsToMask()
{
for (size_t i = 0, size = ctx->projections_to_mask.size(); i < size; ++i)
{
}
return false;
}
void PartMergerWriter::finalize() void PartMergerWriter::finalize()
{ {
if (ctx->count_lightweight_deleted_rows) if (ctx->count_lightweight_deleted_rows)
@ -1660,6 +1689,30 @@ private:
ctx->mutating_pipeline.disableProfileEventUpdate(); ctx->mutating_pipeline.disableProfileEventUpdate();
ctx->mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->mutating_pipeline); ctx->mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->mutating_pipeline);
chassert (ctx->projection_mutating_pipeline_builder.initialized());
builder = std::make_unique<QueryPipelineBuilder>(std::move(ctx->projection_mutating_pipeline_builder));
const auto & projections_name_and_part = ctx->source_part->getProjectionParts();
auto part_name = fmt::format("{}", projections_name_and_part.begin()->first);
auto new_data_part = ctx->new_data_part->getProjectionPartBuilder(part_name, true).withPartType(ctx->projection_part->getType()).build();
ctx->projection_out = std::make_shared<MergedBlockOutputStream>(
new_data_part,
ctx->projection_metadata_snapshot,
ctx->projection_metadata_snapshot->getColumns().getAllPhysical(),
MergeTreeIndices{},
ColumnsStatistics{},
ctx->data->getContext()->chooseCompressionCodec(0, 0),
Tx::PrehistoricTID,
/*reset_columns=*/ false,
/*save_marks_in_cache=*/ false,
/*blocks_are_granules_size=*/ false,
ctx->data->getContext()->getWriteSettings());
ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
ctx->projection_mutating_pipeline.disableProfileEventUpdate();
ctx->projection_mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->projection_mutating_pipeline);
part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx); part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx);
} }
@ -1895,6 +1948,26 @@ private:
ctx->projections_to_build = std::vector<ProjectionDescriptionRawPtr>{ctx->projections_to_recalc.begin(), ctx->projections_to_recalc.end()}; ctx->projections_to_build = std::vector<ProjectionDescriptionRawPtr>{ctx->projections_to_recalc.begin(), ctx->projections_to_recalc.end()};
chassert (ctx->projection_mutating_pipeline_builder.initialized());
builder = std::make_unique<QueryPipelineBuilder>(std::move(ctx->projection_mutating_pipeline_builder));
// ctx->projection_out = std::make_shared<MergedColumnOnlyOutputStream>(
// ctx->new_projection_part,
// ctx->metadata_snapshot,
// ctx->updated_header.getNamesAndTypesList(),
// ctx->compression_codec,
// std::vector<MergeTreeIndexPtr>(),
// std::vector<ColumnStatisticsPartPtr>(),
// nullptr,
// /*save_marks_in_cache=*/ false,
// ctx->projection_part->index_granularity,
// &ctx->projection_part->index_granularity_info
// );
ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
ctx->projection_mutating_pipeline.disableProfileEventUpdate();
ctx->projection_mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->projection_mutating_pipeline);
part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx); part_merger_writer_task = std::make_unique<PartMergerWriter>(ctx);
} }
} }
@ -2308,6 +2381,32 @@ bool MutateTask::prepare()
ctx->updated_header = ctx->interpreter->getUpdatedHeader(); ctx->updated_header = ctx->interpreter->getUpdatedHeader();
ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress); ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress);
const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin());
ctx->projection_metadata_snapshot = proj_desc.metadata;
const auto & projections_name_and_part = ctx->source_part->getProjectionParts();
ctx->projection_part = projections_name_and_part.begin()->second;
MutationCommands projection_commands;
MutationHelpers::splitAndModifyMutationCommands(
ctx->projection_part,
proj_desc.metadata,
alter_conversions,
ctx->commands_for_part,
projection_commands,
ctx->for_file_renames,
false,
ctx->log);
chassert(!projection_commands.empty());
auto projection_interpreter = std::make_unique<MutationsInterpreter>(
*ctx->data, ctx->projection_part, alter_conversions,
proj_desc.metadata, projection_commands,
proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings);
ctx->projection_mutating_pipeline_builder = projection_interpreter->execute();
lightweight_delete_mode = ctx->updated_header.has(RowExistsColumn::name); lightweight_delete_mode = ctx->updated_header.has(RowExistsColumn::name);
/// If under the condition of lightweight delete mode with rebuild option, add projections again here as we can only know /// If under the condition of lightweight delete mode with rebuild option, add projections again here as we can only know
/// the condition as early as from here. /// the condition as early as from here.

View File

@ -334,6 +334,17 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context)
return ret; return ret;
} }
void ProjectionDescription::mask(const Block & block[[maybe_unused]], ContextPtr context[[maybe_unused]]) const
{
// auto mut_context = Context::createCopy(context);
// query_ast_copy = query_ast->clone();
// auto builder = InterpreterAlterQuery(query_ast_copy, mut_context,
// Pipe(std::make_shared<SourceFromSingleChunk>(block)))
// .buildQueryPipeline();
// auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
// PullingPipelineExecutor executor(pipeline);
// executor.pull(ret);
}
String ProjectionsDescription::toString() const String ProjectionsDescription::toString() const
{ {

View File

@ -93,6 +93,8 @@ struct ProjectionDescription
Block calculate(const Block & block, ContextPtr context) const; Block calculate(const Block & block, ContextPtr context) const;
void mask(const Block & block, ContextPtr context) const;
String getDirectoryName() const { return name + ".proj"; } String getDirectoryName() const { return name + ".proj"; }
}; };

View File

@ -0,0 +1,29 @@
-- compact
DROP TABLE IF EXISTS users;
CREATE TABLE users (
uid Int16,
name String,
age Int16,
projection p1 (select age, count() group by age),
) ENGINE = MergeTree order by uid
SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 10485760;
INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34);
DELETE FROM users WHERE age = 34;
-- wide
DROP TABLE IF EXISTS users;
CREATE TABLE users (
uid Int16,
name String,
age Int16,
projection p1 (select age, count() group by age),
) ENGINE = MergeTree order by uid
SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 0;
INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34);
DELETE FROM users WHERE age = 34;