Merge pull request #60612 from JakeBamrah/master

[59558] Add size cap to Memory tables
This commit is contained in:
Yarik Briukhovetskyi 2024-03-18 13:21:56 +01:00 committed by GitHub
commit c85d410cda
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 200 additions and 7 deletions

View File

@ -21,3 +21,79 @@ When restarting a server, data disappears from the table and the table becomes e
Normally, using this table engine is not justified. However, it can be used for tests, and for tasks where maximum speed is required on a relatively small number of rows (up to approximately 100,000,000).
The Memory engine is used by the system for temporary tables with external query data (see the section “External data for processing a query”), and for implementing `GLOBAL IN` (see the section “IN operators”).
Upper and lower bounds can be specified to limit Memory engine table size, effectively allowing it to act as a circular buffer (see [Engine Parameters](#engine-parameters)).
## Engine Parameters {#engine-parameters}
- `min_bytes_to_keep` — Minimum bytes to keep when memory table is size-capped.
- Default value: `0`
- Requires `max_bytes_to_keep`
- `max_bytes_to_keep` — Maximum bytes to keep within memory table where oldest rows are deleted on each insertion (i.e. circular buffer). Max bytes can exceed the stated limit if removing the oldest batch of rows would bring the table below the `min_bytes_to_keep` limit when adding a large block.
- Default value: `0`
- `min_rows_to_keep` — Minimum rows to keep when memory table is size-capped.
- Default value: `0`
- Requires `max_rows_to_keep`
- `max_rows_to_keep` — Maximum rows to keep within memory table where oldest rows are deleted on each insertion (i.e. circular buffer). Max rows can exceed the stated limit if removing the oldest batch of rows would bring the table below the `min_rows_to_keep` limit when adding a large block.
- Default value: `0`
## Usage {#usage}
**Initialize settings**
``` sql
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000;
```
**Note:** Both `bytes` and `rows` capping parameters can be set at the same time. In that case the oldest rows are removed while either `max` limit is exceeded, and removal stops as soon as it would bring the table below either `min` limit.
## Examples {#examples}
``` sql
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
/* 1. first insert stays below max_bytes_to_keep - nothing is deleted */
INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 8'192 bytes
/* 2. adding block that doesn't get deleted */
INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 1'024 bytes
/* 3. testing oldest block gets deleted - table is left with 9'216 bytes (1'100 rows) */
INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 8'192 bytes
/* 4. checking a very large block overrides all */
INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 65'536 bytes
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
```
``` text
┌─total_bytes─┬─total_rows─┐
│ 65536 │ 10000 │
└─────────────┴────────────┘
```
Also, for rows:
``` sql
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 4000, max_rows_to_keep = 10000;
/* 1. first insert stays below max_rows_to_keep - nothing is deleted */
INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 1'600 rows
/* 2. adding block that doesn't get deleted */
INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 100 rows
/* 3. adding another block - still below max_rows_to_keep (2'700 rows), nothing is deleted */
INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 1'000 rows
/* 4. checking a very large block overrides all */
INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 10'000 rows
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
```
``` text
┌─total_bytes─┬─total_rows─┐
│ 65536 │ 10000 │
└─────────────┴────────────┘
```

View File

@ -10,6 +10,10 @@ class ASTStorage;
// Settings accepted by the Memory table engine (`ENGINE = Memory SETTINGS ...`).
// The four *_to_keep settings cap the table size: on insert, the oldest blocks are dropped
// while a non-zero max_* limit is exceeded, unless dropping the next block would leave the
// table below the corresponding min_* limit. A max_* value of 0 disables that cap.
// Comments are kept above the macro on purpose: the body is backslash-continued.
#define MEMORY_SETTINGS(M, ALIAS) \
M(Bool, compress, false, "Compress data in memory", 0) \
M(UInt64, min_rows_to_keep, 0, "Minimum block size (in rows) to retain in Memory table buffer.", 0) \
M(UInt64, max_rows_to_keep, 0, "Maximum block size (in rows) to retain in Memory table buffer.", 0) \
M(UInt64, min_bytes_to_keep, 0, "Minimum block size (in bytes) to retain in Memory table buffer.", 0) \
M(UInt64, max_bytes_to_keep, 0, "Maximum block size (in bytes) to retain in Memory table buffer.", 0) \
DECLARE_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS)

View File

@ -46,6 +46,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int CANNOT_RESTORE_TABLE;
extern const int NOT_IMPLEMENTED;
extern const int SETTING_CONSTRAINT_VIOLATION;
}
class MemorySink : public SinkToStorage
@ -103,16 +104,37 @@ public:
std::lock_guard lock(storage.mutex);
auto new_data = std::make_unique<Blocks>(*(storage.data.get()));
UInt64 new_total_rows = storage.total_size_rows.load(std::memory_order_relaxed) + inserted_rows;
UInt64 new_total_bytes = storage.total_size_bytes.load(std::memory_order_relaxed) + inserted_bytes;
while (!new_data->empty()
&& ((storage.max_bytes_to_keep && new_total_bytes > storage.max_bytes_to_keep)
|| (storage.max_rows_to_keep && new_total_rows > storage.max_rows_to_keep)))
{
Block oldest_block = new_data->front();
UInt64 rows_to_remove = oldest_block.rows();
UInt64 bytes_to_remove = oldest_block.allocatedBytes();
if (new_total_bytes - bytes_to_remove < storage.min_bytes_to_keep
|| new_total_rows - rows_to_remove < storage.min_rows_to_keep)
{
break; // stop - removing next block will put us under min_bytes / min_rows threshold
}
// delete old block from current storage table
new_total_rows -= rows_to_remove;
new_total_bytes -= bytes_to_remove;
new_data->erase(new_data->begin());
}
// append new data to modified storage table and commit
new_data->insert(new_data->end(), new_blocks.begin(), new_blocks.end());
storage.data.set(std::move(new_data));
storage.total_size_bytes.fetch_add(inserted_bytes, std::memory_order_relaxed);
storage.total_size_rows.fetch_add(inserted_rows, std::memory_order_relaxed);
storage.total_size_rows.store(new_total_rows, std::memory_order_relaxed);
storage.total_size_bytes.store(new_total_bytes, std::memory_order_relaxed);
}
private:
Blocks new_blocks;
StorageMemory & storage;
StorageSnapshotPtr storage_snapshot;
};
@ -123,8 +145,10 @@ StorageMemory::StorageMemory(
ColumnsDescription columns_description_,
ConstraintsDescription constraints_,
const String & comment,
bool compress_)
: IStorage(table_id_), data(std::make_unique<const Blocks>()), compress(compress_)
const MemorySettings & settings)
: IStorage(table_id_), data(std::make_unique<const Blocks>()), compress(settings.compress),
min_rows_to_keep(settings.min_rows_to_keep), max_rows_to_keep(settings.max_rows_to_keep),
min_bytes_to_keep(settings.min_bytes_to_keep), max_bytes_to_keep(settings.max_bytes_to_keep)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(std::move(columns_description_));
@ -542,7 +566,11 @@ void registerStorageMemory(StorageFactory & factory)
if (has_settings)
settings.loadFromQuery(*args.storage_def);
return std::make_shared<StorageMemory>(args.table_id, args.columns, args.constraints, args.comment, settings.compress);
if (settings.min_bytes_to_keep > settings.max_bytes_to_keep
|| settings.min_rows_to_keep > settings.max_rows_to_keep)
throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. bytes / rows must be set with a max.");
return std::make_shared<StorageMemory>(args.table_id, args.columns, args.constraints, args.comment, settings);
},
{
.supports_settings = true,

View File

@ -7,6 +7,7 @@
#include <Core/NamesAndTypes.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Storages/IStorage.h>
#include <Storages/MemorySettings.h>
#include <Common/MultiVersion.h>
@ -30,7 +31,7 @@ public:
ColumnsDescription columns_description_,
ConstraintsDescription constraints_,
const String & comment,
bool compress_ = false);
const MemorySettings & settings = MemorySettings());
String getName() const override { return "Memory"; }
@ -134,6 +135,11 @@ private:
std::atomic<size_t> total_size_rows = 0;
bool compress;
UInt64 min_rows_to_keep;
UInt64 max_rows_to_keep;
UInt64 min_bytes_to_keep;
UInt64 max_bytes_to_keep;
friend class ReadFromMemoryStorageStep;
};

