Support ALTER MODIFY SETTING for Memory tables

This commit is contained in:
zhongyuankai 2024-03-27 12:55:04 +08:00
parent 2a820a14c5
commit f1ae99b113
7 changed files with 151 additions and 23 deletions

View File

@ -37,6 +37,11 @@ Upper and lower bounds can be specified to limit Memory engine table size, effec
- `max_rows_to_keep` — Maximum rows to keep within memory table where oldest rows are deleted on each insertion (i.e circular buffer). Max rows can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block.
- Default value: `0`
**Modify settings**
```sql
ALTER TABLE memory MODIFY SETTING min_rows_to_keep = 100, max_rows_to_keep = 1000;
```
## Usage {#usage}

View File

@ -1,6 +1,5 @@
#include <Storages/MemorySettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/Exception.h>
@ -11,6 +10,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
extern const int SETTING_CONSTRAINT_VIOLATION;
}
IMPLEMENT_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS)
@ -32,5 +32,22 @@ void MemorySettings::loadFromQuery(ASTStorage & storage_def)
}
}
ASTPtr MemorySettings::getSettingsChangesQuery()
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
for (const auto & change : changes())
settings_ast->changes.push_back(change);
return settings_ast;
}
void MemorySettings::sanityCheck() const
{
if (min_bytes_to_keep > max_bytes_to_keep
|| min_rows_to_keep > max_rows_to_keep)
throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. bytes / rows must be set with a max.");
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/BaseSettings.h>
#include <Parsers/ASTSetQuery.h>
namespace DB
@ -24,6 +25,8 @@ DECLARE_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS)
struct MemorySettings : public BaseSettings<memorySettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
ASTPtr getSettingsChangesQuery();
void sanityCheck() const;
};
}

View File

@ -76,7 +76,7 @@ public:
convertDynamicColumnsToTuples(block, storage_snapshot);
}
if (storage.compress)
if (storage.getMemorySettingsRef().compress)
{
Block compressed_block;
for (const auto & elem : block)
@ -106,15 +106,16 @@ public:
auto new_data = std::make_unique<Blocks>(*(storage.data.get()));
UInt64 new_total_rows = storage.total_size_rows.load(std::memory_order_relaxed) + inserted_rows;
UInt64 new_total_bytes = storage.total_size_bytes.load(std::memory_order_relaxed) + inserted_bytes;
auto & memory_settings = storage.getMemorySettingsRef();
while (!new_data->empty()
&& ((storage.max_bytes_to_keep && new_total_bytes > storage.max_bytes_to_keep)
|| (storage.max_rows_to_keep && new_total_rows > storage.max_rows_to_keep)))
&& ((memory_settings.max_bytes_to_keep && new_total_bytes > memory_settings.max_bytes_to_keep)
|| (memory_settings.max_rows_to_keep && new_total_rows > memory_settings.max_rows_to_keep)))
{
Block oldest_block = new_data->front();
UInt64 rows_to_remove = oldest_block.rows();
UInt64 bytes_to_remove = oldest_block.allocatedBytes();
if (new_total_bytes - bytes_to_remove < storage.min_bytes_to_keep
|| new_total_rows - rows_to_remove < storage.min_rows_to_keep)
if (new_total_bytes - bytes_to_remove < memory_settings.min_bytes_to_keep
|| new_total_rows - rows_to_remove < memory_settings.min_rows_to_keep)
{
break; // stop - removing next block will put us under min_bytes / min_rows threshold
}
@ -145,15 +146,16 @@ StorageMemory::StorageMemory(
ColumnsDescription columns_description_,
ConstraintsDescription constraints_,
const String & comment,
const MemorySettings & settings)
: IStorage(table_id_), data(std::make_unique<const Blocks>()), compress(settings.compress),
min_rows_to_keep(settings.min_rows_to_keep), max_rows_to_keep(settings.max_rows_to_keep),
min_bytes_to_keep(settings.min_bytes_to_keep), max_bytes_to_keep(settings.max_bytes_to_keep)
const MemorySettings & memory_settings_)
: IStorage(table_id_)
, data(std::make_unique<const Blocks>())
, memory_settings(memory_settings_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(std::move(columns_description_));
storage_metadata.setConstraints(std::move(constraints_));
storage_metadata.setComment(comment);
storage_metadata.setSettingsChanges(memory_settings.getSettingsChangesQuery());
setInMemoryMetadata(storage_metadata);
}
@ -239,7 +241,7 @@ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context
Block block;
while (executor.pull(block))
{
if (compress)
if (memory_settings.compress)
for (auto & elem : block)
elem.column = elem.column->compress();
@ -294,6 +296,25 @@ void StorageMemory::truncate(
total_size_rows.store(0, std::memory_order_relaxed);
}
void StorageMemory::alter(const DB::AlterCommands & params, DB::ContextPtr context, DB::IStorage::AlterLockHolder & /*alter_lock_holder*/)
{
auto table_id = getStorageID();
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
params.apply(new_metadata, context);
if (params.isSettingsAlter())
{
auto & settings_changes = new_metadata.settings_changes->as<ASTSetQuery &>();
auto copy = memory_settings;
copy.applyChanges(settings_changes.changes);
copy.sanityCheck();
memory_settings = std::move(copy);
}
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id, new_metadata);
setInMemoryMetadata(new_metadata);
}
namespace
{
@ -499,7 +520,7 @@ void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & dat
while (auto block = block_in.read())
{
if (compress)
if (memory_settings.compress)
{
Block compressed_block;
for (const auto & elem : block)
@ -534,7 +555,8 @@ void StorageMemory::checkAlterIsPossible(const AlterCommands & commands, Context
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN
&& command.type != AlterCommand::Type::COMMENT_TABLE && command.type != AlterCommand::Type::RENAME_COLUMN)
&& command.type != AlterCommand::Type::COMMENT_TABLE && command.type != AlterCommand::Type::RENAME_COLUMN
&& command.type != AlterCommand::Type::MODIFY_SETTING)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by storage {}",
command.type, getName());
}
@ -566,9 +588,7 @@ void registerStorageMemory(StorageFactory & factory)
if (has_settings)
settings.loadFromQuery(*args.storage_def);
if (settings.min_bytes_to_keep > settings.max_bytes_to_keep
|| settings.min_rows_to_keep > settings.max_rows_to_keep)
throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. bytes / rows must be set with a max.");
settings.sanityCheck();
return std::make_shared<StorageMemory>(args.table_id, args.columns, args.constraints, args.comment, settings);
},

