Merge pull request #10315 from azat/buffer-flush-bg-pool

Use background thread pool for background buffer flushes
This commit is contained in:
alexey-milovidov 2020-04-18 15:04:35 +03:00 committed by GitHub
commit 17b6e0b3e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 116 additions and 32 deletions

View File

@ -78,6 +78,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \
M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(SettingUInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(SettingUInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.", 0) \

View File

@ -317,6 +317,7 @@ struct ContextShared
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables.
std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
@ -413,6 +414,7 @@ struct ContextShared
embedded_dictionaries.reset();
external_dictionaries_loader.reset();
external_models_loader.reset();
buffer_flush_schedule_pool.reset();
background_pool.reset();
background_move_pool.reset();
schedule_pool.reset();
@ -1330,6 +1332,14 @@ BackgroundProcessingPool & Context::getBackgroundMovePool()
return *shared->background_move_pool;
}
BackgroundSchedulePool & Context::getBufferFlushSchedulePool()
{
auto lock = getLock();
if (!shared->buffer_flush_schedule_pool)
shared->buffer_flush_schedule_pool.emplace(settings.background_buffer_flush_schedule_pool_size);
return *shared->buffer_flush_schedule_pool;
}
BackgroundSchedulePool & Context::getSchedulePool()
{
auto lock = getLock();

View File

@ -471,6 +471,7 @@ public:
*/
void dropCaches() const;
BackgroundSchedulePool & getBufferFlushSchedulePool();
BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & getBackgroundMovePool();
BackgroundSchedulePool & getSchedulePool();

View File

@ -13,7 +13,6 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/FieldVisitors.h>
@ -76,6 +75,7 @@ StorageBuffer::StorageBuffer(
, destination_id(destination_id_)
, allow_materialized(allow_materialized_)
, log(&Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
, bg_pool(global_context.getBufferFlushSchedulePool())
{
setColumns(columns_);
setConstraints(constraints_);
@ -83,12 +83,7 @@ StorageBuffer::StorageBuffer(
StorageBuffer::~StorageBuffer()
{
// Should not happen if shutdown was called
if (flush_thread.joinable())
{
shutdown_event.set();
flush_thread.join();
}
flush_handle->deactivate();
}
@ -397,6 +392,9 @@ public:
least_busy_lock = std::unique_lock(least_busy_buffer->mutex);
}
insertIntoBuffer(block, *least_busy_buffer);
least_busy_lock.unlock();
storage.reschedule();
}
private:
StorageBuffer & storage;
@ -458,16 +456,15 @@ void StorageBuffer::startup()
<< " Set appropriate system_profile to fix this.");
}
flush_thread = ThreadFromGlobalPool(&StorageBuffer::flushThread, this);
flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ flushBack(); });
flush_handle->activateAndSchedule();
}
void StorageBuffer::shutdown()
{
shutdown_event.set();
if (flush_thread.joinable())
flush_thread.join();
flush_handle->deactivate();
try
{
@ -595,7 +592,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
ProfileEvents::increment(ProfileEvents::StorageBufferFlush);
LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds " << (check_thresholds ? "(bg)" : "(direct)") << ".");
if (!destination_id)
return;
@ -697,12 +694,8 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
}
void StorageBuffer::flushThread()
void StorageBuffer::flushBack()
{
setThreadName("BufferFlush");
do
{
try
{
flushAllBuffers(true);
@ -711,7 +704,32 @@ void StorageBuffer::flushThread()
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} while (!shutdown_event.tryWait(1000));
reschedule();
}
void StorageBuffer::reschedule()
{
time_t min_first_write_time = std::numeric_limits<time_t>::max();
time_t rows = 0;
for (auto & buffer : buffers)
{
std::lock_guard lock(buffer.mutex);
min_first_write_time = buffer.first_write_time;
rows += buffer.data.rows();
}
/// will be rescheduled via INSERT
if (!rows)
return;
time_t current_time = time(nullptr);
time_t time_passed = current_time - min_first_write_time;
size_t min = std::max<ssize_t>(min_thresholds.time - time_passed, 1);
size_t max = std::max<ssize_t>(max_thresholds.time - time_passed, 1);
flush_handle->scheduleAfter(std::min(min, max) * 1000);
}
void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)

View File

@ -4,7 +4,7 @@
#include <thread>
#include <ext/shared_ptr_helper.h>
#include <Core/NamesAndTypes.h>
#include <Common/ThreadPool.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Poco/Event.h>
@ -118,10 +118,6 @@ private:
Poco::Logger * log;
Poco::Event shutdown_event;
/// Resets data by timeout.
ThreadFromGlobalPool flush_thread;
void flushAllBuffers(bool check_thresholds = true);
/// Reset the buffer. If check_thresholds is set - resets only if thresholds are exceeded.
void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false);
@ -131,7 +127,11 @@ private:
/// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`.
void writeBlockToDestination(const Block & block, StoragePtr table);
void flushThread();
void flushBack();
void reschedule();
BackgroundSchedulePool & bg_pool;
BackgroundSchedulePoolTaskHolder flush_handle;
protected:
/** num_shards - the level of internal parallelism (the number of independent buffers)

View File

@ -0,0 +1,10 @@
min
0
5
max
5
10
direct
20
drop
30

View File

@ -0,0 +1,44 @@
drop table if exists data_01256;
drop table if exists buffer_01256;
create table data_01256 as system.numbers Engine=Memory();
select 'min';
create table buffer_01256 as system.numbers Engine=Buffer(currentDatabase(), data_01256, 1,
2, 100, /* time */
4, 100, /* rows */
1, 1e6 /* bytes */
);
insert into buffer_01256 select * from system.numbers limit 5;
select count() from data_01256;
-- sleep 2 (min time) + 1 (round up) + bias (1) = 4
select sleepEachRow(2) from numbers(2) FORMAT Null;
select count() from data_01256;
drop table buffer_01256;
select 'max';
create table buffer_01256 as system.numbers Engine=Buffer(currentDatabase(), data_01256, 1,
100, 2, /* time */
0, 100, /* rows */
0, 1e6 /* bytes */
);
insert into buffer_01256 select * from system.numbers limit 5;
select count() from data_01256;
-- sleep 2 (min time) + 1 (round up) + bias (1) = 4
select sleepEachRow(2) from numbers(2) FORMAT Null;
select count() from data_01256;
drop table buffer_01256;
select 'direct';
create table buffer_01256 as system.numbers Engine=Buffer(currentDatabase(), data_01256, 1,
100, 100, /* time */
0, 9, /* rows */
0, 1e6 /* bytes */
);
insert into buffer_01256 select * from system.numbers limit 10;
select count() from data_01256;
select 'drop';
insert into buffer_01256 select * from system.numbers limit 10;
drop table if exists buffer_01256;
select count() from data_01256;