diff --git a/docs/en/engines/table-engines/special/memory.md b/docs/en/engines/table-engines/special/memory.md index 0d552a69804..19b5c798a76 100644 --- a/docs/en/engines/table-engines/special/memory.md +++ b/docs/en/engines/table-engines/special/memory.md @@ -21,3 +21,79 @@ When restarting a server, data disappears from the table and the table becomes e Normally, using this table engine is not justified. However, it can be used for tests, and for tasks where maximum speed is required on a relatively small number of rows (up to approximately 100,000,000). The Memory engine is used by the system for temporary tables with external query data (see the section “External data for processing a query”), and for implementing `GLOBAL IN` (see the section “IN operators”). + +Upper and lower bounds can be specified to limit Memory engine table size, effectively allowing it to act as a circular buffer (see [Engine Parameters](#engine-parameters)). + +## Engine Parameters {#engine-parameters} + +- `min_bytes_to_keep` — Minimum bytes to keep when the memory table is size-capped. + - Default value: `0` + - Requires `max_bytes_to_keep` +- `max_bytes_to_keep` — Maximum bytes to keep within memory table where oldest rows are deleted on each insertion (i.e. circular buffer). Max bytes can exceed the stated limit if the oldest batch of rows to remove falls under the `min_bytes_to_keep` limit when adding a large block. + - Default value: `0` +- `min_rows_to_keep` — Minimum rows to keep when the memory table is size-capped. + - Default value: `0` + - Requires `max_rows_to_keep` +- `max_rows_to_keep` — Maximum rows to keep within memory table where oldest rows are deleted on each insertion (i.e. circular buffer). Max rows can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block. 
+ - Default value: `0` + +## Usage {#usage} + + +**Initialize settings** +``` sql +CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000; +``` + +**Note:** Both `bytes` and `rows` capping parameters can be set at the same time. In that case the oldest blocks are removed while either `max` limit is exceeded, and removal stops as soon as it would put the table under either `min` limit. + +## Examples {#examples} +``` sql +CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; + +/* 1. first block is inserted; the table is still below max_bytes_to_keep, so nothing is deleted */ +INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 8'192 bytes + +/* 2. adding block that doesn't get deleted */ +INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 1'024 bytes + +/* 3. testing oldest block gets deleted - 9216 bytes and 1100 rows remain */ +INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 8'192 bytes + +/* 4. checking a very large block overrides all */ +INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 65'536 bytes + +SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); +``` + +``` text +┌─total_bytes─┬─total_rows─┐ +│ 65536 │ 10000 │ +└─────────────┴────────────┘ +``` + +Also, for rows: + +``` sql +CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 4000, max_rows_to_keep = 10000; + +/* 1. first block is inserted; the table is still below max_rows_to_keep, so nothing is deleted */ +INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 1'600 rows + +/* 2. adding block that doesn't get deleted */ +INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 100 rows + +/* 3. adding another block that doesn't get deleted - 2700 rows in total, still below max_rows_to_keep */ +INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 1'000 rows + +/* 4. 
checking a very large block overrides all */ +INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 10'000 rows + +SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); +``` + +``` text +┌─total_bytes─┬─total_rows─┐ +│ 65536 │ 10000 │ +└─────────────┴────────────┘ +``` diff --git a/src/Storages/MemorySettings.h b/src/Storages/MemorySettings.h index 9e1a8db3595..ac6cdf73329 100644 --- a/src/Storages/MemorySettings.h +++ b/src/Storages/MemorySettings.h @@ -10,6 +10,10 @@ class ASTStorage; #define MEMORY_SETTINGS(M, ALIAS) \ M(Bool, compress, false, "Compress data in memory", 0) \ + M(UInt64, min_rows_to_keep, 0, "Minimum block size (in rows) to retain in Memory table buffer.", 0) \ + M(UInt64, max_rows_to_keep, 0, "Maximum block size (in rows) to retain in Memory table buffer.", 0) \ + M(UInt64, min_bytes_to_keep, 0, "Minimum block size (in bytes) to retain in Memory table buffer.", 0) \ + M(UInt64, max_bytes_to_keep, 0, "Maximum block size (in bytes) to retain in Memory table buffer.", 0) \ DECLARE_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS) diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 7a8fb9feeda..c6222d2124e 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -46,6 +46,7 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int CANNOT_RESTORE_TABLE; extern const int NOT_IMPLEMENTED; + extern const int SETTING_CONSTRAINT_VIOLATION; } class MemorySink : public SinkToStorage @@ -103,16 +104,37 @@ public: std::lock_guard lock(storage.mutex); auto new_data = std::make_unique(*(storage.data.get())); + UInt64 new_total_rows = storage.total_size_rows.load(std::memory_order_relaxed) + inserted_rows; + UInt64 new_total_bytes = storage.total_size_bytes.load(std::memory_order_relaxed) + inserted_bytes; + while (!new_data->empty() + && ((storage.max_bytes_to_keep && new_total_bytes > 
storage.max_bytes_to_keep) + || (storage.max_rows_to_keep && new_total_rows > storage.max_rows_to_keep))) + { + Block oldest_block = new_data->front(); + UInt64 rows_to_remove = oldest_block.rows(); + UInt64 bytes_to_remove = oldest_block.allocatedBytes(); + if (new_total_bytes - bytes_to_remove < storage.min_bytes_to_keep + || new_total_rows - rows_to_remove < storage.min_rows_to_keep) + { + break; // stop - removing next block will put us under min_bytes / min_rows threshold + } + + // delete old block from current storage table + new_total_rows -= rows_to_remove; + new_total_bytes -= bytes_to_remove; + new_data->erase(new_data->begin()); + } + + // append new data to modified storage table and commit new_data->insert(new_data->end(), new_blocks.begin(), new_blocks.end()); storage.data.set(std::move(new_data)); - storage.total_size_bytes.fetch_add(inserted_bytes, std::memory_order_relaxed); - storage.total_size_rows.fetch_add(inserted_rows, std::memory_order_relaxed); + storage.total_size_rows.store(new_total_rows, std::memory_order_relaxed); + storage.total_size_bytes.store(new_total_bytes, std::memory_order_relaxed); } private: Blocks new_blocks; - StorageMemory & storage; StorageSnapshotPtr storage_snapshot; }; @@ -123,8 +145,10 @@ StorageMemory::StorageMemory( ColumnsDescription columns_description_, ConstraintsDescription constraints_, const String & comment, - bool compress_) - : IStorage(table_id_), data(std::make_unique()), compress(compress_) + const MemorySettings & settings) + : IStorage(table_id_), data(std::make_unique()), compress(settings.compress), + min_rows_to_keep(settings.min_rows_to_keep), max_rows_to_keep(settings.max_rows_to_keep), + min_bytes_to_keep(settings.min_bytes_to_keep), max_bytes_to_keep(settings.max_bytes_to_keep) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(std::move(columns_description_)); @@ -542,7 +566,11 @@ void registerStorageMemory(StorageFactory & factory) if (has_settings) 
settings.loadFromQuery(*args.storage_def); - return std::make_shared(args.table_id, args.columns, args.constraints, args.comment, settings.compress); + if (settings.min_bytes_to_keep > settings.max_bytes_to_keep + || settings.min_rows_to_keep > settings.max_rows_to_keep) + throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. bytes / rows must be set with a max."); + + return std::make_shared(args.table_id, args.columns, args.constraints, args.comment, settings); }, { .supports_settings = true, diff --git a/src/Storages/StorageMemory.h b/src/Storages/StorageMemory.h index 3293e5e4fe5..13f1c971d82 100644 --- a/src/Storages/StorageMemory.h +++ b/src/Storages/StorageMemory.h @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -30,7 +31,7 @@ public: ColumnsDescription columns_description_, ConstraintsDescription constraints_, const String & comment, - bool compress_ = false); + const MemorySettings & settings = MemorySettings()); String getName() const override { return "Memory"; } @@ -134,6 +135,11 @@ private: std::atomic total_size_rows = 0; bool compress; + UInt64 min_rows_to_keep; + UInt64 max_rows_to_keep; + UInt64 min_bytes_to_keep; + UInt64 max_bytes_to_keep; + friend class ReadFromMemoryStorageStep; }; diff --git a/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.reference b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.reference new file mode 100644 index 00000000000..20dda4fa15a --- /dev/null +++ b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.reference @@ -0,0 +1,16 @@ +TESTING BYTES +8192 +9216 +9216 +65536 +TESTING ROWS +50 +1000 +1020 +1100 +TESTING NO CIRCULAR-BUFFER +8192 +9216 +17408 +82944 +TESTING INVALID SETTINGS diff --git a/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.sql b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.sql new file mode 100644 index 00000000000..fa4ba96277d --- /dev/null +++ 
b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.sql @@ -0,0 +1,63 @@ +SET max_block_size = 65409; -- Default value + +DROP TABLE IF EXISTS memory; +CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; + +SELECT 'TESTING BYTES'; +/* 1. testing oldest block doesn't get deleted because of min-threshold */ +INSERT INTO memory SELECT * FROM numbers(0, 1600); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 2. adding block that doesn't get deleted */ +INSERT INTO memory SELECT * FROM numbers(1000, 100); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */ +INSERT INTO memory SELECT * FROM numbers(9000, 1000); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 4.check large block over-writes all bytes / rows */ +INSERT INTO memory SELECT * FROM numbers(9000, 10000); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +DROP TABLE IF EXISTS memory; +CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000; + +SELECT 'TESTING ROWS'; +/* 1. add normal number of rows */ +INSERT INTO memory SELECT * FROM numbers(0, 50); +SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 2. table should have 1000 */ +INSERT INTO memory SELECT * FROM numbers(50, 950); +SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 3. table should have 1020 - removed first 50 */ +INSERT INTO memory SELECT * FROM numbers(2000, 70); +SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 4. 
check large block over-writes all rows */ +INSERT INTO memory SELECT * FROM numbers(3000, 1100); +SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +SELECT 'TESTING NO CIRCULAR-BUFFER'; +DROP TABLE IF EXISTS memory; +CREATE TABLE memory (i UInt32) ENGINE = Memory; + +INSERT INTO memory SELECT * FROM numbers(0, 1600); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +INSERT INTO memory SELECT * FROM numbers(1000, 100); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +INSERT INTO memory SELECT * FROM numbers(9000, 1000); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +INSERT INTO memory SELECT * FROM numbers(9000, 10000); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +SELECT 'TESTING INVALID SETTINGS'; +CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100; -- { serverError 452 } +CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 100; -- { serverError 452 } + +DROP TABLE memory; \ No newline at end of file