Merge pull request #994 from yandex/vertical-merge-for-replacing-engines

Vertical merge for replacing engines
This commit is contained in:
alexey-milovidov 2017-07-18 02:22:03 +03:00 committed by GitHub
commit 181082ca07
5 changed files with 146 additions and 39 deletions

View File

@ -1,5 +1,6 @@
#include <DataStreams/ReplacingSortedBlockInputStream.h>
#include <Columns/ColumnsNumber.h>
#include <common/logger_useful.h>
namespace DB
@ -8,6 +9,16 @@ namespace DB
void ReplacingSortedBlockInputStream::insertRow(ColumnPlainPtrs & merged_columns, size_t & merged_rows)
{
if (out_row_sources_buf)
{
/// true flag value means "skip row"
current_row_sources.back().setSkipFlag(false);
out_row_sources_buf->write(reinterpret_cast<const char *>(current_row_sources.data()),
current_row_sources.size() * sizeof(RowSourcePart));
current_row_sources.resize(0);
}
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*selected_row.columns[i], selected_row.row_num);
@ -87,6 +98,10 @@ void ReplacingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, st
current_key.swap(next_key);
}
/// Initially, skip all rows. Unskip last on insert.
if (out_row_sources_buf)
current_row_sources.emplace_back(current.impl->order, true);
/// A non-strict comparison, since we select the last row for the same version values.
if (version >= max_version)
{

View File

@ -16,8 +16,8 @@ class ReplacingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
ReplacingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & version_column_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
const String & version_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_),
version_column(version_column_)
{
}
@ -61,6 +61,8 @@ private:
UInt64 max_version = 0; /// Max version for current primary key.
PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key
template<class TSortCursor>
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);

View File

@ -321,6 +321,10 @@ static void extractMergingAndGatheringColumns(const NamesAndTypesList & all_colu
if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
key_columns.emplace(merging_params.sign_column);
/// Force version column for Replacing mode
if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
key_columns.emplace(merging_params.version_column);
/// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns
for (auto & column : all_columns)
@ -595,7 +599,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
case MergeTreeData::MergingParams::Replacing:
merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
src_streams, sort_desc, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_desc, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get());
break;
case MergeTreeData::MergingParams::Graphite:
@ -773,7 +777,8 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
bool is_supported_storage =
data.merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
data.merging_params.mode == MergeTreeData::MergingParams::Collapsing;
data.merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
data.merging_params.mode == MergeTreeData::MergingParams::Replacing;
bool enough_ordinary_cols = gathering_columns.size() >= data.context.getMergeTreeSettings().vertical_merge_algorithm_min_columns_to_activate;

View File

@ -7,17 +7,16 @@
1 1
1 1
1 1
1 1
1
1
1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
@ -26,18 +25,14 @@
1 1
1 1
1
1
1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
@ -46,18 +41,14 @@
1 1
1 1
1
1
1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
@ -66,18 +57,14 @@
1 1
1 1
1
1
1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
@ -86,18 +73,14 @@
1 1
1 1
1
1
1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
@ -106,6 +89,9 @@
1 1
1 1
1
1
1
1 1
@ -117,4 +103,90 @@
1 1
1 1
1 1
1 1
1
1
1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1
1
1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1
1
1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1
1
1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1
1
1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1
1
1

View File

@ -4,10 +4,12 @@ function create {
clickhouse-client --query="DROP TABLE IF EXISTS test.summing"
clickhouse-client --query="DROP TABLE IF EXISTS test.collapsing"
clickhouse-client --query="DROP TABLE IF EXISTS test.aggregating"
clickhouse-client --query="DROP TABLE IF EXISTS test.replacing"
clickhouse-client --query="CREATE TABLE test.summing (d Date DEFAULT today(), x UInt64, s UInt64 DEFAULT 1) ENGINE = SummingMergeTree(d, x, 8192)"
clickhouse-client --query="CREATE TABLE test.collapsing (d Date DEFAULT today(), x UInt64, s Int8 DEFAULT 1) ENGINE = CollapsingMergeTree(d, x, 8192, s)"
clickhouse-client --query="CREATE TABLE test.aggregating (d Date DEFAULT today(), x UInt64, s AggregateFunction(sum, UInt64)) ENGINE = AggregatingMergeTree(d, x, 8192)"
clickhouse-client --query="CREATE TABLE test.replacing (d Date DEFAULT today(), x UInt64, s Int8 DEFAULT 1, v UInt64) ENGINE = ReplacingMergeTree(d, (x), 8192, v)"
}
@ -15,6 +17,7 @@ function cleanup {
clickhouse-client --query="DROP TABLE test.summing"
clickhouse-client --query="DROP TABLE test.collapsing"
clickhouse-client --query="DROP TABLE test.aggregating"
clickhouse-client --query="DROP TABLE test.replacing"
}
@ -35,6 +38,9 @@ function test {
clickhouse-client $SETTINGS --query="INSERT INTO test.aggregating (d, x, s) SELECT today() AS d, number AS x, sumState(materialize(toUInt64(1))) AS s FROM (SELECT number FROM system.numbers LIMIT $1) GROUP BY number"
clickhouse-client $SETTINGS --query="INSERT INTO test.aggregating (d, x, s) SELECT today() AS d, number AS x, sumState(materialize(toUInt64(1))) AS s FROM (SELECT number FROM system.numbers LIMIT $2) GROUP BY number"
clickhouse-client $SETTINGS --query="INSERT INTO test.replacing (x, v) SELECT number AS x, toUInt64(number % 3 == 0) FROM system.numbers LIMIT $1"
clickhouse-client $SETTINGS --query="INSERT INTO test.replacing (x, v) SELECT number AS x, toUInt64(number % 3 == 1) FROM system.numbers LIMIT $2"
clickhouse-client --query="SELECT count() = $SUM, sum(s) = $SUM FROM test.summing"
clickhouse-client --query="OPTIMIZE TABLE test.summing"
clickhouse-client --query="SELECT count() = $MAX, sum(s) = $SUM FROM test.summing"
@ -47,6 +53,13 @@ function test {
clickhouse-client --query="OPTIMIZE TABLE test.aggregating"
clickhouse-client --query="SELECT count() = $MAX, sumMerge(s) = $SUM FROM test.aggregating"
echo
clickhouse-client --query="SELECT count() = $SUM, sum(s) = $SUM FROM test.replacing"
clickhouse-client --query="OPTIMIZE TABLE test.replacing"
clickhouse-client --query="SELECT count() = $MAX, sum(s) = $MAX FROM test.replacing"
clickhouse-client --query="SELECT count() = sum(v) FROM test.replacing where x % 3 == 0 and x < $1"
clickhouse-client --query="SELECT count() = sum(v) FROM test.replacing where x % 3 == 1 and x < $2"
clickhouse-client --query="SELECT sum(v) = 0 FROM test.replacing where x % 3 == 2"
echo
echo
}