Parallel freeze of data parts

This commit is contained in:
Кирилл Гарбар 2024-11-11 16:44:00 +03:00
parent 051f87aeb7
commit 468ee6b4c2
7 changed files with 77 additions and 37 deletions

View File

@ -104,6 +104,7 @@ namespace ServerSetting
extern const ServerSettingsUInt64 max_thread_pool_free_size;
extern const ServerSettingsUInt64 max_thread_pool_size;
extern const ServerSettingsUInt64 max_unexpected_parts_loading_thread_pool_size;
extern const ServerSettingsUInt64 max_freeze_parts_thread_pool_size;
extern const ServerSettingsUInt64 mmap_cache_size;
extern const ServerSettingsBool show_addresses_in_stack_traces;
extern const ServerSettingsUInt64 thread_pool_queue_size;
@ -238,10 +239,17 @@ void LocalServer::initialize(Poco::Util::Application & self)
0, // We don't need any threads once all the parts will be deleted
cleanup_threads);
const size_t drop_tables_threas = server_settings[ServerSetting::database_catalog_drop_table_concurrency];
getDatabaseCatalogDropTablesThreadPool().initialize(
server_settings[ServerSetting::database_catalog_drop_table_concurrency],
drop_tables_threas,
0, // We don't need any threads if there are no DROP queries.
server_settings[ServerSetting::database_catalog_drop_table_concurrency]);
drop_tables_threas);
/// Size the pool used for freezing data parts from the max_freeze_parts_thread_pool_size server setting.
/// The same value is passed as both the first and the last argument of initialize().
const size_t freeze_threads = server_settings[ServerSetting::max_freeze_parts_thread_pool_size];
getFreezePartThreadPool().initialize(
freeze_threads,
0, // We don't need any threads if there are no FREEZE queries.
freeze_threads);
}

View File

@ -280,6 +280,7 @@ namespace ServerSetting
extern const ServerSettingsUInt64 uncompressed_cache_size;
extern const ServerSettingsDouble uncompressed_cache_size_ratio;
extern const ServerSettingsBool use_legacy_mongodb_integration;
extern const ServerSettingsUInt64 max_freeze_parts_thread_pool_size;
}
}
@ -1203,6 +1204,11 @@ try
0, // We don't need any threads if there are no DROP queries.
server_settings[ServerSetting::database_catalog_drop_table_concurrency]);
/// Initialize the pool used for freezing data parts; its size comes from the
/// max_freeze_parts_thread_pool_size server setting (passed as the first and the last argument).
getFreezePartThreadPool().initialize(
server_settings[ServerSetting::max_freeze_parts_thread_pool_size],
0, // We don't need any threads if there are no FREEZE queries.
server_settings[ServerSetting::max_freeze_parts_thread_pool_size]);
/// Initialize global local cache for remote filesystem.
if (config().has("local_cache_for_remote_fs"))
{

View File

@ -234,6 +234,9 @@
M(OutdatedPartsLoadingThreads, "Number of threads in the threadpool for loading Outdated data parts.") \
M(OutdatedPartsLoadingThreadsActive, "Number of active threads in the threadpool for loading Outdated data parts.") \
M(OutdatedPartsLoadingThreadsScheduled, "Number of queued or active jobs in the threadpool for loading Outdated data parts.") \
M(FreezePartThreads, "Number of threads in the threadpool for freezing data parts.") \
M(FreezePartThreadsActive, "Number of active threads in the threadpool for freezing data parts.") \
M(FreezePartThreadsScheduled, "Number of queued or active jobs in the threadpool for freezing data parts.") \
M(DistributedBytesToInsert, "Number of pending bytes to process for asynchronous insertion into Distributed tables. Number of bytes for every shard is summed.") \
M(BrokenDistributedBytesToInsert, "Number of bytes for asynchronous insertion into Distributed tables that has been marked as broken. Number of bytes for every shard is summed.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \

View File

@ -198,7 +198,8 @@ namespace DB
DECLARE(UInt64, load_marks_threadpool_pool_size, 50, "Size of background pool for marks loading", 0) \
DECLARE(UInt64, load_marks_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into prefetches pool", 0) \
DECLARE(UInt64, threadpool_writer_pool_size, 100, "Size of background pool for write requests to object storages", 0) \
DECLARE(UInt64, threadpool_writer_queue_size, 1000000, "Number of tasks which is possible to push into background pool for write requests to object storages", 0)
DECLARE(UInt64, threadpool_writer_queue_size, 1000000, "Number of tasks which is possible to push into background pool for write requests to object storages", 0) \
DECLARE(UInt64, max_freeze_parts_thread_pool_size, 16, "The number of threads for concurrent freeze of data parts.", 0)
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below

View File

@ -29,6 +29,10 @@ namespace CurrentMetrics
extern const Metric DatabaseReplicatedCreateTablesThreads;
extern const Metric DatabaseReplicatedCreateTablesThreadsActive;
extern const Metric DatabaseReplicatedCreateTablesThreadsScheduled;
extern const Metric FreezePartThreads;
extern const Metric FreezePartThreadsActive;
extern const Metric FreezePartThreadsScheduled;
}
namespace DB
@ -176,4 +180,10 @@ StaticThreadPool & getDatabaseCatalogDropTablesThreadPool()
return instance;
}
/// Accessor for the thread pool used for freezing data parts.
/// The pool is a function-local static: it is constructed on the first call and the
/// very same object is returned by reference on every later call.
/// It is registered under the name "FreezePartThreadPool" and reports through the
/// FreezePartThreads / FreezePartThreadsActive / FreezePartThreadsScheduled metrics.
StaticThreadPool & getFreezePartThreadPool()
{
    static StaticThreadPool freeze_part_pool(
        "FreezePartThreadPool",
        CurrentMetrics::FreezePartThreads,
        CurrentMetrics::FreezePartThreadsActive,
        CurrentMetrics::FreezePartThreadsScheduled);

    return freeze_part_pool;
}
}

