Merge pull request #26336 from azat/dist-per-table-monitor-settings

Add ability to set Distributed directory monitor settings via CREATE TABLE
This commit is contained in:
alexey-milovidov 2021-07-17 01:49:40 +03:00 committed by GitHub
commit bc907bd27c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 49 additions and 23 deletions

View File

@ -37,6 +37,14 @@ Also, it accepts the following settings:
- `max_delay_to_insert` - max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send. Default 60.
- `monitor_batch_inserts` - same as [distributed_directory_monitor_batch_inserts](../../../operations/settings/settings.md#distributed_directory_monitor_batch_inserts)
- `monitor_split_batch_on_failure` - same as [distributed_directory_monitor_split_batch_on_failure](../../../operations/settings/settings.md#distributed_directory_monitor_split_batch_on_failure)
- `monitor_sleep_time_ms` - same as [distributed_directory_monitor_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_sleep_time_ms)
- `monitor_max_sleep_time_ms` - same as [distributed_directory_monitor_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_max_sleep_time_ms)
!!! note "Note"
**Durability settings** (`fsync_...`):

View File

@ -345,15 +345,15 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, disk(disk_)
, relative_path(relative_path_)
, path(fs::path(disk->getPath()) / relative_path / "")
, should_batch_inserts(storage.getContext()->getSettingsRef().distributed_directory_monitor_batch_inserts)
, split_batch_on_failure(storage.getContext()->getSettingsRef().distributed_directory_monitor_split_batch_on_failure)
, should_batch_inserts(storage.getDistributedSettingsRef().monitor_batch_inserts)
, split_batch_on_failure(storage.getDistributedSettingsRef().monitor_split_batch_on_failure)
, dir_fsync(storage.getDistributedSettingsRef().fsync_directories)
, min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes)
, current_batch_file_path(path + "current_batch.txt")
, default_sleep_time(storage.getContext()->getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds())
, default_sleep_time(storage.getDistributedSettingsRef().monitor_sleep_time_ms.totalMilliseconds())
, sleep_time(default_sleep_time)
, max_sleep_time(storage.getContext()->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds())
, max_sleep_time(storage.getDistributedSettingsRef().monitor_max_sleep_time_ms.totalMilliseconds())
, log(&Poco::Logger::get(getLoggerName()))
, monitor_blocker(monitor_blocker_)
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
@ -432,7 +432,7 @@ void StorageDistributedDirectoryMonitor::run()
do_sleep = true;
++status.error_count;
sleep_time = std::min(
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(status.error_count))},
std::chrono::milliseconds{UInt64(default_sleep_time.count() * std::exp2(status.error_count))},
max_sleep_time);
tryLogCurrentException(getLoggerName().data());
status.last_exception = std::current_exception();

View File

@ -21,6 +21,11 @@ class ASTStorage;
M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw.", 0) \
M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, the query will be delayed. 0 - do not delay.", 0) \
M(UInt64, max_delay_to_insert, 60, "Max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send.", 0) \
/** Directory monitor settings */ \
M(UInt64, monitor_batch_inserts, 0, "Default - distributed_directory_monitor_batch_inserts", 0) \
M(UInt64, monitor_split_batch_on_failure, 0, "Default - distributed_directory_monitor_split_batch_on_failure", 0) \
M(Milliseconds, monitor_sleep_time_ms, 0, "Default - distributed_directory_monitor_sleep_time_ms", 0) \
M(Milliseconds, monitor_max_sleep_time_ms, 0, "Default - distributed_directory_monitor_max_sleep_time_ms", 0) \
DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)

View File

