Merge pull request #10263 from azat/distributed-send-bg-pool

Use background thread pool for distributed sends
This commit is contained in:
alexey-milovidov 2020-04-19 20:28:23 +03:00 committed by GitHub
commit 61d33a8d9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 78 additions and 38 deletions

View File

@ -81,7 +81,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(SettingUInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, kafka streaming, dns cache updates. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
\
M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \
M(SettingMilliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \

View File

@ -321,6 +321,7 @@ struct ContextShared
std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables.
std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
@ -418,6 +419,7 @@ struct ContextShared
background_pool.reset();
background_move_pool.reset();
schedule_pool.reset();
distributed_schedule_pool.reset();
ddl_worker.reset();
/// Stop trace collector if any
@ -1348,6 +1350,14 @@ BackgroundSchedulePool & Context::getSchedulePool()
return *shared->schedule_pool;
}
/// Returns the shared BackgroundSchedulePool used for distributed sends.
/// The pool is created on first call, sized by the
/// background_distributed_schedule_pool_size setting of this context;
/// later calls return the already-created pool.
BackgroundSchedulePool & Context::getDistributedSchedulePool()
{
    auto lock = getLock();

    auto & pool_slot = shared->distributed_schedule_pool;
    if (!pool_slot.has_value())
        pool_slot.emplace(settings.background_distributed_schedule_pool_size);

    return *pool_slot;
}
void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker)
{
auto lock = getLock();

View File

@ -475,6 +475,7 @@ public:
BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & getBackgroundMovePool();
BackgroundSchedulePool & getSchedulePool();
BackgroundSchedulePool & getDistributedSchedulePool();
void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);
DDLWorker & getDDLWorker() const;

View File

@ -1,7 +1,6 @@
#include <DataStreams/RemoteBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Common/escapeForFileName.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ClickHouseRevision.h>
@ -78,7 +77,7 @@ namespace
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_)
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool_)
/// It's important to initialize all members before the background task is created and scheduled (in the constructor body) to avoid a race.
: storage(storage_)
, pool(std::move(pool_))
@ -92,7 +91,10 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, max_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()}
, log{&Logger::get(getLoggerName())}
, monitor_blocker(monitor_blocker_)
, bg_pool(bg_pool_)
{
task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); });
task_handle->activateAndSchedule();
}
@ -100,12 +102,9 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
{
if (!quit)
{
{
quit = true;
std::lock_guard lock{mutex};
}
quit = true;
cond.notify_one();
thread.join();
task_handle->deactivate();
}
}
@ -122,12 +121,9 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
{
if (!quit)
{
{
quit = true;
std::lock_guard lock{mutex};
}
quit = true;
cond.notify_one();
thread.join();
task_handle->deactivate();
}
Poco::File(path).remove(true);
@ -136,16 +132,11 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
void StorageDistributedDirectoryMonitor::run()
{
setThreadName("DistrDirMonitor");
std::unique_lock lock{mutex};
const auto quit_requested = [this] { return quit.load(std::memory_order_relaxed); };
while (!quit_requested())
while (!quit)
{
auto do_sleep = true;
bool do_sleep = true;
if (!monitor_blocker.isCancelled())
{
try
@ -167,15 +158,25 @@ void StorageDistributedDirectoryMonitor::run()
LOG_DEBUG(log, "Skipping send data over distributed table.");
}
if (do_sleep)
cond.wait_for(lock, sleep_time, quit_requested);
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
{
error_count /= 2;
last_decrease_time = now;
}
if (do_sleep)
break;
}
if (!quit)
{
/// If there were no errors, the next run is normally scheduled by DistributedBlockOutputStream,
/// so this reschedule is only a fallback; hence it uses distributed_directory_monitor_max_sleep_time_ms.
if (error_count)
task_handle->scheduleAfter(sleep_time.count());
else
task_handle->scheduleAfter(max_sleep_time.count());
}
}
@ -586,6 +587,13 @@ BlockInputStreamPtr StorageDistributedDirectoryMonitor::createStreamFromFile(con
return std::make_shared<DirectoryMonitorBlockInputStream>(file_name);
}
/// Asks the background task to run after `ms` milliseconds.
/// Returns false without scheduling anything once `quit` has been set;
/// otherwise forwards the result of the task's own scheduleAfter().
bool StorageDistributedDirectoryMonitor::scheduleAfter(size_t ms)
{
    return !quit && task_handle->scheduleAfter(ms);
}
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
{
std::unordered_set<UInt64> file_indices_to_skip;
@ -714,8 +722,13 @@ std::string StorageDistributedDirectoryMonitor::getLoggerName() const
/// Switches the monitor to a new directory.
/// The background task is stopped while `path` and the derived
/// `current_batch_file_path` are rewritten, then re-activated and scheduled.
void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_path)
{
    /// Deactivate BEFORE taking `mutex`. deactivate() waits for an in-flight
    /// execution of the task to finish, and run() acquires `mutex` itself;
    /// holding `mutex` here while waiting could therefore deadlock with a
    /// run() that has started but is still blocked on `mutex`.
    task_handle->deactivate();

    std::lock_guard lock{mutex};

    path = new_path;
    current_batch_file_path = path + "current_batch.txt";

    task_handle->activateAndSchedule();
}
}

