This commit is contained in:
kssenii 2023-12-11 12:34:39 +01:00
parent d34a2782ff
commit e10d06af6f
4 changed files with 70 additions and 51 deletions

View File

@ -135,9 +135,9 @@ void FileCache::initialize()
for (size_t i = 0; i < background_download_threads; ++i)
{
download_threads.emplace_back(std::make_unique<DownloadThread>());
download_threads.emplace_back(std::make_shared<DownloadThread>());
download_threads.back()->thread = std::make_unique<ThreadFromGlobalPool>(
[&, this] { metadata.downloadThreadFunc(download_threads.back()->stop_flag); });
[this, download_thread = download_threads.back()] { metadata.downloadThreadFunc(download_thread->stop_flag); });
}
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(std::function{ [this]{ metadata.cleanupThreadFunc(); }});
@ -1262,19 +1262,18 @@ FileCache::~FileCache()
void FileCache::deactivateBackgroundOperations()
{
{
auto lock = lockCache();
shutdown = true;
}
shutdown.store(true);
metadata.cancelDownload();
metadata.cancelCleanup();
for (const auto & download_thread : download_threads)
{
download_thread->stop_flag.store(true);
if (download_thread->thread && download_thread->thread->joinable())
download_thread->thread->join();
std::lock_guard lock(apply_settings_mutex);
for (const auto & download_thread : download_threads)
{
download_thread->stop_flag.store(true);
if (download_thread->thread && download_thread->thread->joinable())
download_thread->thread->join();
}
}
if (cleanup_thread && cleanup_thread->joinable())
@ -1366,56 +1365,53 @@ void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings,
if (!is_initialized || shutdown || new_settings == actual_settings)
return;
Int32 download_threads_diff = 0;
std::lock_guard lock(apply_settings_mutex);
size_t background_download_queue_size_limit = metadata.getBackgroundDownloadQueueSizeLimit();
if (background_download_queue_size_limit != new_settings.background_download_queue_size_limit)
{
std::lock_guard lock(apply_settings_mutex);
metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit);
actual_settings.background_download_queue_size_limit = new_settings.background_download_queue_size_limit;
size_t background_download_queue_size_limit = metadata.getBackgroundDownloadQueueSizeLimit();
if (background_download_queue_size_limit != new_settings.background_download_queue_size_limit)
{
metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit);
actual_settings.background_download_queue_size_limit = new_settings.background_download_queue_size_limit;
LOG_INFO(log, "Changed background_download_queue_size_limit from {} to {}",
background_download_queue_size_limit, new_settings.background_download_queue_size_limit);
}
if (background_download_threads != new_settings.background_download_threads)
{
if (background_download_threads < new_settings.background_download_threads)
{
download_threads_diff = Int32(new_settings.background_download_threads) - Int32(background_download_threads);
background_download_threads = actual_settings.background_download_threads = new_settings.background_download_threads;
}
else if (background_download_threads > new_settings.background_download_threads)
{
download_threads_diff = Int32(new_settings.background_download_threads) - Int32(background_download_threads);
background_download_threads = actual_settings.background_download_threads = new_settings.background_download_threads;
}
LOG_INFO(log, "Changed background_download_threads from {} to {} (diff: {})",
background_download_threads, new_settings.background_download_threads, download_threads_diff);
}
LOG_INFO(log, "Changed background_download_queue_size_limit from {} to {}",
background_download_queue_size_limit, new_settings.background_download_queue_size_limit);
}
if (download_threads_diff)
if (background_download_threads != download_threads.size())
{
auto lock = lockCache();
if (shutdown)
return;
chassert(false);
background_download_threads = download_threads.size();
}
if (download_threads_diff > 0)
if (background_download_threads != new_settings.background_download_threads)
{
if (background_download_threads < new_settings.background_download_threads)
{
for (size_t i = 0; i < size_t(download_threads_diff); ++i)
size_t add_threads = new_settings.background_download_threads - background_download_threads;
try
{
download_threads.emplace_back(std::make_unique<DownloadThread>());
download_threads.back()->thread = std::make_unique<ThreadFromGlobalPool>(
[&, this] { metadata.downloadThreadFunc(download_threads.back()->stop_flag); });
while (add_threads--)
{
download_threads.emplace_back(std::make_shared<DownloadThread>());
download_threads.back()->thread = std::make_unique<ThreadFromGlobalPool>(
[this, download_thread = download_threads.back()] { metadata.downloadThreadFunc(download_thread->stop_flag); });
}
}
catch (...)
{
if (!download_threads.back()->thread)
download_threads.pop_back();
background_download_threads = actual_settings.background_download_threads = download_threads.size();
throw;
}
background_download_threads = actual_settings.background_download_threads = new_settings.background_download_threads;
}
else
else if (background_download_threads > new_settings.background_download_threads)
{
for (size_t i = 0; i < size_t(-download_threads_diff); ++i)
size_t remove_threads = background_download_threads - new_settings.background_download_threads;
for (size_t i = 0; i < remove_threads; ++i)
{
auto & download_thread = *download_threads.back();
download_thread.stop_flag.store(true);
@ -1425,7 +1421,12 @@ void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings,
download_threads.pop_back();
}
background_download_threads = actual_settings.background_download_threads = new_settings.background_download_threads;
}
LOG_INFO(log, "Changed background_download_threads from {} to {}",
background_download_threads, new_settings.background_download_threads);
}
}

View File

@ -216,7 +216,7 @@ private:
std::unique_ptr<ThreadFromGlobalPool> thread;
std::atomic_bool stop_flag{false};
};
std::vector<std::unique_ptr<DownloadThread>> download_threads;
std::vector<std::shared_ptr<DownloadThread>> download_threads;
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;

View File

@ -1,3 +1,5 @@
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 0 0 1
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 10 1000 0 1
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 5 1000 0 1
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 15 1000 0 1
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 2 1000 0 1

View File

@ -31,3 +31,19 @@ mv $config_path_tmp $config_path
$CLICKHOUSE_CLIENT --query "SYSTEM RELOAD CONFIG"
$CLICKHOUSE_CLIENT --query "DESCRIBE FILESYSTEM CACHE '${disk_name}'"
cat $config_path \
| sed "s|<background_download_threads>5<\/background_download_threads>|<background_download_threads>15<\/background_download_threads>|" \
> $config_path_tmp
mv $config_path_tmp $config_path
$CLICKHOUSE_CLIENT --query "SYSTEM RELOAD CONFIG"
$CLICKHOUSE_CLIENT --query "DESCRIBE FILESYSTEM CACHE '${disk_name}'"
cat $config_path \
| sed "s|<background_download_threads>15<\/background_download_threads>|<background_download_threads>2<\/background_download_threads>|" \
> $config_path_tmp
mv $config_path_tmp $config_path
$CLICKHOUSE_CLIENT --query "SYSTEM RELOAD CONFIG"
$CLICKHOUSE_CLIENT --query "DESCRIBE FILESYSTEM CACHE '${disk_name}'"