Fix mutations huge memory consumption

This commit is contained in:
alesapin 2020-03-19 14:31:21 +03:00
parent a764545120
commit d27cd773cc
7 changed files with 57 additions and 3 deletions

View File

@ -50,10 +50,13 @@ Block PartLogElement::createBlock()
{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "bytes_uncompressed"}, // Result bytes {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "bytes_uncompressed"}, // Result bytes
{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "read_rows"}, {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "read_rows"},
{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "read_bytes"}, {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "read_bytes"},
{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "peak_memory_usage"},
/// Is there an error during the execution or commit /// Is there an error during the execution or commit
{ColumnUInt16::create(), std::make_shared<DataTypeUInt16>(), "error"}, {ColumnUInt16::create(), std::make_shared<DataTypeUInt16>(), "error"},
{ColumnString::create(), std::make_shared<DataTypeString>(), "exception"}, {ColumnString::create(), std::make_shared<DataTypeString>(), "exception"},
}; };
} }
@ -87,10 +90,12 @@ void PartLogElement::appendToBlock(Block & block) const
columns[i++]->insert(bytes_uncompressed); columns[i++]->insert(bytes_uncompressed);
columns[i++]->insert(rows_read); columns[i++]->insert(rows_read);
columns[i++]->insert(bytes_read_uncompressed); columns[i++]->insert(bytes_read_uncompressed);
columns[i++]->insert(peak_memory_usage);
columns[i++]->insert(error); columns[i++]->insert(error);
columns[i++]->insert(exception); columns[i++]->insert(exception);
block.setColumns(std::move(columns)); block.setColumns(std::move(columns));
} }

View File

@ -40,11 +40,13 @@ struct PartLogElement
UInt64 bytes_uncompressed = 0; UInt64 bytes_uncompressed = 0;
UInt64 rows_read = 0; UInt64 rows_read = 0;
UInt64 bytes_read_uncompressed = 0; UInt64 bytes_read_uncompressed = 0;
UInt64 peak_memory_usage = 0;
/// Was the operation successful? /// Was the operation successful?
UInt16 error = 0; UInt16 error = 0;
String exception; String exception;
static std::string name() { return "PartLog"; } static std::string name() { return "PartLog"; }
static Block createBlock(); static Block createBlock();

View File

@ -3755,6 +3755,7 @@ try
part_log_elem.rows = (*merge_entry)->rows_written; part_log_elem.rows = (*merge_entry)->rows_written;
part_log_elem.bytes_uncompressed = (*merge_entry)->bytes_written_uncompressed; part_log_elem.bytes_uncompressed = (*merge_entry)->bytes_written_uncompressed;
part_log_elem.peak_memory_usage = (*merge_entry)->memory_tracker.getPeak();
} }
part_log->add(part_log_elem); part_log->add(part_log_elem);

View File

@ -53,6 +53,11 @@ public:
} }
bool hasSortingKey() const { return part->storage.hasSortingKey(); }
Names getSortingKeyColumns() const override { return part->storage.getSortingKeyColumns(); }
protected: protected:
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_) StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(getIDFromPart(part_), part_->storage.getVirtuals()) : IStorage(getIDFromPart(part_), part_->storage.getVirtuals())

View File

@ -2,6 +2,7 @@
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/AnalyzedJoin.h> #include <Interpreters/AnalyzedJoin.h>
#include <Functions/IFunction.h> #include <Functions/IFunction.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
namespace DB namespace DB
{ {
@ -31,14 +32,28 @@ ReadInOrderOptimizer::ReadInOrderOptimizer(
InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & storage) const InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & storage) const
{ {
const MergeTreeData * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get()); Names sorting_key_columns;
if (!merge_tree || !merge_tree->hasSortingKey()) if (const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get()))
{
if (!merge_tree->hasSortingKey())
return {};
sorting_key_columns = merge_tree->getSortingKeyColumns();
}
else if (const auto * part = dynamic_cast<const StorageFromMergeTreeDataPart *>(storage.get()))
{
if (!part->hasSortingKey())
return {};
sorting_key_columns = part->getSortingKeyColumns();
}
else /// Inapplicable storage type
{
return {}; return {};
}
SortDescription order_key_prefix_descr; SortDescription order_key_prefix_descr;
int read_direction = required_sort_description.at(0).direction; int read_direction = required_sort_description.at(0).direction;
const auto & sorting_key_columns = merge_tree->getSortingKeyColumns();
size_t prefix_size = std::min(required_sort_description.size(), sorting_key_columns.size()); size_t prefix_size = std::min(required_sort_description.size(), sorting_key_columns.size());
for (size_t i = 0; i < prefix_size; ++i) for (size_t i = 0; i < prefix_size; ++i)

View File

@ -0,0 +1,25 @@
DROP TABLE IF EXISTS table_with_pk;
CREATE TABLE table_with_pk
(
key UInt8,
value String
)
ENGINE = MergeTree
ORDER BY key;
INSERT INTO table_with_pk SELECT number, toString(number % 10) FROM numbers(10000000);
ALTER TABLE table_with_pk DELETE WHERE key % 77 = 0 SETTINGS mutations_sync = 1;
SYSTEM FLUSH LOGS;
-- Memory usage for all mutations must be almost constant and less than
-- read_bytes.
SELECT
DISTINCT read_bytes >= peak_memory_usage
FROM
system.part_log2
WHERE event_type = 'MutatePart' AND table = 'table_with_pk' AND database = currentDatabase();
DROP TABLE IF EXISTS table_with_pk;