diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp index fb2d314de2b..7c6052444a1 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace DB @@ -8,6 +9,19 @@ namespace DB void ReplacingSortedBlockInputStream::insertRow(ColumnPlainPtrs & merged_columns, size_t & merged_rows) { + if (out_row_sources_buf) + { + /// true flag value means "skip row" + /// Unskip the selected row: the last row seen with the maximum version. It sits at max_pos, + /// which is not necessarily the last row of the key group. + current_row_sources[max_pos].setSkipFlag(false); + + out_row_sources_buf->write(reinterpret_cast(current_row_sources.data()), + current_row_sources.size() * sizeof(RowSourcePart)); + current_row_sources.resize(0); + max_pos = 0; + } + ++merged_rows; for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*selected_row.columns[i], selected_row.row_num); @@ -87,6 +101,16 @@ void ReplacingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, st current_key.swap(next_key); } + /// Initially, skip all rows. insertRow() unskips the one at max_pos. + if (out_row_sources_buf) + { + /// Same non-strict comparison as the selection below, so max_pos tracks the selected row. + if (version >= max_version) + max_pos = current_row_sources.size(); + + current_row_sources.emplace_back(current.impl->order, true); + } + /// A non-strict comparison, since we select the last row for the same version values. 
if (version >= max_version) { diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h index e01c0fb6cf0..a07a330a749 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h @@ -16,8 +16,8 @@ class ReplacingSortedBlockInputStream : public MergingSortedBlockInputStream { public: ReplacingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, - const String & version_column_, size_t max_block_size_) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), + const String & version_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_), version_column(version_column_) { } @@ -61,6 +61,9 @@ private: UInt64 max_version = 0; /// Max version for current primary key. + PODArray current_row_sources; /// Sources of rows with the current primary key + size_t max_pos = 0; /// Position in current_row_sources of the row with the maximum version (the one to keep) + template void merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index d7c9e629797..08b2f1d1043 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -321,6 +321,10 @@ static void extractMergingAndGatheringColumns(const NamesAndTypesList & all_colu if (merging_params.mode == MergeTreeData::MergingParams::Collapsing) key_columns.emplace(merging_params.sign_column); + /// Force version column for Replacing mode + if (merging_params.mode == MergeTreeData::MergingParams::Replacing) + key_columns.emplace(merging_params.version_column); + /// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns for (auto & column : all_columns) @@ -595,7 +599,7 @@ 
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart case MergeTreeData::MergingParams::Replacing: merged_stream = std::make_unique( - src_streams, sort_desc, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE); + src_streams, sort_desc, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get()); break; case MergeTreeData::MergingParams::Graphite: @@ -773,7 +777,8 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm( bool is_supported_storage = data.merging_params.mode == MergeTreeData::MergingParams::Ordinary || - data.merging_params.mode == MergeTreeData::MergingParams::Collapsing; + data.merging_params.mode == MergeTreeData::MergingParams::Collapsing || + data.merging_params.mode == MergeTreeData::MergingParams::Replacing; bool enough_ordinary_cols = gathering_columns.size() >= data.context.getMergeTreeSettings().vertical_merge_algorithm_min_columns_to_activate; diff --git a/dbms/tests/queries/0_stateless/00155_merges.reference b/dbms/tests/queries/0_stateless/00155_merges.reference index f07cdf6765b..0fb6277de58 100644 --- a/dbms/tests/queries/0_stateless/00155_merges.reference +++ b/dbms/tests/queries/0_stateless/00155_merges.reference @@ -7,17 +7,16 @@ 1 1 1 1 +1 1 +1 1 +1 +1 +1 + 1 1 1 1 -1 1 -1 1 - -1 1 -1 1 - - 1 1 1 1 @@ -26,18 +25,14 @@ 1 1 1 1 +1 +1 +1 1 1 1 1 -1 1 -1 1 - -1 1 -1 1 - - 1 1 1 1 @@ -46,18 +41,14 @@ 1 1 1 1 +1 +1 +1 1 1 1 1 -1 1 -1 1 - -1 1 -1 1 - - 1 1 1 1 @@ -66,18 +57,14 @@ 1 1 1 1 +1 +1 +1 1 1 1 1 -1 1 -1 1 - -1 1 -1 1 - - 1 1 1 1 @@ -86,18 +73,14 @@ 1 1 1 1 +1 +1 +1 1 1 1 1 -1 1 -1 1 - -1 1 -1 1 - - 1 1 1 1 @@ -106,6 +89,9 @@ 1 1 1 1 +1 +1 +1 1 1 @@ -117,4 +103,90 @@ 1 1 1 1 +1 1 +1 1 +1 +1 +1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 +1 +1 +1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 +1 +1 +1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 +1 +1 +1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 +1 +1 +1 + + +1 1 +1 
1 + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 +1 +1 +1 + diff --git a/dbms/tests/queries/0_stateless/00155_merges.sh b/dbms/tests/queries/0_stateless/00155_merges.sh index 190f9b5704a..9e18cb8061f 100755 --- a/dbms/tests/queries/0_stateless/00155_merges.sh +++ b/dbms/tests/queries/0_stateless/00155_merges.sh @@ -4,10 +4,12 @@ function create { clickhouse-client --query="DROP TABLE IF EXISTS test.summing" clickhouse-client --query="DROP TABLE IF EXISTS test.collapsing" clickhouse-client --query="DROP TABLE IF EXISTS test.aggregating" + clickhouse-client --query="DROP TABLE IF EXISTS test.replacing" clickhouse-client --query="CREATE TABLE test.summing (d Date DEFAULT today(), x UInt64, s UInt64 DEFAULT 1) ENGINE = SummingMergeTree(d, x, 8192)" clickhouse-client --query="CREATE TABLE test.collapsing (d Date DEFAULT today(), x UInt64, s Int8 DEFAULT 1) ENGINE = CollapsingMergeTree(d, x, 8192, s)" clickhouse-client --query="CREATE TABLE test.aggregating (d Date DEFAULT today(), x UInt64, s AggregateFunction(sum, UInt64)) ENGINE = AggregatingMergeTree(d, x, 8192)" + clickhouse-client --query="CREATE TABLE test.replacing (d Date DEFAULT today(), x UInt64, s Int8 DEFAULT 1, v UInt64) ENGINE = ReplacingMergeTree(d, (x), 8192, v)" } @@ -15,6 +17,7 @@ function cleanup { clickhouse-client --query="DROP TABLE test.summing" clickhouse-client --query="DROP TABLE test.collapsing" clickhouse-client --query="DROP TABLE test.aggregating" + clickhouse-client --query="DROP TABLE test.replacing" } @@ -35,6 +38,9 @@ function test { clickhouse-client $SETTINGS --query="INSERT INTO test.aggregating (d, x, s) SELECT today() AS d, number AS x, sumState(materialize(toUInt64(1))) AS s FROM (SELECT number FROM system.numbers LIMIT $1) GROUP BY number" clickhouse-client $SETTINGS --query="INSERT INTO test.aggregating (d, x, s) SELECT today() AS d, number AS x, sumState(materialize(toUInt64(1))) AS s FROM (SELECT number FROM system.numbers LIMIT $2) GROUP BY number" + clickhouse-client $SETTINGS 
--query="INSERT INTO test.replacing (x, v) SELECT number AS x, toUInt64(number % 3 == 0) FROM system.numbers LIMIT $1" + clickhouse-client $SETTINGS --query="INSERT INTO test.replacing (x, v) SELECT number AS x, toUInt64(number % 3 == 1) FROM system.numbers LIMIT $2" + clickhouse-client --query="SELECT count() = $SUM, sum(s) = $SUM FROM test.summing" clickhouse-client --query="OPTIMIZE TABLE test.summing" clickhouse-client --query="SELECT count() = $MAX, sum(s) = $SUM FROM test.summing" @@ -47,6 +53,13 @@ function test { clickhouse-client --query="OPTIMIZE TABLE test.aggregating" clickhouse-client --query="SELECT count() = $MAX, sumMerge(s) = $SUM FROM test.aggregating" echo + clickhouse-client --query="SELECT count() = $SUM, sum(s) = $SUM FROM test.replacing" + clickhouse-client --query="OPTIMIZE TABLE test.replacing" + clickhouse-client --query="SELECT count() = $MAX, sum(s) = $MAX FROM test.replacing" + clickhouse-client --query="SELECT count() = sum(v) FROM test.replacing where x % 3 == 0 and x < $1" + clickhouse-client --query="SELECT count() = sum(v) FROM test.replacing where x % 3 == 1 and x < $2" + clickhouse-client --query="SELECT sum(v) = 0 FROM test.replacing where x % 3 == 2" + echo echo }