Fix updateOutputStream and Traits

This commit is contained in:
Alexander Gololobov 2024-09-03 14:54:05 +02:00
parent 6a6935cb84
commit a1cec53b7c

View File

@ -861,7 +861,7 @@ public:
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool is_result_sparse_)
: ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, rows_sources_read_buf(rows_sources_read_buf_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
@ -891,12 +891,24 @@ public:
void updateOutputStream() override
{
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
/// TODO: is this correct?
output_stream->sort_scope = DataStream::SortScope::None;
}
private:
static Traits getTraits()
{
return ITransformingStep::Traits
{
{
.returns_single_stream = true,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}
MergeTreeData::MergingParams merging_params{};
CompressedReadBufferFromFile * rows_sources_read_buf;
const UInt64 merge_block_size_rows;
@ -962,10 +974,9 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
}
{
auto pipelineSettings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto builder = merge_column_query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(global_ctx->context),
pipelineSettings);
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
ctx->column_parts_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
}
@ -1297,7 +1308,7 @@ bool MergeTask::execute()
/// Apply merge strategy (Ordinary, Colapsing, Aggregating, etc) to the stream
class ApplyMergeStep : public ITransformingStep /// TODO: is this transformation step?
class ApplyMergeStep : public ITransformingStep
{
public:
ApplyMergeStep(
@ -1311,7 +1322,7 @@ public:
bool blocks_are_granules_size_,
bool cleanup_,
time_t time_of_merge_)
: ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, sort_description(sort_description_)
, partition_key_columns(partition_key_columns_)
, merging_params(merging_params_)
@ -1403,16 +1414,24 @@ public:
void updateOutputStream() override
{
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
output_stream->sort_description = sort_description;
/// TODO: is this correct?
// if (partition_key_columns.empty())
output_stream->sort_scope = DataStream::SortScope::Global;
// else
// output_stream->sort_scope = DataStream::SortScope::Stream;
}
private:
static Traits getTraits()
{
return ITransformingStep::Traits
{
{
.returns_single_stream = true,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}
const SortDescription sort_description;
const Names partition_key_columns;
const MergeTreeData::MergingParams merging_params{};
@ -1425,12 +1444,12 @@ private:
};
class MaterializingStep : public ITransformingStep /// TODO: is this transformation step?
class MaterializingStep : public ITransformingStep
{
public:
explicit MaterializingStep(
const DataStream & input_stream_)
: ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
{}
String getName() const override { return "Materializing"; }
@ -1442,9 +1461,23 @@ public:
void updateOutputStream() override
{
/// TODO: can this be simplified?
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
output_stream->sort_description = input_streams.front().sort_description;
}
private:
static Traits getTraits()
{
return ITransformingStep::Traits
{
{
.returns_single_stream = true,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}
};
}
};
@ -1460,7 +1493,7 @@ public:
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time,
bool force_)
: ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
{
transform = std::make_shared<TTLTransform>(context_, input_stream_.header, storage_, metadata_snapshot_, data_part_, current_time, force_);
subqueries_for_sets = transform->getSubqueries();
@ -1477,10 +1510,25 @@ public:
void updateOutputStream() override
{
// TODO: implement?
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
}
private:
static Traits getTraits()
{
return ITransformingStep::Traits
{
{
.returns_single_stream = true,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}
std::shared_ptr<TTLTransform> transform;
PreparedSets::Subqueries subqueries_for_sets;
};
@ -1658,12 +1706,14 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
if (!subqueries.empty())
addCreatingSetsStep(merge_parts_query_plan, std::move(subqueries), global_ctx->context);
auto pipelineSettings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(global_ctx->context),
pipelineSettings);
{
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
}
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
/// Dereference unique_ptr and pass horizontal_stage_progress by reference
global_ctx->merged_pipeline.setProgressCallback(MergeProgressCallback(global_ctx->merge_list_element_ptr, global_ctx->watch_prev_elapsed, *global_ctx->horizontal_stage_progress));
/// Is calculated inside MergeProgressCallback.