View File

@ -0,0 +1,16 @@
TESTING BYTES
8192
9216
9216
65536
TESTING ROWS
50
1000
1020
1100
TESTING NO CIRCULAR-BUFFER
8192
9216
17408
82944
TESTING INVALID SETTINGS

View File

@ -0,0 +1,63 @@
SET max_block_size = 65409; -- Default value
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
SELECT 'TESTING BYTES';
/* 1. first insert stays below max_bytes_to_keep, so nothing is deleted */
INSERT INTO memory SELECT * FROM numbers(0, 1600);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 2. adding block that doesn't get deleted */
INSERT INTO memory SELECT * FROM numbers(1000, 100);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 3. testing oldest block gets deleted - table is left with 9216 bytes (1100 rows) */
INSERT INTO memory SELECT * FROM numbers(9000, 1000);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 4. check large block over-writes all bytes / rows */
INSERT INTO memory SELECT * FROM numbers(9000, 10000);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000;
SELECT 'TESTING ROWS';
/* 1. add normal number of rows */
INSERT INTO memory SELECT * FROM numbers(0, 50);
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 2. table should have 1000 */
INSERT INTO memory SELECT * FROM numbers(50, 950);
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 3. table should have 1020 - removed first 50 */
INSERT INTO memory SELECT * FROM numbers(2000, 70);
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 4. check large block over-writes all rows */
INSERT INTO memory SELECT * FROM numbers(3000, 1100);
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
SELECT 'TESTING NO CIRCULAR-BUFFER';
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory;
INSERT INTO memory SELECT * FROM numbers(0, 1600);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
INSERT INTO memory SELECT * FROM numbers(1000, 100);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
INSERT INTO memory SELECT * FROM numbers(9000, 1000);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
INSERT INTO memory SELECT * FROM numbers(9000, 10000);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
SELECT 'TESTING INVALID SETTINGS';
CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100; -- { serverError 452 }
CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 100; -- { serverError 452 }
DROP TABLE memory;