View File

@ -31,7 +31,7 @@ public:
ColumnsDescription columns_description_,
ConstraintsDescription constraints_,
const String & comment,
const MemorySettings & settings = MemorySettings());
const MemorySettings & memory_settings_ = MemorySettings());
String getName() const override { return "Memory"; }
@ -46,6 +46,8 @@ public:
StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
const MemorySettings & getMemorySettingsRef() const { return memory_settings; }
void read(
QueryPlan & query_plan,
const Names & column_names,
@ -78,6 +80,7 @@ public:
void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const override;
void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & alter_lock_holder) override;
std::optional<UInt64> totalRows(const Settings &) const override;
std::optional<UInt64> totalBytes(const Settings &) const override;
@ -134,12 +137,7 @@ private:
std::atomic<size_t> total_size_bytes = 0;
std::atomic<size_t> total_size_rows = 0;
bool compress;
UInt64 min_rows_to_keep;
UInt64 max_rows_to_keep;
UInt64 min_bytes_to_keep;
UInt64 max_bytes_to_keep;
MemorySettings memory_settings;
friend class ReadFromMemoryStorageStep;
};

View File

@ -0,0 +1,16 @@
TESTING BYTES
8192
9216
9216
65536
TESTING ROWS
50
1000
1020
1100
TESTING NO CIRCULAR-BUFFER
8192
9216
17408
82944
TESTING INVALID SETTINGS

View File

@ -0,0 +1,69 @@
SET max_block_size = 65409; -- Default value
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory;
ALTER TABLE memory MODIFY SETTING min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
SELECT 'TESTING BYTES';
/* 1. testing oldest block doesn't get deleted because of min-threshold */
INSERT INTO memory SELECT * FROM numbers(0, 1600);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 2. adding block that doesn't get deleted */
INSERT INTO memory SELECT * FROM numbers(1000, 100);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */
INSERT INTO memory SELECT * FROM numbers(9000, 1000);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 4.check large block over-writes all bytes / rows */
INSERT INTO memory SELECT * FROM numbers(9000, 10000);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory;
ALTER TABLE memory MODIFY SETTING min_rows_to_keep = 100, max_rows_to_keep = 1000;
SELECT 'TESTING ROWS';
/* 1. add normal number of rows */
INSERT INTO memory SELECT * FROM numbers(0, 50);
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 2. table should have 1000 */
INSERT INTO memory SELECT * FROM numbers(50, 950);
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 3. table should have 1020 - removed first 50 */
INSERT INTO memory SELECT * FROM numbers(2000, 70);
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* 4. check large block over-writes all rows */
INSERT INTO memory SELECT * FROM numbers(3000, 1100);
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
SELECT 'TESTING NO CIRCULAR-BUFFER';
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory;
INSERT INTO memory SELECT * FROM numbers(0, 1600);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
INSERT INTO memory SELECT * FROM numbers(1000, 100);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
INSERT INTO memory SELECT * FROM numbers(9000, 1000);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
INSERT INTO memory SELECT * FROM numbers(9000, 10000);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
SELECT 'TESTING INVALID SETTINGS';
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory;
ALTER TABLE memory MODIFY SETTING min_rows_to_keep = 100; -- { serverError 452 }
ALTER TABLE memory MODIFY SETTING min_bytes_to_keep = 100; -- { serverError 452 }
DROP TABLE memory;