fix settings seg-fault and expand test case coverage

This commit is contained in:
Jake Bamrah 2024-03-12 23:41:00 +01:00
parent a9e054e1c8
commit 5e66cd7154
8 changed files with 58 additions and 41 deletions

View File

@ -44,20 +44,16 @@ buffer (see [Engine Parameters](#engine-parameters)).
CREATE TABLE memory (i UInt32) ENGINE = Memory;
```
## Examples {#examples}
**Example 1: Setting parameters**
**Initialize settings**
``` sql
SET min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000;
```
**Note:** Both `bytes` and `rows` capping parameters can be set at the same time, however, the lower bounds of `max` and `min` will be adhered to.
**Example 2: Basic usage**
## Examples {#examples}
``` sql
CREATE TABLE memory (i UInt32) ENGINE = Memory;
SET min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
/* 1. testing oldest block doesn't get deleted due to min-threshold - 3000 rows */
INSERT INTO memory SELECT * FROM numbers(0, 1600);
@ -70,6 +66,8 @@ INSERT INTO memory SELECT * FROM numbers(9000, 1000);
/* 4. checking a very large block overrides all */
INSERT INTO memory SELECT * FROM numbers(9000, 10000);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
```
``` text

View File

@ -885,10 +885,6 @@ class IColumn;
M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud. Exclude new data parts from SELECT queries until they're either pre-warmed (see cache_populated_by_fetch) or this many seconds old. Only for Replicated-/SharedMergeTree.", 0) \
M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud. If a merged part is less than this many seconds old and is not pre-warmed (see cache_populated_by_fetch), but all its source parts are available and pre-warmed, SELECT queries will read from those parts instead. Only for ReplicatedMergeTree. Note that this only checks whether CacheWarmer processed the part; if the part was fetched into cache by something else, it'll still be considered cold until CacheWarmer gets to it; if it was warmed, then evicted from cache, it'll still be considered warm.", 0) \
M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. Note that it can lead to incorrect result", 0) \
M(UInt64, min_rows_to_keep, 0, "Minimum block size (in rows) to retain in Memory table buffer.", 0) \
M(UInt64, max_rows_to_keep, 0, "Maximum block size (in rows) to retain in Memory table buffer.", 0) \
M(UInt64, min_bytes_to_keep, 0, "Minimum block size (in bytes) to retain in Memory table buffer.", 0) \
M(UInt64, max_bytes_to_keep, 0, "Maximum block size (in bytes) to retain in Memory table buffer.", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS.

View File

@ -93,10 +93,6 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"},
{"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication is dependent materialized view cannot work together with async inserts."},
{"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"},
{"max_rows_to_keep", 0, 0, "Introducing new feature for memory tables."},
{"min_rows_to_keep", 0, 0, "Introducing new feature for memory tables."},
{"max_bytes_to_keep", 0, 0, "Introducing new feature for memory tables."},
{"min_bytes_to_keep", 0, 0, "Introducing new feature for memory tables."},
}},
{"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"},
{"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and suspicious types inside nested types"},

View File

@ -10,6 +10,10 @@ class ASTStorage;
#define MEMORY_SETTINGS(M, ALIAS) \
M(Bool, compress, false, "Compress data in memory", 0) \
M(UInt64, min_rows_to_keep, 0, "Minimum block size (in rows) to retain in Memory table buffer.", 0) \
M(UInt64, max_rows_to_keep, 0, "Maximum block size (in rows) to retain in Memory table buffer.", 0) \
M(UInt64, min_bytes_to_keep, 0, "Minimum block size (in bytes) to retain in Memory table buffer.", 0) \
M(UInt64, max_bytes_to_keep, 0, "Maximum block size (in bytes) to retain in Memory table buffer.", 0) \
DECLARE_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS)

View File

