Fix _sample_factor column.

This commit is contained in:
Nikolai Kochetov 2021-05-28 12:23:46 +03:00
parent ce11f35dcc
commit 4e28b7cb02
3 changed files with 46 additions and 50 deletions

View File

@ -92,7 +92,6 @@ struct ReadFromMergeTree::AnalysisResult
{
RangesInDataParts parts_with_ranges;
MergeTreeDataSelectSamplingData sampling;
bool sample_factor_column_queried = false;
String query_id;
IndexStats index_stats;
Names column_names_to_read;
@ -130,6 +129,13 @@ ReadFromMergeTree::ReadFromMergeTree(
, settings(std::move(settings_))
, log(log_)
{
if (settings.sample_factor_column_queried)
{
/// Only _sample_factor virtual column is added by ReadFromMergeTree
/// Other virtual columns are added by MergeTreeBaseSelectProcessor.
auto type = std::make_shared<DataTypeFloat64>();
output_stream->header.insert({type->createColumn(), type, "_sample_factor"});
}
}
Pipe ReadFromMergeTree::readFromPool(
@ -894,13 +900,9 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
MergeTreeDataSelectExecutor::filterPartsByPartition(
metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read.get(), log, result.index_stats);
for (const auto & col : virt_column_names)
if (col == "_sample_factor")
result.sample_factor_column_queried = true;
result.sampling = MergeTreeDataSelectExecutor::getSampling(
select, parts, metadata_snapshot, key_condition,
data, log, result.sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context);
data, log, settings.sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context);
if (result.sampling.read_nothing)
return result;
@ -1062,55 +1064,61 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
});
}
if (result_projection)
Block cur_header = result_projection ? result_projection->getResultColumns()
: pipe.getHeader();
auto append_actions = [&result_projection, &cur_header](ActionsDAGPtr actions)
{
auto projection_actions = std::make_shared<ExpressionActions>(result_projection);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, projection_actions);
});
}
if (!result_projection)
result_projection = std::move(actions);
else
result_projection = ActionsDAG::merge(std::move(*result_projection), std::move(*actions));
cur_header = result_projection->getResultColumns();
};
/// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
if (result.sample_factor_column_queried)
if (settings.sample_factor_column_queried)
{
ColumnWithTypeAndName column;
column.name = "_sample_factor";
column.type = std::make_shared<DataTypeFloat64>();
column.column = column.type->createColumnConst(0, Field(result.sampling.used_sample_factor));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_action = std::make_shared<ExpressionActions>(adding_column_dag);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, adding_column_action);
});
auto adding_column = ActionsDAG::makeAddingColumnActions(std::move(column));
append_actions(std::move(adding_column));
}
// TODO There seems to be no place initializing remove_columns_actions
if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
{
auto remove_columns_action = std::make_shared<ExpressionActions>(
query_info.prewhere_info->remove_columns_actions->getActionsDAG().clone());
// if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
// {
// auto remove_columns_action = std::make_shared<ExpressionActions>(
// query_info.prewhere_info->remove_columns_actions->getActionsDAG().clone());
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, remove_columns_action);
});
}
// pipe.addSimpleTransform([&](const Block & header)
// {
// return std::make_shared<ExpressionTransform>(header, remove_columns_action);
// });
// }
if (!isCompatibleHeader(pipe.getHeader(), getOutputStream().header))
/// Extra columns may be returned (for example, if sampling is used).
/// Convert pipe to step header structure.
if (!isCompatibleHeader(cur_header, getOutputStream().header))
{
auto converting_dag = ActionsDAG::makeConvertingActions(
pipe.getHeader().getColumnsWithTypeAndName(),
auto converting = ActionsDAG::makeConvertingActions(
cur_header.getColumnsWithTypeAndName(),
getOutputStream().header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto converting_actions = std::make_shared<ExpressionActions>(std::move(converting_dag));
pipe.addSimpleTransform([&](const Block & cur_header)
append_actions(std::move(converting));
}
if (result_projection)
{
auto projection_actions = std::make_shared<ExpressionActions>(result_projection);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(cur_header, converting_actions);
return std::make_shared<ExpressionTransform>(header, projection_actions);
});
}

View File

@ -50,6 +50,7 @@ public:
//size_t min_marks_for_concurrent_read;
bool use_uncompressed_cache;
bool force_primary_key;
bool sample_factor_column_queried;
MergeTreeReaderSettings reader_settings;
MergeTreeReadPool::BackoffSettings backoff_settings;

View File

@ -1208,26 +1208,13 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
.num_streams = num_streams,
.preferred_block_size_bytes = settings.preferred_block_size_bytes,
.preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes,
//.min_marks_for_concurrent_read = settings.min_marks_for_concurrent_read,
.use_uncompressed_cache = settings.use_uncompressed_cache,
.force_primary_key = settings.force_primary_key,
.sample_factor_column_queried = sample_factor_column_queried,
.reader_settings = reader_settings,
.backoff_settings = MergeTreeReadPool::BackoffSettings(settings),
};
// const SelectQueryInfo & query_info_,
// const MergeTreeDataSelectExecutor::PartitionIdToMaxBlock * max_block_numbers_to_read_,
// ContextPtr context_,
// const MergeTreeData & data_,
// StorageMetadataPtr metadata_snapshot_,
// StorageMetadataPtr metadata_snapshot_base_,
// Names real_column_names_,
// MergeTreeData::DataPartsVector parts_,
// PrewhereInfoPtr prewhere_info_,
// Names virt_column_names_,
// Settings settings_,
// Poco::Logger * log_
auto read_from_merge_tree = std::make_unique<ReadFromMergeTree>(
query_info,
max_block_numbers_to_read,