View File

@ -1,10 +1,9 @@
#pragma once
#include <Storages/StorageDistributed.h>
#include <Common/ThreadPool.h>
#include <Core/BackgroundSchedulePool.h>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <IO/ReadBufferFromFile.h>
@ -20,7 +19,7 @@ class StorageDistributedDirectoryMonitor
{
public:
StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_);
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool_);
~StorageDistributedDirectoryMonitor();
@ -33,6 +32,9 @@ public:
void shutdownAndDropAllData();
static BlockInputStreamPtr createStreamFromFile(const String & file_name);
/// For scheduling via DistributedBlockOutputStream
bool scheduleAfter(size_t ms);
private:
void run();
bool processFiles();
@ -67,7 +69,9 @@ private:
std::condition_variable cond;
Logger * log;
ActionBlocker & monitor_blocker;
ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this};
BackgroundSchedulePool & bg_pool;
BackgroundSchedulePoolTaskHolder task_handle;
/// Read insert query and insert settings for backward compatible.
static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Logger * log);

View File

@ -589,8 +589,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
const std::string path(disk + data_path + dir_name + '/');
/// ensure shard subdirectory creation and notify storage
if (Poco::File(path).createDirectory())
storage.requireDirectoryMonitor(disk, dir_name);
Poco::File(path).createDirectory();
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
const auto & file_name = toString(storage.file_names_increment.get()) + ".bin";
const auto & block_file_path = path + file_name;
@ -632,6 +632,9 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms;
directory_monitor.scheduleAfter(sleep_ms.totalMilliseconds());
}
if (link(first_file_tmp_path.data(), block_file_path.data()))

View File

@ -577,15 +577,20 @@ void StorageDistributed::createDirectoryMonitors(const std::string & disk)
}
void StorageDistributed::requireDirectoryMonitor(const std::string & disk, const std::string & name)
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const std::string & disk, const std::string & name)
{
const std::string path(disk + relative_data_path + name);
const std::string key(disk + name);
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
node_data.conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(*this, path, node_data.conneciton_pool, monitors_blocker);
if (!node_data.directory_monitor)
{
node_data.conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
*this, path, node_data.conneciton_pool, monitors_blocker, global_context.getDistributedSchedulePool());
}
return *node_data.directory_monitor;
}
size_t StorageDistributed::getShardCount() const

View File

@ -109,7 +109,7 @@ public:
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors(const std::string & disk);
/// ensure directory monitor and connection pool creation by disk and subdirectory name
void requireDirectoryMonitor(const std::string & disk, const std::string & name);
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const std::string & disk, const std::string & name);
void flushClusterNodesAllData();

View File

@ -2,8 +2,11 @@ SET distributed_directory_monitor_batch_inserts=1;
SET distributed_directory_monitor_sleep_time_ms=10;
SET distributed_directory_monitor_max_sleep_time_ms=100;
CREATE TABLE test (key UInt64) ENGINE=TinyLog();
CREATE TABLE dist_test AS test Engine=Distributed(test_cluster_two_shards, currentDatabase(), test, key);
INSERT INTO dist_test SELECT toUInt64(number) FROM numbers(2);
SYSTEM FLUSH DISTRIBUTED dist_test;
SELECT * FROM dist_test;
DROP TABLE IF EXISTS test_01040;
DROP TABLE IF EXISTS dist_test_01040;
CREATE TABLE test_01040 (key UInt64) ENGINE=TinyLog();
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards, currentDatabase(), test_01040, key);
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);
SYSTEM FLUSH DISTRIBUTED dist_test_01040;
SELECT * FROM dist_test_01040;