diff --git a/dbms/src/DataStreams/ColumnGathererStream.cpp b/dbms/src/DataStreams/ColumnGathererStream.cpp
index 5e1d3d96153..389b78b23d4 100644
--- a/dbms/src/DataStreams/ColumnGathererStream.cpp
+++ b/dbms/src/DataStreams/ColumnGathererStream.cpp
@@ -23,11 +23,24 @@ ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_stre
         throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
 
     children.assign(source_streams.begin(), source_streams.end());
+}
 
-    /// Trivial case
-    if (children.size() == 1)
-        return;
+String ColumnGathererStream::getID() const
+{
+    std::stringstream res;
+
+    res << getName() << "(";
+    for (size_t i = 0; i < children.size(); i++)
+        res << (i == 0 ? "" : ", " ) << children[i]->getID();
+    res << ")";
+
+    return res.str();
+}
+
+
+void ColumnGathererStream::init()
+{
     sources.reserve(children.size());
     for (size_t i = 0; i < children.size(); i++)
     {
@@ -52,24 +65,16 @@ ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_stre
 }
 
 
-String ColumnGathererStream::getID() const
-{
-    std::stringstream res;
-
-    res << getName() << "(";
-    for (size_t i = 0; i < children.size(); i++)
-        res << (i == 0 ? "" : ", " ) << children[i]->getID();
-    res << ")";
-
-    return res.str();
-}
-
-
 Block ColumnGathererStream::readImpl()
 {
-    if (children.size() == 1)
+    /// Special case: single source and there are no skipped rows
+    if (children.size() == 1 && row_source.size() == 0)
         return children[0]->read();
 
+    /// Initialize first source blocks
+    if (sources.empty())
+        init();
+
     if (pos_global_start >= row_source.size())
         return Block();
 
@@ -154,13 +159,10 @@ void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
 
 void ColumnGathererStream::readSuffixImpl()
 {
-    if (children.size() == 1)
-        return;
-
     const BlockStreamProfileInfo & profile_info = getProfileInfo();
     double seconds = profile_info.total_stopwatch.elapsedSeconds();
     LOG_DEBUG(log, std::fixed << std::setprecision(2)
-        << "Gathered column " << column.name << " " << column.type->getName()
+        << "Gathered column " << name
         << " (" << static_cast<double>(profile_info.bytes) / profile_info.rows << " bytes/elem.)"
         << " in " << seconds << " sec., "
         << profile_info.rows / seconds << " rows/sec., "
diff --git a/dbms/src/DataStreams/ColumnGathererStream.h b/dbms/src/DataStreams/ColumnGathererStream.h
index f253899432d..469ff197463 100644
--- a/dbms/src/DataStreams/ColumnGathererStream.h
+++ b/dbms/src/DataStreams/ColumnGathererStream.h
@@ -97,6 +97,7 @@ private:
         }
     };
 
+    void init();
     void fetchNewBlock(Source & source, size_t source_num);
 
     std::vector<Source> sources;
diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp
index b4404ad2b43..522ab6ac676 100644
--- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp
+++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp
@@ -21,6 +21,8 @@ MergingSortedBlockInputStream::MergingSortedBlockInputStream(BlockInputStreams &
     , source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_)
 {
     children.insert(children.end(), inputs_.begin(), inputs_.end());
+    if (out_row_sources)
+        out_row_sources->clear();
 }
 
 String MergingSortedBlockInputStream::getID() const
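Note: the ColumnGathererStream hunks above all serve one fix. Source initialization moves out of the constructor into a lazy init() that runs on the first readImpl() call, and the single-input shortcut now also requires that row_source be empty, i.e. that no rows were marked to be skipped. A minimal standalone sketch of that control flow, using simplified hypothetical stand-in types rather than the real ClickHouse classes:

    #include <cstddef>
    #include <vector>

    // Hypothetical, simplified stand-in for the gatherer in this patch.
    struct GathererSketch
    {
        size_t num_children = 1;
        std::vector<int> sources;               // per-child read state, built lazily
        std::vector<unsigned char> row_source;  // stand-in for MergedRowSources

        // Deferred from the constructor: fetching the first block from every
        // child there would read data even when the trivial path is taken.
        void init() { sources.assign(num_children, 0); }

        bool read()
        {
            // Trivial case: one input and nothing to skip -> pass blocks through.
            if (num_children == 1 && row_source.empty())
                return true;

            // First non-trivial read: fetch the initial blocks now.
            if (sources.empty())
                init();

            return !row_source.empty();
        }
    };

    int main()
    {
        GathererSketch g;
        return g.read() ? 0 : 1;
    }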
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
index 6f366c63127..d6639ce5b54 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
@@ -681,15 +681,23 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
         ColumnGathererStream column_gathered_stream(column_part_streams, column_name, merged_rows_sources, DEFAULT_BLOCK_SIZE);
         MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_method, offset_written);
+        size_t column_elems_written = 0;
+
         column_to.writePrefix();
         while ((block = column_gathered_stream.read()))
         {
+            column_elems_written += block.rows();
             column_to.write(block);
         }
         column_gathered_stream.readSuffix();
         checksums_gathered_columns.add(column_to.writeSuffixAndGetChecksums());
 
+        if (rows_written != column_elems_written)
+        {
+            throw Exception("Written " + toString(column_elems_written) + " elements of column " + column_name +
+                ", but " + toString(rows_written) + " rows of PK columns", ErrorCodes::LOGICAL_ERROR);
+        }
+
         if (typeid_cast<const DataTypeArray *>(column_type.get()))
             offset_columns_written.emplace(offset_column_name);
diff --git a/dbms/src/Storages/tests/CMakeLists.txt b/dbms/src/Storages/tests/CMakeLists.txt
index 6c5dd200817..bd0897f969d 100644
--- a/dbms/src/Storages/tests/CMakeLists.txt
+++ b/dbms/src/Storages/tests/CMakeLists.txt
@@ -33,5 +33,3 @@ target_link_libraries (merge_selector dbms)
 
 add_executable (merge_selector2 merge_selector2.cpp)
 target_link_libraries (merge_selector2 dbms)
-
-add_executable (row_source_bitwise_compatibility row_source_bitwise_test.cpp)
diff --git a/dbms/src/Storages/tests/row_source_bitwise_test.cpp b/dbms/src/Storages/tests/gtest_row_source_bits_test.cpp
similarity index 51%
rename from dbms/src/Storages/tests/row_source_bitwise_test.cpp
rename to dbms/src/Storages/tests/gtest_row_source_bits_test.cpp
index 66b281365a3..0d30d48328d 100644
--- a/dbms/src/Storages/tests/row_source_bitwise_test.cpp
+++ b/dbms/src/Storages/tests/gtest_row_source_bits_test.cpp
@@ -1,17 +1,13 @@
-#include <cstdlib>
+#include <gtest/gtest.h>
 #include <DataStreams/ColumnGathererStream.h>
 using DB::RowSourcePart;
 
 static void check(const RowSourcePart & s, size_t num, bool flag)
 {
-    if ((s.getSourceNum() != num || s.getSkipFlag() != flag) || (!flag && s.getData() != num))
-    {
-        printf("FAIL");
-        std::exit(-1);
-    }
+    EXPECT_FALSE((s.getSourceNum() != num || s.getSkipFlag() != flag) || (!flag && s.getData() != num));
 }
 
-int main(int, char **)
+TEST(ColumnGathererStream, RowSourcePartBitsTest)
 {
     check(RowSourcePart(0, false), 0, false);
     check(RowSourcePart(0, true), 0, true);
@@ -20,15 +16,12 @@ int main(int, char **)
     check(RowSourcePart(RowSourcePart::MAX_PARTS, false), RowSourcePart::MAX_PARTS, false);
     check(RowSourcePart(RowSourcePart::MAX_PARTS, true), RowSourcePart::MAX_PARTS, true);
 
-	RowSourcePart p{80, false};
-	check(p, 80, false);
-	p.setSkipFlag(true);
-	check(p, 80, true);
-	p.setSkipFlag(false);
-	check(p, 80, false);
-	p.setSourceNum(RowSourcePart::MAX_PARTS);
-	check(p, RowSourcePart::MAX_PARTS, false);
-
-	printf("PASSED");
-	return 0;
+    RowSourcePart p{80, false};
+    check(p, 80, false);
+    p.setSkipFlag(true);
+    check(p, 80, true);
+    p.setSkipFlag(false);
+    check(p, 80, false);
+    p.setSourceNum(RowSourcePart::MAX_PARTS);
+    check(p, RowSourcePart::MAX_PARTS, false);
 }
diff --git a/dbms/tests/queries/0_stateless/00442_optimize_final_vertical_merge.reference b/dbms/tests/queries/0_stateless/00442_optimize_final_vertical_merge.reference
new file mode 100644
index 00000000000..bb6e92ae8e7
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00442_optimize_final_vertical_merge.reference
@@ -0,0 +1,3 @@
+1500000 1500000 1500000 1500000 1500000 1500000
+[['def']] [['','']]
+0
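Note: for context on what the renamed gtest above checks, RowSourcePart packs a source index and a skip flag into a single byte. The sketch below is a plausible reconstruction of that layout, consistent with the test's expectations (high bit as the skip flag, low seven bits as the source number, hence MAX_PARTS = 0x7F); the authoritative definition lives in ColumnGathererStream.h.

    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    // Illustrative stand-in for DB::RowSourcePart, not the real type.
    class RowSourcePartSketch
    {
    public:
        static constexpr size_t MAX_PARTS = 0x7F;

        RowSourcePartSketch(size_t source_num, bool skip_flag)
        {
            setSourceNum(source_num);
            setSkipFlag(skip_flag);
        }

        size_t getSourceNum() const { return data & MASK_NUMBER; }
        bool getSkipFlag() const { return (data & MASK_FLAG) != 0; }
        uint8_t getData() const { return data; }

        void setSourceNum(size_t source_num)
        {
            assert(source_num <= MAX_PARTS);  // must fit into seven bits
            data = static_cast<uint8_t>((data & MASK_FLAG) | source_num);
        }

        void setSkipFlag(bool flag)
        {
            data = static_cast<uint8_t>(flag ? (data | MASK_FLAG) : (data & MASK_NUMBER));
        }

    private:
        static constexpr uint8_t MASK_FLAG = 0x80;    // skip flag in the high bit
        static constexpr uint8_t MASK_NUMBER = 0x7F;  // source number in the low bits
        uint8_t data = 0;
    };

    int main()
    {
        RowSourcePartSketch p(80, false);
        assert(p.getSourceNum() == 80 && !p.getSkipFlag() && p.getData() == 80);
        p.setSkipFlag(true);
        assert(p.getSourceNum() == 80 && p.getSkipFlag());
        return 0;
    }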
diff --git a/dbms/tests/queries/0_stateless/00442_optimize_final_vertical_merge.sh b/dbms/tests/queries/0_stateless/00442_optimize_final_vertical_merge.sh
new file mode 100755
index 00000000000..edafde929a5
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00442_optimize_final_vertical_merge.sh
@@ -0,0 +1,86 @@
+#!/bin/bash
+set -e
+
+db="test"
+table="optimize_me_finally"
+name="$db.$table"
+res_rows=1500000 # >= vertical_merge_algorithm_min_rows_to_activate
+
+function get_num_parts {
+    clickhouse-client -q "SELECT count() FROM system.parts WHERE active AND database='$db' AND table='$table'"
+}
+
+clickhouse-client -q "DROP TABLE IF EXISTS $name"
+
+clickhouse-client -q "CREATE TABLE $name (
+date Date,
+Sign Int8,
+ki UInt64,
+
+ds String,
+di01 UInt64,
+di02 UInt64,
+di03 UInt64,
+di04 UInt64,
+di05 UInt64,
+di06 UInt64,
+di07 UInt64,
+di08 UInt64,
+di09 UInt64,
+di10 UInt64,
+n Nested(
+i UInt64,
+s String
+)
+)
+ENGINE = CollapsingMergeTree(date, (date, ki), 8192, Sign)"
+
+clickhouse-client -q "INSERT INTO $name (date, Sign, ki) SELECT
+toDate(0) AS date,
+toInt8(1) AS Sign,
+toUInt64(0) AS ki
+FROM system.numbers LIMIT 9000"
+
+clickhouse-client -q "INSERT INTO $name (date, Sign, ki) SELECT
+toDate(0) AS date,
+toInt8(1) AS Sign,
+number AS ki
+FROM system.numbers LIMIT 9000, 9000"
+
+clickhouse-client -q "INSERT INTO $name SELECT
+toDate(0) AS date,
+toInt8(1) AS Sign,
+number AS ki,
+hex(number) AS ds,
+number AS di01,
+number AS di02,
+number AS di03,
+number AS di04,
+number AS di05,
+number AS di06,
+number AS di07,
+number AS di08,
+number AS di09,
+number AS di10,
+[number, number+1] AS \`n.i\`,
+[hex(number), hex(number+1)] AS \`n.s\`
+FROM system.numbers LIMIT $res_rows"
+
+while [[ `get_num_parts` -ne 1 ]] ; do clickhouse-client -q "OPTIMIZE TABLE $name PARTITION 197001"; done
+
+clickhouse-client -q "ALTER TABLE $name ADD COLUMN n.a Array(String)"
+clickhouse-client -q "ALTER TABLE $name ADD COLUMN da Array(String) DEFAULT ['def']"
+
+clickhouse-client -q "OPTIMIZE TABLE $name PARTITION 197001 FINAL"
+
+clickhouse-client -q "ALTER TABLE $name MODIFY COLUMN n.a Array(String) DEFAULT ['zzz']"
+clickhouse-client -q "ALTER TABLE $name MODIFY COLUMN da Array(String) DEFAULT ['zzz']"
+
+clickhouse-client -q "SELECT count(), sum(Sign), sum(ki = di05), sum(hex(ki) = ds), sum(ki = n.i[1]), sum([hex(ki), hex(ki+1)] = n.s) FROM $name"
+clickhouse-client -q "SELECT groupUniqArray(da), groupUniqArray(n.a) FROM $name"
+
+hash_src=`clickhouse-client --max_threads=1 -q "SELECT cityHash64(groupArray(ki)) FROM $name"`
+hash_ref=`clickhouse-client --max_threads=1 -q "SELECT cityHash64(groupArray(ki)) FROM (SELECT number as ki FROM system.numbers LIMIT $res_rows)"`
+echo $(( $hash_src - $hash_ref ))
+
+clickhouse-client -q "DROP TABLE IF EXISTS $name"
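Note: a condensed view of the invariant that the MergeTreeDataMerger hunk enforces and this shell test exercises end to end: after gathering, each column must contain exactly as many rows as the already-written primary-key columns, otherwise the merge must abort rather than commit a corrupt part. Names below are illustrative, not the real ClickHouse API.

    #include <cstddef>
    #include <sstream>
    #include <stdexcept>
    #include <string>

    // Hypothetical helper mirroring the check added above: compare the number
    // of elements gathered for a column against the PK columns' row count.
    static void checkGatheredRows(size_t rows_written, size_t column_elems_written,
                                  const std::string & column_name)
    {
        if (rows_written != column_elems_written)
        {
            std::ostringstream msg;
            msg << "Written " << column_elems_written << " elements of column "
                << column_name << ", but " << rows_written << " rows of PK columns";
            // Stands in for throw Exception(..., ErrorCodes::LOGICAL_ERROR).
            throw std::logic_error(msg.str());
        }
    }

    int main()
    {
        checkGatheredRows(1500000, 1500000, "ki");  // counts agree: no throw
        return 0;
    }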