diff --git a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
index b3dd05aee9c..3d7f43a258d 100644
--- a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
+++ b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
@@ -18,6 +18,10 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
       */
     addTableLock(storage->lockStructure(true, __PRETTY_FUNCTION__));
 
+    /// If the "root" table deduplicates blocks, there is no need to deduplicate again for its children.
+    /// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to the small size of inserted blocks.
+    bool disable_deduplication_for_children = !no_destination && storage->supportsDeduplication();
+
     if (!table.empty())
     {
         Dependencies dependencies = context.getDependencies(database, table);
@@ -27,7 +31,8 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
         {
             views_context = std::make_unique<Context>(context);
             // Do not deduplicate insertions into MV if the main insertion is Ok
-            views_context->getSettingsRef().insert_deduplicate = false;
+            if (disable_deduplication_for_children)
+                views_context->getSettingsRef().insert_deduplicate = false;
         }
 
         for (const auto & database_table : dependencies)
@@ -89,4 +94,51 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
     }
 }
 
+void PushingToViewsBlockOutputStream::writePrefix()
+{
+    if (output)
+        output->writePrefix();
+
+    for (auto & view : views)
+    {
+        try
+        {
+            view.out->writePrefix();
+        }
+        catch (Exception & ex)
+        {
+            ex.addMessage("while writing prefix to view " + view.database + "." + view.table);
+            throw;
+        }
+    }
+}
+
+void PushingToViewsBlockOutputStream::writeSuffix()
+{
+    if (output)
+        output->writeSuffix();
+
+    for (auto & view : views)
+    {
+        try
+        {
+            view.out->writeSuffix();
+        }
+        catch (Exception & ex)
+        {
+            ex.addMessage("while writing suffix to view " + view.database + "." + view.table);
+            throw;
+        }
+    }
+}
+
+void PushingToViewsBlockOutputStream::flush()
+{
+    if (output)
+        output->flush();
+
+    for (auto & view : views)
+        view.out->flush();
+}
+
 }
diff --git a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h
index 4ff953fd265..2166ee4339b 100644
--- a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h
+++ b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h
@@ -25,32 +25,9 @@ public:
     Block getHeader() const override { return storage->getSampleBlock(); }
     void write(const Block & block) override;
 
-    void flush() override
-    {
-        if (output)
-            output->flush();
-
-        for (auto & view : views)
-            view.out->flush();
-    }
-
-    void writePrefix() override
-    {
-        if (output)
-            output->writePrefix();
-
-        for (auto & view : views)
-            view.out->writePrefix();
-    }
-
-    void writeSuffix() override
-    {
-        if (output)
-            output->writeSuffix();
-
-        for (auto & view : views)
-            view.out->writeSuffix();
-    }
+    void flush() override;
+    void writePrefix() override;
+    void writeSuffix() override;
 
 private:
     StoragePtr storage;
diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h
index 7c3d6ba57b8..a404179baf8 100644
--- a/dbms/src/Storages/IStorage.h
+++ b/dbms/src/Storages/IStorage.h
@@ -107,6 +107,9 @@ public:
     /** Returns true if the storage replicates SELECT, INSERT and ALTER commands among replicas. */
     virtual bool supportsReplication() const { return false; }
 
