Allow to use FINAL even in case of single part #2086

This commit is contained in:
Alexey Milovidov 2018-04-06 00:36:55 +03:00
parent 6452a3bb76
commit f66d2fb780
9 changed files with 67 additions and 70 deletions

View File

@ -17,9 +17,6 @@ Block AggregatingSortedBlockInputStream::readImpl()
if (finished)
return Block();
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;

View File

@ -108,9 +108,6 @@ Block CollapsingSortedBlockInputStream::readImpl()
if (finished)
return {};
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;

View File

@ -35,9 +35,6 @@ Block ReplacingSortedBlockInputStream::readImpl()
if (finished)
return Block();
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;

View File

@ -14,6 +14,7 @@
#include <Functions/FunctionHelpers.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -95,9 +96,6 @@ Block SummingSortedBlockInputStream::readImpl()
if (finished)
return Block();
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;
@ -339,7 +337,7 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
addRow(current);
// Merge maps only for same rows
for (auto & desc : maps_to_sum)
for (const auto & desc : maps_to_sum)
{
if (mergeMap(desc, current_row, current))
current_row_is_zero = false;

View File

@ -52,9 +52,6 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl()
if (finished)
return {};
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;

View File

@ -570,8 +570,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
prewhere_actions,
prewhere_column,
virt_column_names,
settings,
context);
settings);
}
else
{
@ -751,8 +750,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const Names & virt_columns,
const Settings & settings,
const Context & context) const
const Settings & settings) const
{
const size_t max_marks_to_use_cache =
(settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
@ -782,63 +780,43 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.getPrimaryExpression()));
}
BlockInputStreams res;
if (to_merge.size() == 1)
BlockInputStreamPtr merged;
switch (data.merging_params.mode)
{
if (data.merging_params.mode == MergeTreeData::MergingParams::Collapsing)
{
ExpressionActionsPtr sign_filter_expression;
String sign_filter_column;
case MergeTreeData::MergingParams::Ordinary:
merged = std::make_shared<MergingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
break;
createPositiveSignCondition(sign_filter_expression, sign_filter_column, context);
case MergeTreeData::MergingParams::Collapsing:
merged = std::make_shared<CollapsingFinalBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column);
break;
res.emplace_back(std::make_shared<FilterBlockInputStream>(to_merge[0], sign_filter_expression, sign_filter_column));
}
else
res = to_merge;
}
else if (to_merge.size() > 1)
{
BlockInputStreamPtr merged;
case MergeTreeData::MergingParams::Summing:
merged = std::make_shared<SummingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size);
break;
switch (data.merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
merged = std::make_shared<MergingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
break;
case MergeTreeData::MergingParams::Aggregating:
merged = std::make_shared<AggregatingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
break;
case MergeTreeData::MergingParams::Collapsing:
merged = std::make_shared<CollapsingFinalBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column);
break;
case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream
merged = std::make_shared<ReplacingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.version_column, max_block_size);
break;
case MergeTreeData::MergingParams::Summing:
merged = std::make_shared<SummingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size);
break;
case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream
merged = std::make_shared<VersionedCollapsingSortedBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column, max_block_size, true);
break;
case MergeTreeData::MergingParams::Aggregating:
merged = std::make_shared<AggregatingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
break;
case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream
merged = std::make_shared<ReplacingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.version_column, max_block_size);
break;
case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream
merged = std::make_shared<VersionedCollapsingSortedBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column, max_block_size, true);
break;
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
res.emplace_back(merged);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
return res;
return {merged};
}

View File

@ -55,8 +55,7 @@ private:
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const Names & virt_columns,
const Settings & settings,
const Context & context) const;
const Settings & settings) const;
/// Get the approximate value (bottom estimate - only by full marks) of the number of rows falling under the index.
size_t getApproximateTotalRowsToRead(

View File

@ -0,0 +1,7 @@
2018-03-21 1 1
2018-03-21 1 2
2018-03-21 1 1
2018-03-21 1 2
2018-03-21 1 2
2018-03-21 1 2
2018-03-21 1 2

View File

@ -0,0 +1,27 @@
USE test;
DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS replacing;
CREATE TABLE test
(
date Date,
x Int32,
ver UInt64
)
ENGINE = MergeTree(date, x, 4096);
INSERT INTO test VALUES ('2018-03-21', 1, 1), ('2018-03-21', 1, 2);
CREATE TABLE replacing ENGINE = ReplacingMergeTree(date, x, 4096, ver) AS SELECT * FROM test;
SELECT * FROM test ORDER BY ver;
SELECT * FROM replacing ORDER BY ver;
SELECT * FROM replacing FINAL ORDER BY ver;
OPTIMIZE TABLE replacing PARTITION '201803' FINAL;
SELECT * FROM replacing ORDER BY ver;
SELECT * FROM replacing FINAL ORDER BY ver;
DROP TABLE test;
DROP TABLE replacing;