diff --git a/dbms/src/Interpreters/PartLog.cpp b/dbms/src/Interpreters/PartLog.cpp index d77bb3fed59..b80d97ab36b 100644 --- a/dbms/src/Interpreters/PartLog.cpp +++ b/dbms/src/Interpreters/PartLog.cpp @@ -50,10 +50,13 @@ Block PartLogElement::createBlock() {ColumnUInt64::create(), std::make_shared(), "bytes_uncompressed"}, // Result bytes {ColumnUInt64::create(), std::make_shared(), "read_rows"}, {ColumnUInt64::create(), std::make_shared(), "read_bytes"}, + {ColumnUInt64::create(), std::make_shared(), "peak_memory_usage"}, /// Is there an error during the execution or commit {ColumnUInt16::create(), std::make_shared(), "error"}, {ColumnString::create(), std::make_shared(), "exception"}, + + }; } @@ -87,10 +90,12 @@ void PartLogElement::appendToBlock(Block & block) const columns[i++]->insert(bytes_uncompressed); columns[i++]->insert(rows_read); columns[i++]->insert(bytes_read_uncompressed); + columns[i++]->insert(peak_memory_usage); columns[i++]->insert(error); columns[i++]->insert(exception); + block.setColumns(std::move(columns)); } diff --git a/dbms/src/Interpreters/PartLog.h b/dbms/src/Interpreters/PartLog.h index 4c4930ccefa..b84138159a2 100644 --- a/dbms/src/Interpreters/PartLog.h +++ b/dbms/src/Interpreters/PartLog.h @@ -40,11 +40,13 @@ struct PartLogElement UInt64 bytes_uncompressed = 0; UInt64 rows_read = 0; UInt64 bytes_read_uncompressed = 0; + UInt64 peak_memory_usage = 0; /// Was the operation successful? UInt16 error = 0; String exception; + static std::string name() { return "PartLog"; } static Block createBlock(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index f7e9cb80103..4967f0ff2ae 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -3755,6 +3755,7 @@ try part_log_elem.rows = (*merge_entry)->rows_written; part_log_elem.bytes_uncompressed = (*merge_entry)->bytes_written_uncompressed; + part_log_elem.peak_memory_usage = (*merge_entry)->memory_tracker.getPeak(); } part_log->add(part_log_elem); diff --git a/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h b/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h index c44c744efaf..0b430439aae 100644 --- a/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h @@ -53,6 +53,11 @@ public: } + bool hasSortingKey() const { return part->storage.hasSortingKey(); } + + Names getSortingKeyColumns() const override { return part->storage.getSortingKeyColumns(); } + + protected: StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_) : IStorage(getIDFromPart(part_), part_->storage.getVirtuals()) diff --git a/dbms/src/Storages/ReadInOrderOptimizer.cpp b/dbms/src/Storages/ReadInOrderOptimizer.cpp index 667ce095932..753ff5de7a0 100644 --- a/dbms/src/Storages/ReadInOrderOptimizer.cpp +++ b/dbms/src/Storages/ReadInOrderOptimizer.cpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace DB { @@ -31,14 +32,28 @@ ReadInOrderOptimizer::ReadInOrderOptimizer( InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & storage) const { - const MergeTreeData * merge_tree = dynamic_cast(storage.get()); - if (!merge_tree || !merge_tree->hasSortingKey()) + Names sorting_key_columns; + if (const auto * merge_tree = dynamic_cast(storage.get())) + { + if (!merge_tree->hasSortingKey()) + return {}; + sorting_key_columns = merge_tree->getSortingKeyColumns(); + } + else if (const auto * part = dynamic_cast(storage.get())) + { + if (!part->hasSortingKey()) + return {}; + sorting_key_columns = part->getSortingKeyColumns(); + } + else /// Inapplicable storage type + { return {}; + } + SortDescription order_key_prefix_descr; int read_direction = required_sort_description.at(0).direction; - const auto & sorting_key_columns = merge_tree->getSortingKeyColumns(); size_t prefix_size = std::min(required_sort_description.size(), sorting_key_columns.size()); for (size_t i = 0; i < prefix_size; ++i) diff --git a/dbms/tests/queries/0_stateless/01200_mutations_memory_consumption.reference b/dbms/tests/queries/0_stateless/01200_mutations_memory_consumption.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01200_mutations_memory_consumption.reference @@ -0,0 +1 @@ +1 diff --git a/dbms/tests/queries/0_stateless/01200_mutations_memory_consumption.sql b/dbms/tests/queries/0_stateless/01200_mutations_memory_consumption.sql new file mode 100644 index 00000000000..1a3e414ae26 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01200_mutations_memory_consumption.sql @@ -0,0 +1,25 @@ +DROP TABLE IF EXISTS table_with_pk; + +CREATE TABLE table_with_pk +( + key UInt8, + value String +) +ENGINE = MergeTree +ORDER BY key; + +INSERT INTO table_with_pk SELECT number, toString(number % 10) FROM numbers(10000000); + +ALTER TABLE table_with_pk DELETE WHERE key % 77 = 0 SETTINGS mutations_sync = 1; + +SYSTEM FLUSH LOGS; + +-- Memory usage for all mutations must be almost constant and less than +-- read_bytes. +SELECT + DISTINCT read_bytes >= peak_memory_usage +FROM + system.part_log2 +WHERE event_type = 'MutatePart' AND table = 'table_with_pk' AND database = currentDatabase(); + +DROP TABLE IF EXISTS table_with_pk;