@ -1292,8 +1292,11 @@ void registerStorageDistributed(StorageFactory & factory)
String cluster_name = getClusterNameAndMakeLiteral(engine_args[0]);
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.getLocalContext());
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.getLocalContext());
const ContextPtr & context = args.getContext();
const ContextPtr & local_context = args.getLocalContext();
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], local_context);
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], local_context);
String remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
String remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
@ -1304,7 +1307,7 @@ void registerStorageDistributed(StorageFactory & factory)
/// Check that sharding_key exists in the table and has numeric type.
if (sharding_key)
{
auto sharding_expr = buildShardingKeyExpression(sharding_key, args.getContext(), args.columns.getAllPhysical(), true);
auto sharding_expr = buildShardingKeyExpression(sharding_key, context, args.columns.getAllPhysical(), true);
const Block & block = sharding_expr->getSampleBlock();
if (block.columns() != 1)
@ -1335,6 +1338,16 @@ void registerStorageDistributed(StorageFactory & factory)
"bytes_to_throw_insert cannot be less or equal to bytes_to_delay_insert (since it is handled first)");
}
/// Set default values from the distributed_directory_monitor_* global context settings.
if (!distributed_settings.monitor_batch_inserts.changed)
distributed_settings.monitor_batch_inserts = context->getSettingsRef().distributed_directory_monitor_batch_inserts;
if (!distributed_settings.monitor_split_batch_on_failure.changed)
distributed_settings.monitor_split_batch_on_failure = context->getSettingsRef().distributed_directory_monitor_split_batch_on_failure;
if (!distributed_settings.monitor_sleep_time_ms.changed)
distributed_settings.monitor_sleep_time_ms = Poco::Timespan(context->getSettingsRef().distributed_directory_monitor_sleep_time_ms);
if (!distributed_settings.monitor_max_sleep_time_ms.changed)
distributed_settings.monitor_max_sleep_time_ms = Poco::Timespan(context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms);
return StorageDistributed::create(
args.table_id,
args.columns,
@ -1343,7 +1356,7 @@ void registerStorageDistributed(StorageFactory & factory)
remote_database,
remote_table,
cluster_name,
args.getContext(),
context,
sharding_key,
storage_policy,
args.relative_data_path,

View File

@ -1,12 +1,11 @@
SET distributed_directory_monitor_batch_inserts=1;
SET distributed_directory_monitor_sleep_time_ms=10;
SET distributed_directory_monitor_max_sleep_time_ms=100;
DROP TABLE IF EXISTS test_01040;
DROP TABLE IF EXISTS dist_test_01040;
CREATE TABLE test_01040 (key UInt64) ENGINE=TinyLog();
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards, currentDatabase(), test_01040, key);
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards, currentDatabase(), test_01040, key) SETTINGS
monitor_batch_inserts=1,
monitor_sleep_time_ms=10,
monitor_max_sleep_time_ms=100;
-- internal_replication=false
SELECT 'test_cluster_two_shards prefer_localhost_replica=0';
@ -26,7 +25,10 @@ TRUNCATE TABLE test_01040;
DROP TABLE dist_test_01040;
-- internal_replication=true
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards_internal_replication, currentDatabase(), test_01040, key);
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards_internal_replication, currentDatabase(), test_01040, key) SETTINGS
monitor_batch_inserts=1,
monitor_sleep_time_ms=10,
monitor_max_sleep_time_ms=100;
SELECT 'test_cluster_two_shards_internal_replication prefer_localhost_replica=0';
SET prefer_localhost_replica=0;
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);

View File

@ -2,33 +2,31 @@
-- (i.e. no .bin files and hence no sending is required)
set prefer_localhost_replica=0;
set distributed_directory_monitor_sleep_time_ms=50;
drop table if exists data_01460;
drop table if exists dist_01460;
create table data_01460 as system.one engine=Null();
create table dist_01460 as data_01460 engine=Distributed(test_shard_localhost, currentDatabase(), data_01460);
create table dist_01460 as data_01460 engine=Distributed(test_shard_localhost, currentDatabase(), data_01460) settings monitor_sleep_time_ms=50;
select 'INSERT';
select value from system.metrics where metric = 'DistributedFilesToInsert';
insert into dist_01460 select * from system.one;
select sleep(1) format Null; -- distributed_directory_monitor_sleep_time_ms
select sleep(1) format Null; -- monitor_sleep_time_ms
select value from system.metrics where metric = 'DistributedFilesToInsert';
select 'STOP/START DISTRIBUTED SENDS';
system stop distributed sends dist_01460;
insert into dist_01460 select * from system.one;
select sleep(1) format Null; -- distributed_directory_monitor_sleep_time_ms
select sleep(1) format Null; -- monitor_sleep_time_ms
select value from system.metrics where metric = 'DistributedFilesToInsert';
system start distributed sends dist_01460;
select sleep(1) format Null; -- distributed_directory_monitor_sleep_time_ms
select sleep(1) format Null; -- monitor_sleep_time_ms
select value from system.metrics where metric = 'DistributedFilesToInsert';
select 'FLUSH DISTRIBUTED';
system stop distributed sends dist_01460;
insert into dist_01460 select * from system.one;
select sleep(1) format Null; -- distributed_directory_monitor_sleep_time_ms
select sleep(1) format Null; -- monitor_sleep_time_ms
select value from system.metrics where metric = 'DistributedFilesToInsert';
system flush distributed dist_01460;
select value from system.metrics where metric = 'DistributedFilesToInsert';
@ -36,7 +34,7 @@ select value from system.metrics where metric = 'DistributedFilesToInsert';
select 'DROP TABLE';
system stop distributed sends dist_01460;
insert into dist_01460 select * from system.one;
select sleep(1) format Null; -- distributed_directory_monitor_sleep_time_ms
select sleep(1) format Null; -- monitor_sleep_time_ms
select value from system.metrics where metric = 'DistributedFilesToInsert';
drop table dist_01460;
select value from system.metrics where metric = 'DistributedFilesToInsert';