@ -55,11 +55,10 @@ public:
MemorySink(
StorageMemory & storage_,
const StorageMetadataPtr & metadata_snapshot_,
ContextPtr context_)
ContextPtr context)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, storage_snapshot(storage_.getStorageSnapshot(metadata_snapshot_, context_))
, context(context_)
, storage_snapshot(storage_.getStorageSnapshot(metadata_snapshot_, context))
{
}
@ -102,25 +101,20 @@ public:
inserted_rows += block.rows();
}
Settings settings = context->getSettings();
if ((settings.min_bytes_to_keep && settings.min_bytes_to_keep > settings.max_bytes_to_keep)
|| (settings.min_rows_to_keep && settings.min_rows_to_keep > settings.max_rows_to_keep))
throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. bytes / rows must be set with a max.");
std::lock_guard lock(storage.mutex);
auto new_data = std::make_unique<Blocks>(*(storage.data.get()));
UInt64 new_total_rows = storage.total_size_rows.load(std::memory_order_relaxed) + inserted_rows;
UInt64 new_total_bytes = storage.total_size_bytes.load(std::memory_order_relaxed) + inserted_bytes;
while (!new_data->empty()
&& ((settings.max_bytes_to_keep && new_total_bytes > settings.max_bytes_to_keep)
|| (settings.max_rows_to_keep && new_total_rows > settings.max_rows_to_keep)))
&& ((storage.max_bytes_to_keep && new_total_bytes > storage.max_bytes_to_keep)
|| (storage.max_rows_to_keep && new_total_rows > storage.max_rows_to_keep)))
{
Block oldest_block = new_data->front();
UInt64 rows_to_remove = oldest_block.rows();
UInt64 bytes_to_remove = oldest_block.allocatedBytes();
if (new_total_bytes - bytes_to_remove < settings.min_bytes_to_keep
|| new_total_rows - rows_to_remove < settings.min_rows_to_keep)
if (new_total_bytes - bytes_to_remove < storage.min_bytes_to_keep
|| new_total_rows - rows_to_remove < storage.min_rows_to_keep)
{
break; // stop - removing next block will put us under min_bytes / min_rows threshold
}
@ -143,7 +137,6 @@ private:
Blocks new_blocks;
StorageMemory & storage;
StorageSnapshotPtr storage_snapshot;
ContextPtr context;
};
@ -152,8 +145,10 @@ StorageMemory::StorageMemory(
ColumnsDescription columns_description_,
ConstraintsDescription constraints_,
const String & comment,
bool compress_)
: IStorage(table_id_), data(std::make_unique<const Blocks>()), compress(compress_)
const MemorySettings & settings)
: IStorage(table_id_), data(std::make_unique<const Blocks>()), compress(settings.compress),
min_rows_to_keep(settings.min_rows_to_keep), max_rows_to_keep(settings.max_rows_to_keep),
min_bytes_to_keep(settings.min_bytes_to_keep), max_bytes_to_keep(settings.max_bytes_to_keep)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(std::move(columns_description_));
@ -571,7 +566,11 @@ void registerStorageMemory(StorageFactory & factory)
if (has_settings)
settings.loadFromQuery(*args.storage_def);
return std::make_shared<StorageMemory>(args.table_id, args.columns, args.constraints, args.comment, settings.compress);
if (settings.min_bytes_to_keep > settings.max_bytes_to_keep
|| settings.min_rows_to_keep > settings.max_rows_to_keep)
throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. bytes / rows must be set with a max.");
return std::make_shared<StorageMemory>(args.table_id, args.columns, args.constraints, args.comment, settings);
},
{
.supports_settings = true,

View File

@ -7,6 +7,7 @@
#include <Core/NamesAndTypes.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Storages/IStorage.h>
#include <Storages/MemorySettings.h>
#include <Common/MultiVersion.h>
@ -30,7 +31,7 @@ public:
ColumnsDescription columns_description_,
ConstraintsDescription constraints_,
const String & comment,
bool compress_ = false);
const MemorySettings & settings = MemorySettings());
String getName() const override { return "Memory"; }
@ -134,6 +135,11 @@ private:
std::atomic<size_t> total_size_rows = 0;
bool compress;
UInt64 min_rows_to_keep;
UInt64 max_rows_to_keep;
UInt64 min_bytes_to_keep;
UInt64 max_bytes_to_keep;
friend class ReadFromMemoryStorageStep;
};

View File

@ -1,6 +1,5 @@
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory;
SET min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
/* TESTING BYTES */
/* 1. testing oldest block doesn't get deleted because of min-threshold */
@ -19,9 +18,8 @@ SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = curre
INSERT INTO memory SELECT * FROM numbers(9000, 10000);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
truncate memory;
SET min_rows_to_keep = 100, max_rows_to_keep = 1000;
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000;
/* TESTING ROWS */
/* 1. add normal number of rows */
@ -40,8 +38,24 @@ SELECT total_rows FROM system.tables WHERE name = 'memory' and database = curren
INSERT INTO memory SELECT * FROM numbers(3000, 1100);
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* test invalid settings */
SET min_bytes_to_keep = 4096, max_bytes_to_keep = 0;
INSERT INTO memory SELECT * FROM numbers(3000, 1100); -- { serverError 452 }
/* TESTING NO CIRCULAR-BUFFER */
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory;
INSERT INTO memory SELECT * FROM numbers(0, 1600);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
INSERT INTO memory SELECT * FROM numbers(1000, 100);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
INSERT INTO memory SELECT * FROM numbers(9000, 1000);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
INSERT INTO memory SELECT * FROM numbers(9000, 10000);
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
/* TESTING INVALID SETTINGS */
CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100; -- { serverError 452 }
CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 100; -- { serverError 452 }
DROP TABLE memory;