From c73e299820ffef1142c3b6cc9ef89b7dc69e3525 Mon Sep 17 00:00:00 2001 From: Jake Bamrah Date: Thu, 29 Feb 2024 23:42:23 +0100 Subject: [PATCH 01/11] add size cap to memory tables --- src/Core/Settings.h | 4 ++ src/Core/SettingsChangesHistory.h | 4 ++ src/Storages/StorageMemory.cpp | 40 ++++++++++++++-- ...storage_memory_circ_buffer_usage.reference | 8 ++++ ...03001_storage_memory_circ_buffer_usage.sql | 47 +++++++++++++++++++ 5 files changed, 98 insertions(+), 5 deletions(-) create mode 100644 tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.reference create mode 100644 tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.sql diff --git a/src/Core/Settings.h b/src/Core/Settings.h index d70a6cf51c5..500157081a3 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -885,6 +885,10 @@ class IColumn; M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud. Exclude new data parts from SELECT queries until they're either pre-warmed (see cache_populated_by_fetch) or this many seconds old. Only for Replicated-/SharedMergeTree.", 0) \ M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud. If a merged part is less than this many seconds old and is not pre-warmed (see cache_populated_by_fetch), but all its source parts are available and pre-warmed, SELECT queries will read from those parts instead. Only for ReplicatedMergeTree. Note that this only checks whether CacheWarmer processed the part; if the part was fetched into cache by something else, it'll still be considered cold until CacheWarmer gets to it; if it was warmed, then evicted from cache, it'll still be considered warm.", 0) \ M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. 
Note that it can lead to incorrect result", 0) \ + M(UInt64, min_rows_to_keep, 0, "Minimum block size (in rows) to retain in Memory table buffer.", 0) \ + M(UInt64, max_rows_to_keep, 0, "Maximum block size (in rows) to retain in Memory table buffer.", 0) \ + M(UInt64, min_bytes_to_keep, 0, "Minimum block size (in bytes) to retain in Memory table buffer.", 0) \ + M(UInt64, max_bytes_to_keep, 0, "Maximum block size (in bytes) to retain in Memory table buffer.", 0) \ // End of COMMON_SETTINGS // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS. diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h index e680c02671a..436c6963fb8 100644 --- a/src/Core/SettingsChangesHistory.h +++ b/src/Core/SettingsChangesHistory.h @@ -93,6 +93,10 @@ static std::map sett {"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"}, {"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication is dependent materialized view cannot work together with async inserts."}, {"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"}, + {"max_rows_to_keep", 0, 0, "Introducing new feature for memory tables."}, + {"min_rows_to_keep", 0, 0, "Introducing new feature for memory tables."}, + {"max_bytes_to_keep", 0, 0, "Introducing new feature for memory tables."}, + {"min_bytes_to_keep", 0, 0, "Introducing new feature for memory tables."}, }}, {"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"}, {"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and 
suspicious types inside nested types"}, diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 7a8fb9feeda..518b9f5b212 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -46,6 +46,7 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int CANNOT_RESTORE_TABLE; extern const int NOT_IMPLEMENTED; + extern const int SETTING_CONSTRAINT_VIOLATION; } class MemorySink : public SinkToStorage @@ -54,10 +55,11 @@ public: MemorySink( StorageMemory & storage_, const StorageMetadataPtr & metadata_snapshot_, - ContextPtr context) + ContextPtr context_) : SinkToStorage(metadata_snapshot_->getSampleBlock()) , storage(storage_) - , storage_snapshot(storage_.getStorageSnapshot(metadata_snapshot_, context)) + , storage_snapshot(storage_.getStorageSnapshot(metadata_snapshot_, context_)) + , context(context_) { } @@ -100,21 +102,49 @@ public: inserted_rows += block.rows(); } + Settings settings = context->getSettings(); + if ((settings.min_bytes_to_keep && settings.min_bytes_to_keep > settings.max_bytes_to_keep) + || (settings.min_rows_to_keep && settings.min_rows_to_keep > settings.max_rows_to_keep)) { + throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. 
bytes / rows must be set with a max."); + } + std::lock_guard lock(storage.mutex); auto new_data = std::make_unique(*(storage.data.get())); + UInt64 new_total_rows = storage.total_size_rows.load(std::memory_order_relaxed) + inserted_rows; + UInt64 new_total_bytes = storage.total_size_bytes.load(std::memory_order_relaxed) + inserted_bytes; + while (!new_data->empty() + && ((settings.max_bytes_to_keep && new_total_bytes > settings.max_bytes_to_keep) + || (settings.max_rows_to_keep && new_total_rows > settings.max_rows_to_keep))) + { + Block oldest_block = new_data->front(); + UInt64 rows_to_remove = oldest_block.rows(); + UInt64 bytes_to_remove = oldest_block.allocatedBytes(); + if (new_total_bytes - bytes_to_remove < settings.min_bytes_to_keep + || new_total_rows - rows_to_remove < settings.min_rows_to_keep) + { + break; // stop - removing next block will put us under min_bytes / min_rows threshold + } + + // delete old block from current storage table + new_total_rows -= rows_to_remove; + new_total_bytes -= bytes_to_remove; + new_data->erase(new_data->begin()); + } + + // finally - append new data to modified storage table and commit new_data->insert(new_data->end(), new_blocks.begin(), new_blocks.end()); storage.data.set(std::move(new_data)); - storage.total_size_bytes.fetch_add(inserted_bytes, std::memory_order_relaxed); - storage.total_size_rows.fetch_add(inserted_rows, std::memory_order_relaxed); + storage.total_size_rows.store(new_total_rows, std::memory_order_relaxed); + storage.total_size_bytes.store(new_total_bytes, std::memory_order_relaxed); } private: Blocks new_blocks; - StorageMemory & storage; StorageSnapshotPtr storage_snapshot; + ContextPtr context; }; diff --git a/tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.reference b/tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.reference new file mode 100644 index 00000000000..e9e710a535b --- /dev/null +++ 
b/tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.reference @@ -0,0 +1,8 @@ +8192 +9216 +9216 +65536 +50 +1000 +1020 +1100 diff --git a/tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.sql b/tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.sql new file mode 100644 index 00000000000..13ddc428415 --- /dev/null +++ b/tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.sql @@ -0,0 +1,47 @@ +DROP TABLE IF EXISTS memory; +CREATE TABLE memory (i UInt32) ENGINE = Memory; +SET min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; + +/* TESTING BYTES */ +/* 1. testing oldest block doesn't get deleted because of min-threshold */ +INSERT INTO memory SELECT * FROM numbers(0, 1600); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 2. adding block that doesn't get deleted */ +INSERT INTO memory SELECT * FROM numbers(1000, 100); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */ +INSERT INTO memory SELECT * FROM numbers(9000, 1000); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 4.check large block over-writes all bytes / rows */ +INSERT INTO memory SELECT * FROM numbers(9000, 10000); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + + +truncate memory; +SET min_rows_to_keep = 100, max_rows_to_keep = 1000; + +/* TESTING ROWS */ +/* 1. add normal number of rows */ +INSERT INTO memory SELECT * FROM numbers(0, 50); +SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 2. table should have 1000 */ +INSERT INTO memory SELECT * FROM numbers(50, 950); +SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 3. 
table should have 1020 - removed first 50 */ +INSERT INTO memory SELECT * FROM numbers(2000, 70); +SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* 4. check large block over-writes all rows */ +INSERT INTO memory SELECT * FROM numbers(3000, 1100); +SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* test invalid settings */ +SET min_bytes_to_keep = 4096, max_bytes_to_keep = 0; +INSERT INTO memory SELECT * FROM numbers(3000, 1100); -- { serverError 452 } + +DROP TABLE memory; \ No newline at end of file From 000719aa2ab71947b7ad67e92c10e1a8663e9e2c Mon Sep 17 00:00:00 2001 From: Jake Bamrah <45361366+JakeBamrah@users.noreply.github.com> Date: Thu, 29 Feb 2024 23:54:55 +0100 Subject: [PATCH 02/11] update memory table docs with size cap changes --- .../engines/table-engines/special/memory.md | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/docs/en/engines/table-engines/special/memory.md b/docs/en/engines/table-engines/special/memory.md index 0d552a69804..d6bcce68a0e 100644 --- a/docs/en/engines/table-engines/special/memory.md +++ b/docs/en/engines/table-engines/special/memory.md @@ -21,3 +21,56 @@ When restarting a server, data disappears from the table and the table becomes e Normally, using this table engine is not justified. However, it can be used for tests, and for tasks where maximum speed is required on a relatively small number of rows (up to approximately 100,000,000). The Memory engine is used by the system for temporary tables with external query data (see the section “External data for processing a query”), and for implementing `GLOBAL IN` (see the section “IN operators”). + +## Engine Parameters + +- `min_bytes_to_keep` — Minimum bytes to keep when memory table is size-capped. 
+ - Default value: `0` + - Requires `max_bytes_to_keep` +- `max_bytes_to_keep` — Maximum bytes to keep within memory table where oldest rows are deleted on each insertion (i.e. circular buffer). Max bytes can exceed the stated limit if the oldest batch of rows to remove falls under the `min_bytes_to_keep` limit when adding a large block. + - Default value: `0` +- `min_rows_to_keep` — Minimum rows to keep when memory table is size-capped. + - Default value: `0` + - Requires `max_rows_to_keep` +- `max_rows_to_keep` — Maximum rows to keep within memory table where oldest rows are deleted on each insertion (i.e. circular buffer). Max rows can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block. + - Default value: `0` + +## Usage {#usage} + +``` sql +CREATE TABLE memory (i UInt32) ENGINE = Memory; +``` + +## Examples {#examples} + +**Example 1: Setting parameters** +``` sql +SET min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; +``` + +**Note:** Both `bytes` and `rows` capping parameters can be set at the same time, however, the lower bounds of `max` and `min` will be adhered to. + +**Example 2: Basic usage** +``` sql +CREATE TABLE memory (i UInt32) ENGINE = Memory; + +SET min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; + +/* 1. testing oldest block doesn't get deleted due to min-threshold - 3000 rows */ +INSERT INTO memory SELECT * FROM numbers(0, 1600); + +/* 2. adding block that doesn't get deleted */ +INSERT INTO memory SELECT * FROM numbers(1000, 100); + +/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */ +INSERT INTO memory SELECT * FROM numbers(9000, 1000); + +/* 4. 
checking a very large block overrides all */ +INSERT INTO memory SELECT * FROM numbers(9000, 10000); +``` + +``` text +┌─total_bytes─┬─total_rows─┐ +│ 65536 │ 10000 │ +└─────────────┴────────────┘ +``` From 77223d682d9d4ba7dcb7d3c36d323c43cd224bdd Mon Sep 17 00:00:00 2001 From: Yarik Briukhovetskyi <114298166+yariks5s@users.noreply.github.com> Date: Fri, 1 Mar 2024 02:01:40 +0100 Subject: [PATCH 03/11] reload tests --- src/Storages/StorageMemory.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 518b9f5b212..1a7a76ee285 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -132,7 +132,7 @@ public: new_data->erase(new_data->begin()); } - // finally - append new data to modified storage table and commit + // append new data to modified storage table and commit new_data->insert(new_data->end(), new_blocks.begin(), new_blocks.end()); storage.data.set(std::move(new_data)); From 4aab1170ef6c167df867a0565968744f9eb79852 Mon Sep 17 00:00:00 2001 From: Yarik Briukhovetskyi <114298166+yariks5s@users.noreply.github.com> Date: Thu, 7 Mar 2024 12:34:53 +0100 Subject: [PATCH 04/11] fix style --- src/Storages/StorageMemory.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 1a7a76ee285..6afe3331054 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -104,9 +104,8 @@ public: Settings settings = context->getSettings(); if ((settings.min_bytes_to_keep && settings.min_bytes_to_keep > settings.max_bytes_to_keep) - || (settings.min_rows_to_keep && settings.min_rows_to_keep > settings.max_rows_to_keep)) { + || (settings.min_rows_to_keep && settings.min_rows_to_keep > settings.max_rows_to_keep)) throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. 
bytes / rows must be set with a max."); - } std::lock_guard lock(storage.mutex); From e53d052dccf6c2a1e913224b5d4d3a00a86be921 Mon Sep 17 00:00:00 2001 From: Jake Bamrah Date: Sun, 10 Mar 2024 22:38:05 +0100 Subject: [PATCH 05/11] update docs to include circular buffer behaviour --- docs/en/engines/table-engines/special/memory.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/en/engines/table-engines/special/memory.md b/docs/en/engines/table-engines/special/memory.md index d6bcce68a0e..e008ed8619d 100644 --- a/docs/en/engines/table-engines/special/memory.md +++ b/docs/en/engines/table-engines/special/memory.md @@ -22,7 +22,10 @@ Normally, using this table engine is not justified. However, it can be used for The Memory engine is used by the system for temporary tables with external query data (see the section “External data for processing a query”), and for implementing `GLOBAL IN` (see the section “IN operators”). -## Engine Parameters +Upper and lower bounds can be specified to limit Memory engine table size, effectively allowing it to act as a circular +buffer (see [Engine Parameters](#engine-parameters)). + +## Engine Parameters {#engine-parameters} - `min_bytes_to_keep` — Minimum bytes to keep when memory table is size-capped. 
- Default value: `0` From a9e054e1c8c1b6435cb3f152eb6e31d3086b04b4 Mon Sep 17 00:00:00 2001 From: Yarik Briukhovetskyi <114298166+yariks5s@users.noreply.github.com> Date: Mon, 11 Mar 2024 13:17:33 +0100 Subject: [PATCH 06/11] Reload CI From 5e66cd7154e36cc9ffbbfaf123c3aeb81d2a14de Mon Sep 17 00:00:00 2001 From: Jake Bamrah Date: Tue, 12 Mar 2024 23:41:00 +0100 Subject: [PATCH 07/11] fix settings seg-fault and expand test case coverage --- .../engines/table-engines/special/memory.md | 14 ++++----- src/Core/Settings.h | 4 --- src/Core/SettingsChangesHistory.h | 4 --- src/Storages/MemorySettings.h | 4 +++ src/Storages/StorageMemory.cpp | 31 +++++++++---------- src/Storages/StorageMemory.h | 8 ++++- ...torage_memory_circ_buffer_usage.reference} | 4 +++ ...3009_storage_memory_circ_buffer_usage.sql} | 30 +++++++++++++----- 8 files changed, 58 insertions(+), 41 deletions(-) rename tests/queries/0_stateless/{03001_storage_memory_circ_buffer_usage.reference => 03009_storage_memory_circ_buffer_usage.reference} (63%) rename tests/queries/0_stateless/{03001_storage_memory_circ_buffer_usage.sql => 03009_storage_memory_circ_buffer_usage.sql} (59%) diff --git a/docs/en/engines/table-engines/special/memory.md b/docs/en/engines/table-engines/special/memory.md index e008ed8619d..8ebdd57ae67 100644 --- a/docs/en/engines/table-engines/special/memory.md +++ b/docs/en/engines/table-engines/special/memory.md @@ -44,20 +44,16 @@ buffer (see [Engine Parameters](#engine-parameters)). CREATE TABLE memory (i UInt32) ENGINE = Memory; ``` -## Examples {#examples} - -**Example 1: Setting parameters** +**Initialize settings** ``` sql -SET min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; +CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000; ``` **Note:** Both `bytes` and `rows` capping parameters can be set at the same time, however, the lower bounds of `max` and `min` will be adhered to. 
-**Example 2: Basic usage** +## Examples {#examples} ``` sql -CREATE TABLE memory (i UInt32) ENGINE = Memory; - -SET min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; +CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; /* 1. testing oldest block doesn't get deleted due to min-threshold - 3000 rows */ INSERT INTO memory SELECT * FROM numbers(0, 1600); @@ -70,6 +66,8 @@ INSERT INTO memory SELECT * FROM numbers(9000, 1000); /* 4. checking a very large block overrides all */ INSERT INTO memory SELECT * FROM numbers(9000, 10000); + +SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); ``` ``` text diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 500157081a3..d70a6cf51c5 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -885,10 +885,6 @@ class IColumn; M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud. Exclude new data parts from SELECT queries until they're either pre-warmed (see cache_populated_by_fetch) or this many seconds old. Only for Replicated-/SharedMergeTree.", 0) \ M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud. If a merged part is less than this many seconds old and is not pre-warmed (see cache_populated_by_fetch), but all its source parts are available and pre-warmed, SELECT queries will read from those parts instead. Only for ReplicatedMergeTree. Note that this only checks whether CacheWarmer processed the part; if the part was fetched into cache by something else, it'll still be considered cold until CacheWarmer gets to it; if it was warmed, then evicted from cache, it'll still be considered warm.", 0) \ M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. 
Note that it can lead to incorrect result", 0) \ - M(UInt64, min_rows_to_keep, 0, "Minimum block size (in rows) to retain in Memory table buffer.", 0) \ - M(UInt64, max_rows_to_keep, 0, "Maximum block size (in rows) to retain in Memory table buffer.", 0) \ - M(UInt64, min_bytes_to_keep, 0, "Minimum block size (in bytes) to retain in Memory table buffer.", 0) \ - M(UInt64, max_bytes_to_keep, 0, "Maximum block size (in bytes) to retain in Memory table buffer.", 0) \ // End of COMMON_SETTINGS // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS. diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h index 436c6963fb8..e680c02671a 100644 --- a/src/Core/SettingsChangesHistory.h +++ b/src/Core/SettingsChangesHistory.h @@ -93,10 +93,6 @@ static std::map sett {"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"}, {"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication is dependent materialized view cannot work together with async inserts."}, {"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"}, - {"max_rows_to_keep", 0, 0, "Introducing new feature for memory tables."}, - {"min_rows_to_keep", 0, 0, "Introducing new feature for memory tables."}, - {"max_bytes_to_keep", 0, 0, "Introducing new feature for memory tables."}, - {"min_bytes_to_keep", 0, 0, "Introducing new feature for memory tables."}, }}, {"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"}, {"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and 
suspicious types inside nested types"}, diff --git a/src/Storages/MemorySettings.h b/src/Storages/MemorySettings.h index 9e1a8db3595..ac6cdf73329 100644 --- a/src/Storages/MemorySettings.h +++ b/src/Storages/MemorySettings.h @@ -10,6 +10,10 @@ class ASTStorage; #define MEMORY_SETTINGS(M, ALIAS) \ M(Bool, compress, false, "Compress data in memory", 0) \ + M(UInt64, min_rows_to_keep, 0, "Minimum block size (in rows) to retain in Memory table buffer.", 0) \ + M(UInt64, max_rows_to_keep, 0, "Maximum block size (in rows) to retain in Memory table buffer.", 0) \ + M(UInt64, min_bytes_to_keep, 0, "Minimum block size (in bytes) to retain in Memory table buffer.", 0) \ + M(UInt64, max_bytes_to_keep, 0, "Maximum block size (in bytes) to retain in Memory table buffer.", 0) \ DECLARE_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS) diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 6afe3331054..c6222d2124e 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -55,11 +55,10 @@ public: MemorySink( StorageMemory & storage_, const StorageMetadataPtr & metadata_snapshot_, - ContextPtr context_) + ContextPtr context) : SinkToStorage(metadata_snapshot_->getSampleBlock()) , storage(storage_) - , storage_snapshot(storage_.getStorageSnapshot(metadata_snapshot_, context_)) - , context(context_) + , storage_snapshot(storage_.getStorageSnapshot(metadata_snapshot_, context)) { } @@ -102,25 +101,20 @@ public: inserted_rows += block.rows(); } - Settings settings = context->getSettings(); - if ((settings.min_bytes_to_keep && settings.min_bytes_to_keep > settings.max_bytes_to_keep) - || (settings.min_rows_to_keep && settings.min_rows_to_keep > settings.max_rows_to_keep)) - throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. 
bytes / rows must be set with a max."); - std::lock_guard lock(storage.mutex); auto new_data = std::make_unique(*(storage.data.get())); UInt64 new_total_rows = storage.total_size_rows.load(std::memory_order_relaxed) + inserted_rows; UInt64 new_total_bytes = storage.total_size_bytes.load(std::memory_order_relaxed) + inserted_bytes; while (!new_data->empty() - && ((settings.max_bytes_to_keep && new_total_bytes > settings.max_bytes_to_keep) - || (settings.max_rows_to_keep && new_total_rows > settings.max_rows_to_keep))) + && ((storage.max_bytes_to_keep && new_total_bytes > storage.max_bytes_to_keep) + || (storage.max_rows_to_keep && new_total_rows > storage.max_rows_to_keep))) { Block oldest_block = new_data->front(); UInt64 rows_to_remove = oldest_block.rows(); UInt64 bytes_to_remove = oldest_block.allocatedBytes(); - if (new_total_bytes - bytes_to_remove < settings.min_bytes_to_keep - || new_total_rows - rows_to_remove < settings.min_rows_to_keep) + if (new_total_bytes - bytes_to_remove < storage.min_bytes_to_keep + || new_total_rows - rows_to_remove < storage.min_rows_to_keep) { break; // stop - removing next block will put us under min_bytes / min_rows threshold } @@ -143,7 +137,6 @@ private: Blocks new_blocks; StorageMemory & storage; StorageSnapshotPtr storage_snapshot; - ContextPtr context; }; @@ -152,8 +145,10 @@ StorageMemory::StorageMemory( ColumnsDescription columns_description_, ConstraintsDescription constraints_, const String & comment, - bool compress_) - : IStorage(table_id_), data(std::make_unique()), compress(compress_) + const MemorySettings & settings) + : IStorage(table_id_), data(std::make_unique()), compress(settings.compress), + min_rows_to_keep(settings.min_rows_to_keep), max_rows_to_keep(settings.max_rows_to_keep), + min_bytes_to_keep(settings.min_bytes_to_keep), max_bytes_to_keep(settings.max_bytes_to_keep) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(std::move(columns_description_)); @@ -571,7 +566,11 @@ void 
registerStorageMemory(StorageFactory & factory) if (has_settings) settings.loadFromQuery(*args.storage_def); - return std::make_shared(args.table_id, args.columns, args.constraints, args.comment, settings.compress); + if (settings.min_bytes_to_keep > settings.max_bytes_to_keep + || settings.min_rows_to_keep > settings.max_rows_to_keep) + throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. bytes / rows must be set with a max."); + + return std::make_shared(args.table_id, args.columns, args.constraints, args.comment, settings); }, { .supports_settings = true, diff --git a/src/Storages/StorageMemory.h b/src/Storages/StorageMemory.h index 3293e5e4fe5..13f1c971d82 100644 --- a/src/Storages/StorageMemory.h +++ b/src/Storages/StorageMemory.h @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -30,7 +31,7 @@ public: ColumnsDescription columns_description_, ConstraintsDescription constraints_, const String & comment, - bool compress_ = false); + const MemorySettings & settings = MemorySettings()); String getName() const override { return "Memory"; } @@ -134,6 +135,11 @@ private: std::atomic total_size_rows = 0; bool compress; + UInt64 min_rows_to_keep; + UInt64 max_rows_to_keep; + UInt64 min_bytes_to_keep; + UInt64 max_bytes_to_keep; + friend class ReadFromMemoryStorageStep; }; diff --git a/tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.reference b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.reference similarity index 63% rename from tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.reference rename to tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.reference index e9e710a535b..0bcc6fe4434 100644 --- a/tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.reference +++ b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.reference @@ -6,3 +6,7 @@ 1000 1020 1100 +8192 +9216 +17408 +82944 diff --git 
a/tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.sql b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.sql similarity index 59% rename from tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.sql rename to tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.sql index 13ddc428415..77aa1d0befc 100644 --- a/tests/queries/0_stateless/03001_storage_memory_circ_buffer_usage.sql +++ b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.sql @@ -1,6 +1,5 @@ DROP TABLE IF EXISTS memory; -CREATE TABLE memory (i UInt32) ENGINE = Memory; -SET min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; +CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; /* TESTING BYTES */ /* 1. testing oldest block doesn't get deleted because of min-threshold */ @@ -19,9 +18,8 @@ SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = curre INSERT INTO memory SELECT * FROM numbers(9000, 10000); SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); - -truncate memory; -SET min_rows_to_keep = 100, max_rows_to_keep = 1000; +DROP TABLE IF EXISTS memory; +CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000; /* TESTING ROWS */ /* 1. 
add normal number of rows */ @@ -40,8 +38,24 @@ SELECT total_rows FROM system.tables WHERE name = 'memory' and database = curren INSERT INTO memory SELECT * FROM numbers(3000, 1100); SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -/* test invalid settings */ -SET min_bytes_to_keep = 4096, max_bytes_to_keep = 0; -INSERT INTO memory SELECT * FROM numbers(3000, 1100); -- { serverError 452 } +/* TESTING NO CIRCULAR-BUFFER */ +DROP TABLE IF EXISTS memory; +CREATE TABLE memory (i UInt32) ENGINE = Memory; + +INSERT INTO memory SELECT * FROM numbers(0, 1600); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +INSERT INTO memory SELECT * FROM numbers(1000, 100); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +INSERT INTO memory SELECT * FROM numbers(9000, 1000); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +INSERT INTO memory SELECT * FROM numbers(9000, 10000); +SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); + +/* TESTING INVALID SETTINGS */ +CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100; -- { serverError 452 } +CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 100; -- { serverError 452 } DROP TABLE memory; \ No newline at end of file From a915dace2752947e8d8d4e256342f59f8a287fff Mon Sep 17 00:00:00 2001 From: Yarik Briukhovetskyi <114298166+yariks5s@users.noreply.github.com> Date: Wed, 13 Mar 2024 16:19:44 +0100 Subject: [PATCH 08/11] reload CI From 5dad208b56d22a8f02fcfb72fcd822d87a8ca941 Mon Sep 17 00:00:00 2001 From: Yarik Briukhovetskyi <114298166+yariks5s@users.noreply.github.com> Date: Fri, 15 Mar 2024 13:40:18 +0100 Subject: [PATCH 09/11] fix tests --- .../03009_storage_memory_circ_buffer_usage.reference | 4 ++++ 
.../03009_storage_memory_circ_buffer_usage.sql | 10 ++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.reference b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.reference index 0bcc6fe4434..20dda4fa15a 100644 --- a/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.reference +++ b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.reference @@ -1,12 +1,16 @@ +TESTING BYTES 8192 9216 9216 65536 +TESTING ROWS 50 1000 1020 1100 +TESTING NO CIRCULAR-BUFFER 8192 9216 17408 82944 +TESTING INVALID SETTINGS diff --git a/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.sql b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.sql index 77aa1d0befc..fa4ba96277d 100644 --- a/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.sql +++ b/tests/queries/0_stateless/03009_storage_memory_circ_buffer_usage.sql @@ -1,7 +1,9 @@ +SET max_block_size = 65409; -- Default value + DROP TABLE IF EXISTS memory; CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; -/* TESTING BYTES */ +SELECT 'TESTING BYTES'; /* 1. testing oldest block doesn't get deleted because of min-threshold */ INSERT INTO memory SELECT * FROM numbers(0, 1600); SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); @@ -21,7 +23,7 @@ SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = curre DROP TABLE IF EXISTS memory; CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000; -/* TESTING ROWS */ +SELECT 'TESTING ROWS'; /* 1. 
add normal number of rows */ INSERT INTO memory SELECT * FROM numbers(0, 50); SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); @@ -38,7 +40,7 @@ SELECT total_rows FROM system.tables WHERE name = 'memory' and database = curren INSERT INTO memory SELECT * FROM numbers(3000, 1100); SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -/* TESTING NO CIRCULAR-BUFFER */ +SELECT 'TESTING NO CIRCULAR-BUFFER'; DROP TABLE IF EXISTS memory; CREATE TABLE memory (i UInt32) ENGINE = Memory; @@ -54,7 +56,7 @@ SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = curre INSERT INTO memory SELECT * FROM numbers(9000, 10000); SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -/* TESTING INVALID SETTINGS */ +SELECT 'TESTING INVALID SETTINGS'; CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100; -- { serverError 452 } CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 100; -- { serverError 452 } From 4d6aeaa151a7789fdcc8c1d250d1ed780fbd89c4 Mon Sep 17 00:00:00 2001 From: Yarik Briukhovetskyi <114298166+yariks5s@users.noreply.github.com> Date: Fri, 15 Mar 2024 15:31:58 +0100 Subject: [PATCH 10/11] Reload CI From f67eae6d7b8f3ecbe2becb7aa43a8d83431142f8 Mon Sep 17 00:00:00 2001 From: Yarik Briukhovetskyi <114298166+yariks5s@users.noreply.github.com> Date: Fri, 15 Mar 2024 15:50:17 +0100 Subject: [PATCH 11/11] Update memory.md --- .../engines/table-engines/special/memory.md | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/docs/en/engines/table-engines/special/memory.md b/docs/en/engines/table-engines/special/memory.md index 8ebdd57ae67..19b5c798a76 100644 --- a/docs/en/engines/table-engines/special/memory.md +++ b/docs/en/engines/table-engines/special/memory.md @@ -22,8 +22,7 @@ Normally, using this table engine is not justified. 
However, it can be used for The Memory engine is used by the system for temporary tables with external query data (see the section “External data for processing a query”), and for implementing `GLOBAL IN` (see the section “IN operators”). -Upper and lower bounds can be specified to limit Memory engine table size, effectively allowing it to act as a circular -buffer (see [Engine Parameters](#engine-parameters)). +Upper and lower bounds can be specified to limit Memory engine table size, effectively allowing it to act as a circular buffer (see [Engine Parameters](#engine-parameters)). ## Engine Parameters {#engine-parameters} @@ -40,9 +39,6 @@ buffer (see [Engine Parameters](#engine-parameters)). ## Usage {#usage} -``` sql -CREATE TABLE memory (i UInt32) ENGINE = Memory; -``` **Initialize settings** ``` sql @@ -56,16 +52,42 @@ CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384; /* 1. testing oldest block doesn't get deleted due to min-threshold - 3000 rows */ -INSERT INTO memory SELECT * FROM numbers(0, 1600); +INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 8'192 bytes /* 2. adding block that doesn't get deleted */ -INSERT INTO memory SELECT * FROM numbers(1000, 100); +INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 1'024 bytes /* 3. testing oldest block gets deleted - 9216 bytes - 1100 */ -INSERT INTO memory SELECT * FROM numbers(9000, 1000); +INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 8'192 bytes /* 4. 
checking a very large block overrides all */ -INSERT INTO memory SELECT * FROM numbers(9000, 10000); +INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 65'536 bytes + +SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); +``` + +``` text +┌─total_bytes─┬─total_rows─┐ +│ 65536 │ 10000 │ +└─────────────┴────────────┘ +``` + +Also, for rows: + +``` sql +CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 4000, max_rows_to_keep = 10000; + +/* 1. testing oldest block doesn't get deleted due to min-threshold - 4000 rows */ +INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 1'600 rows + +/* 2. adding block that doesn't get deleted */ +INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 100 rows + +/* 3. adding another block that doesn't get deleted - 2700 rows in total, still below max_rows_to_keep */ +INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 1'000 rows + +/* 4. checking a very large block overrides all */ +INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 10'000 rows SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); ```