Merge pull request #69376 from marco-vb/marco-vb/setting-stop-insert-on-full-disk

Add user-level settings min_free_diskspace_bytes_to_throw_insert and min_free_diskspace_ratio_to_throw_insert
This commit is contained in:
jsc0218 2024-09-17 18:46:43 +00:00 committed by GitHub
commit 839f06035f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 135 additions and 3 deletions

View File

@ -156,6 +156,26 @@ Default value: 1000.
ClickHouse artificially executes `INSERT` longer (adds sleep) so that the background merge process can merge parts faster than they are added. ClickHouse artificially executes `INSERT` longer (adds sleep) so that the background merge process can merge parts faster than they are added.
## min_free_disk_bytes_to_throw_insert {#min_free_disk_bytes_to_throw_insert}
The minimum number of bytes that should be free in disk space in order to insert data. If the number of available free bytes - `keep_free_space_bytes` is less than `min_free_disk_bytes_to_throw_insert` then an exception is thrown and the insert is not executed. Note that this setting does not take into account the amount of data that will be written by the `INSERT` operation.
Possible values:
- Any positive integer.
Default value: 0 bytes.
## min_free_disk_ratio_to_throw_insert {#min_free_disk_ratio_to_throw_insert}
The minimum free to total disk space ratio to perform an `INSERT`. The free space is calculated by subtracting `keep_free_space_bytes` from the total available space in disk.
Possible values:
- Float, 0.0 - 1.0
Default value: 0.0
## inactive_parts_to_throw_insert {#inactive-parts-to-throw-insert} ## inactive_parts_to_throw_insert {#inactive-parts-to-throw-insert}
If the number of inactive parts in a single partition more than the `inactive_parts_to_throw_insert` value, `INSERT` is interrupted with the "Too many inactive parts (N). Parts cleaning are processing significantly slower than inserts" exception. If the number of inactive parts in a single partition more than the `inactive_parts_to_throw_insert` value, `INSERT` is interrupted with the "Too many inactive parts (N). Parts cleaning are processing significantly slower than inserts" exception.

View File

@ -343,6 +343,8 @@ class IColumn;
M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \ M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \
M(Milliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.", 0) \ M(Milliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.", 0) \
M(Milliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from/to streaming storages.", 0) \ M(Milliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from/to streaming storages.", 0) \
M(UInt64, min_free_disk_bytes_to_throw_insert, 0, "Minimum free disk space bytes to throw an insert.", 0) \
M(Double, min_free_disk_ratio_to_throw_insert, 0.0, "Minimum free disk space ratio to throw an insert.", 0) \
\ \
M(Bool, final, false, "Query with the FINAL modifier by default. If the engine does not support final, it does not have any effect. On queries with multiple tables final is applied only on those that support it. It also works on distributed tables", 0) \ M(Bool, final, false, "Query with the FINAL modifier by default. If the engine does not support final, it does not have any effect. On queries with multiple tables final is applied only on those that support it. It also works on distributed tables", 0) \
\ \

View File