View File

@ -72,4 +72,7 @@ StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool();
/// ThreadPool used for dropping tables.
StaticThreadPool & getDatabaseCatalogDropTablesThreadPool();
/// ThreadPool used for freezing data parts.
StaticThreadPool & getFreezePartThreadPool();
}

View File

@ -7862,53 +7862,62 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
disk->onFreeze(backup_path);
PartitionCommandsResultInfo result;
std::mutex result_mutex;
ThreadPoolCallbackRunnerLocal<void> runner(getFreezePartThreadPool().get(), "FreezePart");
size_t parts_processed = 0;
for (const auto & part : data_parts)
{
if (!matcher(part->info.partition_id))
continue;
LOG_DEBUG(log, "Freezing part {} snapshot will be placed at {}", part->name, backup_path);
runner(
[&]
{
LOG_DEBUG(log, "Freezing part {} snapshot will be placed at {}", part->name, backup_path);
auto data_part_storage = part->getDataPartStoragePtr();
String backup_part_path = fs::path(backup_path) / relative_data_path;
auto data_part_storage = part->getDataPartStoragePtr();
String backup_part_path = fs::path(backup_path) / relative_data_path;
scope_guard src_flushed_tmp_dir_lock;
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part;
scope_guard src_flushed_tmp_dir_lock;
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part;
auto callback = [this, &part, &backup_part_path](const DiskPtr & disk)
{
// Store metadata for replicated table.
// Do nothing for non-replicated.
createAndStoreFreezeMetadata(disk, part, fs::path(backup_part_path) / part->getDataPartStorage().getPartDirectory());
};
auto callback = [this, &part, &backup_part_path](const DiskPtr & disk)
{
// Store metadata for replicated table.
// Do nothing for non-replicated.
createAndStoreFreezeMetadata(disk, part, fs::path(backup_part_path) / part->getDataPartStorage().getPartDirectory());
};
IDataPartStorage::ClonePartParams params
{
.make_source_readonly = true
};
auto new_storage = data_part_storage->freeze(
backup_part_path,
part->getDataPartStorage().getPartDirectory(),
local_context->getReadSettings(),
local_context->getWriteSettings(),
callback,
params);
IDataPartStorage::ClonePartParams params
{
.make_source_readonly = true
};
auto new_storage = data_part_storage->freeze(
backup_part_path,
part->getDataPartStorage().getPartDirectory(),
local_context->getReadSettings(),
local_context->getWriteSettings(),
callback,
params);
part->is_frozen.store(true, std::memory_order_relaxed);
result.push_back(PartitionCommandResultInfo{
.command_type = "FREEZE PART",
.partition_id = part->info.partition_id,
.part_name = part->name,
.backup_path = new_storage->getFullRootPath(),
.part_backup_path = new_storage->getFullPath(),
.backup_name = backup_name,
});
++parts_processed;
part->is_frozen.store(true, std::memory_order_relaxed);
{
std::lock_guard lock(result_mutex);
result.push_back(PartitionCommandResultInfo{
.command_type = "FREEZE PART",
.partition_id = part->info.partition_id,
.part_name = part->name,
.backup_path = new_storage->getFullRootPath(),
.part_backup_path = new_storage->getFullPath(),
.backup_name = backup_name,
});
}
}, Priority{0});
}
LOG_DEBUG(log, "Froze {} parts", parts_processed);
runner.waitForAllToFinishAndRethrowFirstError();
LOG_DEBUG(log, "Froze {} parts", result.size());
return result;
}