From f3d3ec44a661dec5aa3cd1146b175cc1022fecb8 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 15 Jul 2021 09:26:10 +0300 Subject: [PATCH 1/4] Add ability to set Distributed directory monitor settings via CREATE TABLE --- .../table-engines/special/distributed.md | 8 +++++++ src/Storages/Distributed/DirectoryMonitor.cpp | 8 +++---- .../Distributed/DistributedSettings.h | 5 +++++ src/Storages/StorageDistributed.cpp | 21 +++++++++++++++---- 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/docs/en/engines/table-engines/special/distributed.md b/docs/en/engines/table-engines/special/distributed.md index 6de6602a216..5c911c6cc0a 100644 --- a/docs/en/engines/table-engines/special/distributed.md +++ b/docs/en/engines/table-engines/special/distributed.md @@ -37,6 +37,14 @@ Also, it accepts the following settings: - `max_delay_to_insert` - max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send. Default 60. +- `monitor_batch_inserts` - same as [distributed_directory_monitor_batch_inserts](../../../operations/settings/settings.md#distributed_directory_monitor_batch_inserts) + +- `monitor_split_batch_on_failure` - same as [distributed_directory_monitor_split_batch_on_failure](../../../operations/settings/settings.md#distributed_directory_monitor_split_batch_on_failure) + +- `monitor_sleep_time_ms` - same as [distributed_directory_monitor_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_sleep_time_ms) + +- `monitor_max_sleep_time_ms` - same as [distributed_directory_monitor_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_max_sleep_time_ms) + !!! 
note "Note" **Durability settings** (`fsync_...`): diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 17c0eec5c49..94f60c59512 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -345,15 +345,15 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( , disk(disk_) , relative_path(relative_path_) , path(fs::path(disk->getPath()) / relative_path / "") - , should_batch_inserts(storage.getContext()->getSettingsRef().distributed_directory_monitor_batch_inserts) - , split_batch_on_failure(storage.getContext()->getSettingsRef().distributed_directory_monitor_split_batch_on_failure) + , should_batch_inserts(storage.getDistributedSettingsRef().monitor_batch_inserts) + , split_batch_on_failure(storage.getDistributedSettingsRef().monitor_split_batch_on_failure) , dir_fsync(storage.getDistributedSettingsRef().fsync_directories) , min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows) , min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes) , current_batch_file_path(path + "current_batch.txt") - , default_sleep_time(storage.getContext()->getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()) + , default_sleep_time(storage.getDistributedSettingsRef().monitor_sleep_time_ms.totalMilliseconds()) , sleep_time(default_sleep_time) - , max_sleep_time(storage.getContext()->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()) + , max_sleep_time(storage.getDistributedSettingsRef().monitor_max_sleep_time_ms.totalMilliseconds()) , log(&Poco::Logger::get(getLoggerName())) , monitor_blocker(monitor_blocker_) , metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0) diff --git a/src/Storages/Distributed/DistributedSettings.h b/src/Storages/Distributed/DistributedSettings.h index 7296fa11ffd..8cc942cab02 
100644 --- a/src/Storages/Distributed/DistributedSettings.h +++ b/src/Storages/Distributed/DistributedSettings.h @@ -21,6 +21,11 @@ class ASTStorage; M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw.", 0) \ M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, the query will be delayed. 0 - do not delay.", 0) \ M(UInt64, max_delay_to_insert, 60, "Max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send.", 0) \ + /** Directory monitor settings */ \ + M(UInt64, monitor_batch_inserts, 0, "Default - distributed_directory_monitor_batch_inserts", 0) \ + M(UInt64, monitor_split_batch_on_failure, 0, "Default - distributed_directory_monitor_split_batch_on_failure", 0) \ + M(Milliseconds, monitor_sleep_time_ms, 0, "Default - distributed_directory_monitor_sleep_time_ms", 0) \ + M(Milliseconds, monitor_max_sleep_time_ms, 0, "Default - distributed_directory_monitor_max_sleep_time_ms", 0) \ DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 21fa06e19f0..c4dd0e27b14 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1292,8 +1292,11 @@ void registerStorageDistributed(StorageFactory & factory) String cluster_name = getClusterNameAndMakeLiteral(engine_args[0]); - engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.getLocalContext()); - engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.getLocalContext()); + const ContextPtr & context = args.getContext(); + const ContextPtr & local_context = args.getLocalContext(); + + engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], local_context); + 
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], local_context); String remote_database = engine_args[1]->as().value.safeGet(); String remote_table = engine_args[2]->as().value.safeGet(); @@ -1304,7 +1307,7 @@ void registerStorageDistributed(StorageFactory & factory) /// Check that sharding_key exists in the table and has numeric type. if (sharding_key) { - auto sharding_expr = buildShardingKeyExpression(sharding_key, args.getContext(), args.columns.getAllPhysical(), true); + auto sharding_expr = buildShardingKeyExpression(sharding_key, context, args.columns.getAllPhysical(), true); const Block & block = sharding_expr->getSampleBlock(); if (block.columns() != 1) @@ -1335,6 +1338,16 @@ void registerStorageDistributed(StorageFactory & factory) "bytes_to_throw_insert cannot be less or equal to bytes_to_delay_insert (since it is handled first)"); } + /// Set default values from the distributed_directory_monitor_* global context settings. + if (!distributed_settings.monitor_batch_inserts.changed) + distributed_settings.monitor_batch_inserts = context->getSettingsRef().distributed_directory_monitor_batch_inserts; + if (!distributed_settings.monitor_split_batch_on_failure.changed) + distributed_settings.monitor_split_batch_on_failure = context->getSettingsRef().distributed_directory_monitor_split_batch_on_failure; + if (!distributed_settings.monitor_sleep_time_ms.changed) + distributed_settings.monitor_sleep_time_ms = Poco::Timespan(context->getSettingsRef().distributed_directory_monitor_sleep_time_ms); + if (!distributed_settings.monitor_max_sleep_time_ms.changed) + distributed_settings.monitor_max_sleep_time_ms = Poco::Timespan(context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms); + return StorageDistributed::create( args.table_id, args.columns, @@ -1343,7 +1356,7 @@ void registerStorageDistributed(StorageFactory & factory) remote_database, remote_table, cluster_name, - args.getContext(), + context, sharding_key, 
storage_policy, args.relative_data_path, From df5a26738d2fe80ab7a063a4a6f70aa582122a89 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 15 Jul 2021 09:34:27 +0300 Subject: [PATCH 2/4] Fix 01040_distributed_directory_monitor_batch_inserts by properly setting batch mode --- ...distributed_directory_monitor_batch_inserts.sql | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/queries/0_stateless/01040_distributed_directory_monitor_batch_inserts.sql b/tests/queries/0_stateless/01040_distributed_directory_monitor_batch_inserts.sql index dec748789c8..5e30b6b1a9d 100644 --- a/tests/queries/0_stateless/01040_distributed_directory_monitor_batch_inserts.sql +++ b/tests/queries/0_stateless/01040_distributed_directory_monitor_batch_inserts.sql @@ -1,12 +1,11 @@ -SET distributed_directory_monitor_batch_inserts=1; -SET distributed_directory_monitor_sleep_time_ms=10; -SET distributed_directory_monitor_max_sleep_time_ms=100; - DROP TABLE IF EXISTS test_01040; DROP TABLE IF EXISTS dist_test_01040; CREATE TABLE test_01040 (key UInt64) ENGINE=TinyLog(); -CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards, currentDatabase(), test_01040, key); +CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards, currentDatabase(), test_01040, key) SETTINGS + monitor_batch_inserts=1, + monitor_sleep_time_ms=10, + monitor_max_sleep_time_ms=100; -- internal_replication=false SELECT 'test_cluster_two_shards prefer_localhost_replica=0'; @@ -26,7 +25,10 @@ TRUNCATE TABLE test_01040; DROP TABLE dist_test_01040; -- internal_replication=true -CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards_internal_replication, currentDatabase(), test_01040, key); +CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards_internal_replication, currentDatabase(), test_01040, key) SETTINGS + monitor_batch_inserts=1, + monitor_sleep_time_ms=10, +
monitor_max_sleep_time_ms=100; SELECT 'test_cluster_two_shards_internal_replication prefer_localhost_replica=0'; SET prefer_localhost_replica=0; INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2); From e2d8ba893afca575660797dd34e5caf5280ac961 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 16 Jul 2021 04:08:59 +0300 Subject: [PATCH 3/4] Fix 01460_DistributedFilesToInsert (correctly set dist table settings) --- .../0_stateless/01460_DistributedFilesToInsert.sql | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tests/queries/0_stateless/01460_DistributedFilesToInsert.sql b/tests/queries/0_stateless/01460_DistributedFilesToInsert.sql index 34c0d55d573..02e3d3ef73f 100644 --- a/tests/queries/0_stateless/01460_DistributedFilesToInsert.sql +++ b/tests/queries/0_stateless/01460_DistributedFilesToInsert.sql @@ -2,33 +2,31 @@ -- (i.e. no .bin files and hence no sending is required) set prefer_localhost_replica=0; -set distributed_directory_monitor_sleep_time_ms=50; - drop table if exists data_01460; drop table if exists dist_01460; create table data_01460 as system.one engine=Null(); -create table dist_01460 as data_01460 engine=Distributed(test_shard_localhost, currentDatabase(), data_01460); +create table dist_01460 as data_01460 engine=Distributed(test_shard_localhost, currentDatabase(), data_01460) settings monitor_sleep_time_ms=50; select 'INSERT'; select value from system.metrics where metric = 'DistributedFilesToInsert'; insert into dist_01460 select * from system.one; -select sleep(1) format Null; -- distributed_directory_monitor_sleep_time_ms +select sleep(1) format Null; -- monitor_sleep_time_ms select value from system.metrics where metric = 'DistributedFilesToInsert'; select 'STOP/START DISTRIBUTED SENDS'; system stop distributed sends dist_01460; insert into dist_01460 select * from system.one; -select sleep(1) format Null; -- distributed_directory_monitor_sleep_time_ms +select sleep(1) format Null; -- 
monitor_sleep_time_ms select value from system.metrics where metric = 'DistributedFilesToInsert'; system start distributed sends dist_01460; -select sleep(1) format Null; -- distributed_directory_monitor_sleep_time_ms +select sleep(1) format Null; -- monitor_sleep_time_ms select value from system.metrics where metric = 'DistributedFilesToInsert'; select 'FLUSH DISTRIBUTED'; system stop distributed sends dist_01460; insert into dist_01460 select * from system.one; -select sleep(1) format Null; -- distributed_directory_monitor_sleep_time_ms +select sleep(1) format Null; -- monitor_sleep_time_ms select value from system.metrics where metric = 'DistributedFilesToInsert'; system flush distributed dist_01460; select value from system.metrics where metric = 'DistributedFilesToInsert'; @@ -36,7 +34,7 @@ select value from system.metrics where metric = 'DistributedFilesToInsert'; select 'DROP TABLE'; system stop distributed sends dist_01460; insert into dist_01460 select * from system.one; -select sleep(1) format Null; -- distributed_directory_monitor_sleep_time_ms +select sleep(1) format Null; -- monitor_sleep_time_ms select value from system.metrics where metric = 'DistributedFilesToInsert'; drop table dist_01460; select value from system.metrics where metric = 'DistributedFilesToInsert'; From a3653bd665bd59c7c05b0fcd58ea2246e0b69915 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 16 Jul 2021 04:08:59 +0300 Subject: [PATCH 4/4] Fix overflow in exponential sleep in DirectoryMonitor UBsan reports: SUMMARY: UndefinedBehaviorSanitizer: undefined-behavior ../src/Storages/Distributed/DirectoryMonitor.cpp:435:53 in ../src/Storages/Distributed/DirectoryMonitor.cpp:435: runtime error: 1.15292e+19 is outside the range of representable values of type 'long' 0 0x1df0c286 in DB::StorageDistributedDirectoryMonitor::run() obj-x86_64-linux-gnu/../src/Storages/Distributed/DirectoryMonitor.cpp:435:53 It is pretty easy to reproduce by limiting max_server_memory_usage before starting
the test. --- src/Storages/Distributed/DirectoryMonitor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 94f60c59512..960ac55daac 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -432,7 +432,7 @@ void StorageDistributedDirectoryMonitor::run() do_sleep = true; ++status.error_count; sleep_time = std::min( - std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(status.error_count))}, + std::chrono::milliseconds{UInt64(default_sleep_time.count() * std::exp2(status.error_count))}, max_sleep_time); tryLogCurrentException(getLoggerName().data()); status.last_exception = std::current_exception();