Add ability to set Distributed directory monitor settings via CREATE TABLE

This commit is contained in:
Azat Khuzhin 2021-07-15 09:26:10 +03:00
parent 56173c565d
commit f3d3ec44a6
4 changed files with 34 additions and 8 deletions

View File

@ -37,6 +37,14 @@ Also, it accepts the following settings:
- `max_delay_to_insert` - max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send. Default 60.
- `monitor_batch_inserts` - same as [distributed_directory_monitor_batch_inserts](../../../operations/settings/settings.md#distributed_directory_monitor_batch_inserts)
- `monitor_split_batch_on_failure` - same as [distributed_directory_monitor_split_batch_on_failure](../../../operations/settings/settings.md#distributed_directory_monitor_split_batch_on_failure)
- `monitor_sleep_time_ms` - same as [distributed_directory_monitor_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_sleep_time_ms)
- `monitor_max_sleep_time_ms` - same as [distributed_directory_monitor_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_max_sleep_time_ms)
!!! note "Note"
**Durability settings** (`fsync_...`):

View File

@ -345,15 +345,15 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, disk(disk_)
, relative_path(relative_path_)
, path(fs::path(disk->getPath()) / relative_path / "")
, should_batch_inserts(storage.getContext()->getSettingsRef().distributed_directory_monitor_batch_inserts)
, split_batch_on_failure(storage.getContext()->getSettingsRef().distributed_directory_monitor_split_batch_on_failure)
, should_batch_inserts(storage.getDistributedSettingsRef().monitor_batch_inserts)
, split_batch_on_failure(storage.getDistributedSettingsRef().monitor_split_batch_on_failure)
, dir_fsync(storage.getDistributedSettingsRef().fsync_directories)
, min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes)
, current_batch_file_path(path + "current_batch.txt")
, default_sleep_time(storage.getContext()->getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds())
, default_sleep_time(storage.getDistributedSettingsRef().monitor_sleep_time_ms.totalMilliseconds())
, sleep_time(default_sleep_time)
, max_sleep_time(storage.getContext()->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds())
, max_sleep_time(storage.getDistributedSettingsRef().monitor_max_sleep_time_ms.totalMilliseconds())
, log(&Poco::Logger::get(getLoggerName()))
, monitor_blocker(monitor_blocker_)
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)

View File

@ -21,6 +21,11 @@ class ASTStorage;
M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw.", 0) \
M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, the query will be delayed. 0 - do not delay.", 0) \
M(UInt64, max_delay_to_insert, 60, "Max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send.", 0) \
/** Directory monitor settings */ \
M(UInt64, monitor_batch_inserts, 0, "Default - distributed_directory_monitor_batch_inserts", 0) \
M(UInt64, monitor_split_batch_on_failure, 0, "Default - distributed_directory_monitor_split_batch_on_failure", 0) \
M(Milliseconds, monitor_sleep_time_ms, 0, "Default - distributed_directory_monitor_sleep_time_ms", 0) \
M(Milliseconds, monitor_max_sleep_time_ms, 0, "Default - distributed_directory_monitor_max_sleep_time_ms", 0) \
DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)

View File

@ -1292,8 +1292,11 @@ void registerStorageDistributed(StorageFactory & factory)
String cluster_name = getClusterNameAndMakeLiteral(engine_args[0]);
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.getLocalContext());
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.getLocalContext());
const ContextPtr & context = args.getContext();
const ContextPtr & local_context = args.getLocalContext();
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], local_context);
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], local_context);
String remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
String remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
@ -1304,7 +1307,7 @@ void registerStorageDistributed(StorageFactory & factory)
/// Check that sharding_key exists in the table and has numeric type.
if (sharding_key)
{
auto sharding_expr = buildShardingKeyExpression(sharding_key, args.getContext(), args.columns.getAllPhysical(), true);
auto sharding_expr = buildShardingKeyExpression(sharding_key, context, args.columns.getAllPhysical(), true);
const Block & block = sharding_expr->getSampleBlock();
if (block.columns() != 1)
@ -1335,6 +1338,16 @@ void registerStorageDistributed(StorageFactory & factory)
"bytes_to_throw_insert cannot be less or equal to bytes_to_delay_insert (since it is handled first)");
}
/// Set default values from the distributed_directory_monitor_* global context settings.
if (!distributed_settings.monitor_batch_inserts.changed)
distributed_settings.monitor_batch_inserts = context->getSettingsRef().distributed_directory_monitor_batch_inserts;
if (!distributed_settings.monitor_split_batch_on_failure.changed)
distributed_settings.monitor_split_batch_on_failure = context->getSettingsRef().distributed_directory_monitor_split_batch_on_failure;
if (!distributed_settings.monitor_sleep_time_ms.changed)
distributed_settings.monitor_sleep_time_ms = Poco::Timespan(context->getSettingsRef().distributed_directory_monitor_sleep_time_ms);
if (!distributed_settings.monitor_max_sleep_time_ms.changed)
distributed_settings.monitor_max_sleep_time_ms = Poco::Timespan(context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms);
return StorageDistributed::create(
args.table_id,
args.columns,
@ -1343,7 +1356,7 @@ void registerStorageDistributed(StorageFactory & factory)
remote_database,
remote_table,
cluster_name,
args.getContext(),
context,
sharding_key,
storage_policy,
args.relative_data_path,