@ -85,7 +85,9 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"parallel_replicas_local_plan", false, false, "Use local plan for local replica in a query with parallel replicas"}, {"parallel_replicas_local_plan", false, false, "Use local plan for local replica in a query with parallel replicas"},
{"join_to_sort_minimum_perkey_rows", 0, 40, "The lower limit of per-key average rows in the right table to determine whether to rerange the right table by key in left or inner join. This setting ensures that the optimization is not applied for sparse table keys"}, {"join_to_sort_minimum_perkey_rows", 0, 40, "The lower limit of per-key average rows in the right table to determine whether to rerange the right table by key in left or inner join. This setting ensures that the optimization is not applied for sparse table keys"},
{"join_to_sort_maximum_table_rows", 0, 10000, "The maximum number of rows in the right table to determine whether to rerange the right table by key in left or inner join"}, {"join_to_sort_maximum_table_rows", 0, 10000, "The maximum number of rows in the right table to determine whether to rerange the right table by key in left or inner join"},
{"allow_experimental_join_right_table_sorting", false, false, "If it is set to true, and the conditions of `join_to_sort_minimum_perkey_rows` and `join_to_sort_maximum_table_rows` are met, rerange the right table by key to improve the performance in left or inner hash join"} {"allow_experimental_join_right_table_sorting", false, false, "If it is set to true, and the conditions of `join_to_sort_minimum_perkey_rows` and `join_to_sort_maximum_table_rows` are met, rerange the right table by key to improve the performance in left or inner hash join"},
{"min_free_disk_bytes_to_throw_insert", 0, 0, "Maintain some free disk space bytes from inserts while still allowing for temporary writing."},
{"min_free_disk_ratio_to_throw_insert", 0.0, 0.0, "Maintain some free disk space bytes expressed as ratio to total disk space from inserts while still allowing for temporary writing."},
} }
}, },
{"24.8", {"24.8",

View File

@ -60,6 +60,7 @@ namespace ErrorCodes
extern const int ABORTED; extern const int ABORTED;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int TOO_MANY_PARTS; extern const int TOO_MANY_PARTS;
extern const int NOT_ENOUGH_SPACE;
} }
namespace namespace
@ -553,6 +554,32 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
VolumePtr volume = data.getStoragePolicy()->getVolume(0); VolumePtr volume = data.getStoragePolicy()->getVolume(0);
VolumePtr data_part_volume = createVolumeFromReservation(reservation, volume); VolumePtr data_part_volume = createVolumeFromReservation(reservation, volume);
const auto & data_settings = data.getSettings();
const UInt64 min_bytes = data_settings->min_free_disk_bytes_to_throw_insert;
const Float64 min_ratio = data_settings->min_free_disk_ratio_to_throw_insert;
if (min_bytes > 0 || min_ratio > 0.0)
{
const auto disk = data_part_volume->getDisk();
const UInt64 total_disk_bytes = *disk->getTotalSpace();
const UInt64 free_disk_bytes = *disk->getAvailableSpace();
const UInt64 min_bytes_from_ratio = static_cast<UInt64>(min_ratio * total_disk_bytes);
const UInt64 needed_free_bytes = std::max(min_bytes, min_bytes_from_ratio);
if (needed_free_bytes > free_disk_bytes)
{
throw Exception(
ErrorCodes::NOT_ENOUGH_SPACE,
"Could not perform insert: less than {} free bytes in disk space ({}). "
"Configure this limit with user settings {} or {}",
needed_free_bytes,
free_disk_bytes,
"min_free_disk_bytes_to_throw_insert",
"min_free_disk_ratio_to_throw_insert");
}
}
auto new_data_part = data.getDataPartBuilder(part_name, data_part_volume, part_dir) auto new_data_part = data.getDataPartBuilder(part_name, data_part_volume, part_dir)
.withPartFormat(data.choosePartFormat(expected_size, block.rows())) .withPartFormat(data.choosePartFormat(expected_size, block.rows()))
.withPartInfo(new_part_info) .withPartInfo(new_part_info)
@ -564,8 +591,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
if (data.storage_settings.get()->assign_part_uuids) if (data.storage_settings.get()->assign_part_uuids)
new_data_part->uuid = UUIDHelpers::generateV4(); new_data_part->uuid = UUIDHelpers::generateV4();
const auto & data_settings = data.getSettings();
SerializationInfo::Settings settings{data_settings->ratio_of_defaults_for_sparse_serialization, true}; SerializationInfo::Settings settings{data_settings->ratio_of_defaults_for_sparse_serialization, true};
SerializationInfoByName infos(columns, settings); SerializationInfoByName infos(columns, settings);
infos.add(block); infos.add(block);
@ -688,6 +713,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
MergeTreeDataPartType part_type; MergeTreeDataPartType part_type;
/// Size of part would not be greater than block.bytes() + epsilon /// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes(); size_t expected_size = block.bytes();
// just check if there is enough space on parent volume // just check if there is enough space on parent volume
MergeTreeData::reserveSpace(expected_size, parent_part->getDataPartStorage()); MergeTreeData::reserveSpace(expected_size, parent_part->getDataPartStorage());
part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type; part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type;

View File

@ -99,6 +99,8 @@ struct Settings;
M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \ M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \
M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \ M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
M(Bool, optimize_row_order, false, "Allow reshuffling of rows during part inserts and merges to improve the compressibility of the new part", 0) \ M(Bool, optimize_row_order, false, "Allow reshuffling of rows during part inserts and merges to improve the compressibility of the new part", 0) \
M(UInt64, min_free_disk_bytes_to_throw_insert, 0, "Minimum free disk space bytes to throw an insert.", 0) \
M(Double, min_free_disk_ratio_to_throw_insert, 0.0, "Minimum free disk space ratio to throw an insert.", 0) \
M(Bool, use_adaptive_write_buffer_for_dynamic_subcolumns, true, "Allow to use adaptive writer buffers during writing dynamic subcolumns to reduce memory usage", 0) \ M(Bool, use_adaptive_write_buffer_for_dynamic_subcolumns, true, "Allow to use adaptive writer buffers during writing dynamic subcolumns to reduce memory usage", 0) \
M(UInt64, adaptive_write_buffer_initial_size, 16 * 1024, "Initial size of an adaptive write buffer", 0) \ M(UInt64, adaptive_write_buffer_initial_size, 16 * 1024, "Initial size of an adaptive write buffer", 0) \
\ \

View File

@ -0,0 +1,19 @@
<clickhouse>
<storage_configuration>
<disks>
<disk1>
<type>local</type>
<path>/disk1/</path>
</disk1>
</disks>
<policies>
<only_disk1>
<volumes>
<main>
<disk>disk1</disk>
</main>
</volumes>
</only_disk1>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,61 @@
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/config.d/storage_configuration.xml"],
tmpfs=["/disk1:size=7M"],
macros={"shard": 0, "replica": 1},
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_insert_stops_when_disk_full(start_cluster):
min_free_bytes = 3 * 1024 * 1024 # 3 MiB
node.query(
f"""
CREATE TABLE test_table (
id UInt32,
data String
) ENGINE = MergeTree()
ORDER BY id
SETTINGS storage_policy = 'only_disk1', min_free_disk_bytes_to_throw_insert = {min_free_bytes}
"""
)
count = 0
# Insert data to fill up disk
try:
for _ in range(100000):
node.query(
"INSERT INTO test_table SELECT number, repeat('a', 1000 * 1000) FROM numbers(1)"
)
count += 1
except QueryRuntimeException as e:
assert "Could not perform insert" in str(e)
assert "free bytes in disk space" in str(e)
free_space = int(
node.query("SELECT free_space FROM system.disks WHERE name = 'disk1'").strip()
)
assert (
free_space <= min_free_bytes
), f"Free space ({free_space}) is less than min_free_bytes ({min_free_bytes})"
rows = int(node.query("SELECT count() from test_table").strip())
assert rows == count
node.query("DROP TABLE test_table")