diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 325abc16f3f..73fc4578ce5 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -78,6 +78,7 @@ struct Settings : public SettingsCollection M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \ M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \ M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \ + M(SettingUInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \ M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \ M(SettingUInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \ M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 6e30792277f..5d8a0e53276 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -317,6 +317,7 @@ struct ContextShared MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree) ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections. InterserverIOHandler interserver_io_handler; /// Handler for interserver communication. + std::optional buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables. std::optional background_pool; /// The thread pool for the background work performed by the tables. std::optional background_move_pool; /// The thread pool for the background moves performed by the tables. std::optional schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables) @@ -413,6 +414,7 @@ struct ContextShared embedded_dictionaries.reset(); external_dictionaries_loader.reset(); external_models_loader.reset(); + buffer_flush_schedule_pool.reset(); background_pool.reset(); background_move_pool.reset(); schedule_pool.reset(); @@ -1330,6 +1332,14 @@ BackgroundProcessingPool & Context::getBackgroundMovePool() return *shared->background_move_pool; } +BackgroundSchedulePool & Context::getBufferFlushSchedulePool() +{ + auto lock = getLock(); + if (!shared->buffer_flush_schedule_pool) + shared->buffer_flush_schedule_pool.emplace(settings.background_buffer_flush_schedule_pool_size); + return *shared->buffer_flush_schedule_pool; +} + BackgroundSchedulePool & Context::getSchedulePool() { auto lock = getLock(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 6cf1a066b18..11937d4fc89 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -471,6 +471,7 @@ public: */ void dropCaches() const; + BackgroundSchedulePool & getBufferFlushSchedulePool(); BackgroundProcessingPool & getBackgroundPool(); BackgroundProcessingPool & getBackgroundMovePool(); BackgroundSchedulePool & getSchedulePool(); diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 1765e663902..4f098b46ff5 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include @@ -76,6 +75,7 @@ StorageBuffer::StorageBuffer( , destination_id(destination_id_) , allow_materialized(allow_materialized_) , log(&Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")")) + , bg_pool(global_context.getBufferFlushSchedulePool()) { setColumns(columns_); setConstraints(constraints_); @@ -83,12 +83,7 @@ StorageBuffer::StorageBuffer( StorageBuffer::~StorageBuffer() { - // Should not happen if shutdown was called - if (flush_thread.joinable()) - { - shutdown_event.set(); - flush_thread.join(); - } + flush_handle->deactivate(); } @@ -397,6 +392,9 @@ public: least_busy_lock = std::unique_lock(least_busy_buffer->mutex); } insertIntoBuffer(block, *least_busy_buffer); + least_busy_lock.unlock(); + + storage.reschedule(); } private: StorageBuffer & storage; @@ -458,16 +456,15 @@ void StorageBuffer::startup() << " Set appropriate system_profile to fix this."); } - flush_thread = ThreadFromGlobalPool(&StorageBuffer::flushThread, this); + + flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ flushBack(); }); + flush_handle->activateAndSchedule(); } void StorageBuffer::shutdown() { - shutdown_event.set(); - - if (flush_thread.joinable()) - flush_thread.join(); + flush_handle->deactivate(); try { @@ -595,7 +592,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc ProfileEvents::increment(ProfileEvents::StorageBufferFlush); - LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds."); + LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds " << (check_thresholds ? "(bg)" : "(direct)") << "."); if (!destination_id) return; @@ -697,21 +694,42 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl } -void StorageBuffer::flushThread() +void StorageBuffer::flushBack() { - setThreadName("BufferFlush"); - - do + try { - try - { - flushAllBuffers(true); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } while (!shutdown_event.tryWait(1000)); + flushAllBuffers(true); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + reschedule(); +} + +void StorageBuffer::reschedule() +{ + time_t min_first_write_time = std::numeric_limits::max(); + time_t rows = 0; + + for (auto & buffer : buffers) + { + std::lock_guard lock(buffer.mutex); + min_first_write_time = buffer.first_write_time; + rows += buffer.data.rows(); + } + + /// will be rescheduled via INSERT + if (!rows) + return; + + time_t current_time = time(nullptr); + time_t time_passed = current_time - min_first_write_time; + + size_t min = std::max(min_thresholds.time - time_passed, 1); + size_t max = std::max(max_thresholds.time - time_passed, 1); + flush_handle->scheduleAfter(std::min(min, max) * 1000); } void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h index 93f95692b18..4c6c911e339 100644 --- a/src/Storages/StorageBuffer.h +++ b/src/Storages/StorageBuffer.h @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include #include @@ -118,10 +118,6 @@ private: Poco::Logger * log; - Poco::Event shutdown_event; - /// Resets data by timeout. - ThreadFromGlobalPool flush_thread; - void flushAllBuffers(bool check_thresholds = true); /// Reset the buffer. If check_thresholds is set - resets only if thresholds are exceeded. void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false); @@ -131,7 +127,11 @@ private: /// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`. void writeBlockToDestination(const Block & block, StoragePtr table); - void flushThread(); + void flushBack(); + void reschedule(); + + BackgroundSchedulePool & bg_pool; + BackgroundSchedulePoolTaskHolder flush_handle; protected: /** num_shards - the level of internal parallelism (the number of independent buffers) diff --git a/tests/queries/0_stateless/01246_buffer_flush.reference b/tests/queries/0_stateless/01246_buffer_flush.reference new file mode 100644 index 00000000000..a877e94b919 --- /dev/null +++ b/tests/queries/0_stateless/01246_buffer_flush.reference @@ -0,0 +1,10 @@ +min +0 +5 +max +5 +10 +direct +20 +drop +30 diff --git a/tests/queries/0_stateless/01246_buffer_flush.sql b/tests/queries/0_stateless/01246_buffer_flush.sql new file mode 100644 index 00000000000..efe0adf703a --- /dev/null +++ b/tests/queries/0_stateless/01246_buffer_flush.sql @@ -0,0 +1,44 @@ +drop table if exists data_01256; +drop table if exists buffer_01256; + +create table data_01256 as system.numbers Engine=Memory(); + +select 'min'; +create table buffer_01256 as system.numbers Engine=Buffer(currentDatabase(), data_01256, 1, + 2, 100, /* time */ + 4, 100, /* rows */ + 1, 1e6 /* bytes */ +); +insert into buffer_01256 select * from system.numbers limit 5; +select count() from data_01256; +-- sleep 2 (min time) + 1 (round up) + bias (1) = 4 +select sleepEachRow(2) from numbers(2) FORMAT Null; +select count() from data_01256; +drop table buffer_01256; + +select 'max'; +create table buffer_01256 as system.numbers Engine=Buffer(currentDatabase(), data_01256, 1, + 100, 2, /* time */ + 0, 100, /* rows */ + 0, 1e6 /* bytes */ +); +insert into buffer_01256 select * from system.numbers limit 5; +select count() from data_01256; +-- sleep 2 (min time) + 1 (round up) + bias (1) = 4 +select sleepEachRow(2) from numbers(2) FORMAT Null; +select count() from data_01256; +drop table buffer_01256; + +select 'direct'; +create table buffer_01256 as system.numbers Engine=Buffer(currentDatabase(), data_01256, 1, + 100, 100, /* time */ + 0, 9, /* rows */ + 0, 1e6 /* bytes */ +); +insert into buffer_01256 select * from system.numbers limit 10; +select count() from data_01256; + +select 'drop'; +insert into buffer_01256 select * from system.numbers limit 10; +drop table if exists buffer_01256; +select count() from data_01256;