Fix unknown column bug in sampling.

This commit is contained in:
Nikolai Kochetov 2021-08-06 17:23:50 +03:00
parent 14ef66e49b
commit 48eb995461
3 changed files with 26 additions and 4 deletions

View File

@ -995,21 +995,18 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
Block cur_header = result_projection ? result_projection->getResultColumns()
: pipe.getHeader();
auto append_actions = [&result_projection, &cur_header](ActionsDAGPtr actions)
auto append_actions = [&result_projection](ActionsDAGPtr actions)
{
if (!result_projection)
result_projection = std::move(actions);
else
result_projection = ActionsDAG::merge(std::move(*result_projection), std::move(*actions));
cur_header = result_projection->getResultColumns();
};
/// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
if (sample_factor_column_queried)
{
ColumnWithTypeAndName column;
column.name = "_sample_factor";
column.type = std::make_shared<DataTypeFloat64>();
column.column = column.type->createColumnConst(0, Field(result.sampling.used_sample_factor));
@ -1017,6 +1014,9 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
append_actions(std::move(adding_column));
}
if (result_projection)
cur_header = result_projection->updateHeader(cur_header);
/// Extra columns may be returned (for example, if sampling is used).
/// Convert pipe to step header structure.
if (!isCompatibleHeader(cur_header, getOutputStream().header))

View File

@ -0,0 +1,2 @@
1
1 1 1

View File

@ -0,0 +1,20 @@
-- Regression test for the "unknown column" error that occurred when the
-- virtual column `_sample_factor` was queried together with SAMPLE.
drop table if exists sessions;

CREATE TABLE sessions
(
    `user_id` UInt64
)
ENGINE = MergeTree
ORDER BY user_id
SAMPLE BY user_id;

insert into sessions values(1);

-- `_sample_factor` used inside the argument of an aggregate function.
SELECT
    sum(user_id * _sample_factor)
FROM sessions
SAMPLE 10000000;

-- `_sample_factor` aggregated under an alias (`x`) that a later expression
-- in the same SELECT list (`a*x`) refers to.
SELECT
    uniq(user_id) a, min(_sample_factor) x, a*x
FROM sessions
SAMPLE 10000000;

-- Clean up so the table is not left behind for other tests.
drop table sessions;