+    /** Returns true if the storage supports deduplication of inserted data blocks. */
+    virtual bool supportsDeduplication() const { return false; }
+
     /** Does not allow you to change the structure or name of the table.
       * If you change the data in the table, you will need to specify will_modify_data = true.
       * This will take an extra lock that does not allow starting ALTER MODIFY.
diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
index 657307fd687..2777d4b9849 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
@@ -14,7 +14,7 @@ Block MergeTreeBlockOutputStream::getHeader() const
 
 void MergeTreeBlockOutputStream::write(const Block & block)
 {
-    storage.data.delayInsertIfNeeded();
+    storage.data.delayInsertOrThrowIfNeeded();
 
     auto part_blocks = storage.writer.splitBlockIntoParts(block);
     for (auto & current_block : part_blocks)
diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
index 7cae50805a4..4fdee23aa51 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
@@ -1787,7 +1787,7 @@ size_t MergeTreeData::getMaxPartsCountForPartition() const
 }
 
 
-void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
+void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
 {
     const size_t parts_count = getMaxPartsCountForPartition();
     if (parts_count < settings.parts_to_delay_insert)
@@ -1817,6 +1817,17 @@
     std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<size_t>(delay_milliseconds)));
 }
 
+void MergeTreeData::throwInsertIfNeeded() const
+{
+    const size_t parts_count = getMaxPartsCountForPartition();
+
+    if (parts_count >= settings.parts_to_throw_insert)
+    {
+        ProfileEvents::increment(ProfileEvents::RejectedInserts);
+        throw Exception("Too many parts (" + toString(parts_count) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
+    }
+}
+
 MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
     const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/)
 {
diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h
index 5c9044416e7..85dbf9fd760 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.h
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.h
@@ -389,7 +389,8 @@ public:
 
     /// If the table contains too many active parts, sleep for a while to give them time to merge.
     /// If until is non-null, wake up from the sleep earlier if the event happened.
-    void delayInsertIfNeeded(Poco::Event * until = nullptr);
+    void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const;
+    void throwInsertIfNeeded() const;
 
     /// Renames temporary part to a permanent part and adds it to the parts set.
     /// It is assumed that the part does not intersect with existing parts.
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
index 6d5c4d9fc57..4f04b237479 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
@@ -108,7 +108,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
     last_block_is_duplicate = false;
 
     /// TODO Is it possible to not lock the table structure here?
-    storage.data.delayInsertIfNeeded(&storage.restarting_thread->getWakeupEvent());
+    storage.data.delayInsertOrThrowIfNeeded(&storage.restarting_thread->getWakeupEvent());
 
     auto zookeeper = storage.getZooKeeper();
     assertSessionIsNotExpired(zookeeper);
@@ -402,5 +402,10 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
     }
 }
 
+void ReplicatedMergeTreeBlockOutputStream::writePrefix()
+{
+    storage.data.throwInsertIfNeeded();
+}
+
 }
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
index 29ca8657038..f6ad819c4fb 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
@@ -26,6 +26,7 @@ public:
         bool deduplicate_);
 
     Block getHeader() const override;
+    void writePrefix() override;
     void write(const Block & block) override;
 
     /// For ATTACHing existing data on filesystem.
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h
index 98af0ff1d78..b16e1c27f27 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.h
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.h
@@ -87,6 +87,7 @@ public:
     bool supportsFinal() const override { return data.supportsFinal(); }
     bool supportsPrewhere() const override { return data.supportsPrewhere(); }
     bool supportsReplication() const override { return true; }
+    bool supportsDeduplication() const override { return true; }
 
     const ColumnsDescription & getColumns() const override { return data.getColumns(); }
     void setColumns(ColumnsDescription columns_) override { return data.setColumns(std::move(columns_)); }
diff --git a/dbms/tests/queries/0_stateless/00633_materialized_view_and_too_many_parts_zookeeper.reference b/dbms/tests/queries/0_stateless/00633_materialized_view_and_too_many_parts_zookeeper.reference
new file mode 100644
index 00000000000..6e0517557ad
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00633_materialized_view_and_too_many_parts_zookeeper.reference
@@ -0,0 +1,9 @@
+a 1
+b 1
+c 1
+
+a 1
+b 1
+c 1
+
+1
diff --git a/dbms/tests/queries/0_stateless/00633_materialized_view_and_too_many_parts_zookeeper.sh b/dbms/tests/queries/0_stateless/00633_materialized_view_and_too_many_parts_zookeeper.sh
new file mode 100755
index 00000000000..115828a2331
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00633_materialized_view_and_too_many_parts_zookeeper.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+set -e
+
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.root"
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.a"
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.b"
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.c"
+
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE test.root (d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/root', '1') ORDER BY d"
+${CLICKHOUSE_CLIENT} --query "CREATE MATERIALIZED VIEW test.a (d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/a', '1') ORDER BY d AS SELECT * FROM test.root"
+${CLICKHOUSE_CLIENT} --query "CREATE MATERIALIZED VIEW test.b (d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/b', '1') ORDER BY d SETTINGS parts_to_delay_insert=1, parts_to_throw_insert=1 AS SELECT * FROM test.root"
+${CLICKHOUSE_CLIENT} --query "CREATE MATERIALIZED VIEW test.c (d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/c', '1') ORDER BY d AS SELECT * FROM test.root"
+
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test.root VALUES (1)";
+${CLICKHOUSE_CLIENT} --query "SELECT _table, d FROM merge('test', '^[abc]\$') ORDER BY _table"
+if ${CLICKHOUSE_CLIENT} --query "INSERT INTO test.root VALUES (2)" 2>/dev/null; then
+    echo "FAIL. Expected 'too many parts' on table test.b"
+fi
+
+echo
+${CLICKHOUSE_CLIENT} --query "SELECT _table, d FROM merge('test', '^[abc]\$') ORDER BY _table"
+
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.root"
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.a"
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.b"
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.c"
+
+# Deduplication check for non-replicated root table
+echo
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE test.root (d UInt64) ENGINE = Null"
+${CLICKHOUSE_CLIENT} --query "CREATE MATERIALIZED VIEW test.a (d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/test/a', '1') ORDER BY d AS SELECT * FROM test.root"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test.root VALUES (1)";
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test.root VALUES (1)";
+${CLICKHOUSE_CLIENT} --query "SELECT * FROM test.a";
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.root"
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.a"