From 19e04396295af4a15e6dddf674aaf16c6e285395 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 12 Apr 2021 09:04:38 +0300 Subject: [PATCH] Add ability to flush buffer only in background for StorageBuffer Add 3 new engine arguments: - flush_time - flush_rows - flush_bytes That will be checked only for background flush, this maybe useful if INSERT latency is "crucial". --- .../engines/table-engines/special/buffer.md | 12 ++- src/Common/ProfileEvents.cpp | 3 + src/Storages/StorageBuffer.cpp | 80 ++++++++++++++----- src/Storages/StorageBuffer.h | 17 ++-- ..._storage_buffer_flush_parameters.reference | 1 + .../01811_storage_buffer_flush_parameters.sql | 22 +++++ .../01817_storage_buffer_parameters.reference | 0 .../01817_storage_buffer_parameters.sql | 42 ++++++++++ 8 files changed, 147 insertions(+), 30 deletions(-) create mode 100644 tests/queries/0_stateless/01811_storage_buffer_flush_parameters.reference create mode 100644 tests/queries/0_stateless/01811_storage_buffer_flush_parameters.sql create mode 100644 tests/queries/0_stateless/01817_storage_buffer_parameters.reference create mode 100644 tests/queries/0_stateless/01817_storage_buffer_parameters.sql diff --git a/docs/en/engines/table-engines/special/buffer.md b/docs/en/engines/table-engines/special/buffer.md index bf6c08f8f6c..8245cd19e8c 100644 --- a/docs/en/engines/table-engines/special/buffer.md +++ b/docs/en/engines/table-engines/special/buffer.md @@ -18,11 +18,17 @@ Engine parameters: - `num_layers` – Parallelism layer. Physically, the table will be represented as `num_layers` of independent buffers. Recommended value: 16. - `min_time`, `max_time`, `min_rows`, `max_rows`, `min_bytes`, and `max_bytes` – Conditions for flushing data from the buffer. +Optional engine parameters: + +- `flush_time`, `flush_rows`, `flush_bytes` – Conditions for flushing data from the buffer, that will happen only in background (ommited or zero means no `flush*` parameters). + Data is flushed from the buffer and written to the destination table if all the `min*` conditions or at least one `max*` condition are met. -- `min_time`, `max_time` – Condition for the time in seconds from the moment of the first write to the buffer. -- `min_rows`, `max_rows` – Condition for the number of rows in the buffer. -- `min_bytes`, `max_bytes` – Condition for the number of bytes in the buffer. +Also if at least one `flush*` condition are met flush initiated in background, this is different from `max*`, since `flush*` allows you to configure background flushes separately to avoid adding latency for `INSERT` (into `Buffer`) queries. + +- `min_time`, `max_time`, `flush_time` – Condition for the time in seconds from the moment of the first write to the buffer. +- `min_rows`, `max_rows`, `flush_rows` – Condition for the number of rows in the buffer. +- `min_bytes`, `max_bytes`, `flush_bytes` – Condition for the number of bytes in the buffer. During the write operation, data is inserted to a `num_layers` number of random buffers. Or, if the data part to insert is large enough (greater than `max_rows` or `max_bytes`), it is written directly to the destination table, omitting the buffer. diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index d0876c5e69c..162d6e035cc 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -146,6 +146,9 @@ M(StorageBufferPassedTimeMaxThreshold, "") \ M(StorageBufferPassedRowsMaxThreshold, "") \ M(StorageBufferPassedBytesMaxThreshold, "") \ + M(StorageBufferPassedTimeFlushThreshold, "") \ + M(StorageBufferPassedRowsFlushThreshold, "") \ + M(StorageBufferPassedBytesFlushThreshold, "") \ M(StorageBufferLayerLockReadersWaitMilliseconds, "Time for waiting for Buffer layer during reading") \ M(StorageBufferLayerLockWritersWaitMilliseconds, "Time for waiting free Buffer layer to write to (can be used to tune Buffer layers)") \ \ diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 5c4b4e7d1d8..7b03622431d 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -40,6 +40,9 @@ namespace ProfileEvents extern const Event StorageBufferPassedTimeMaxThreshold; extern const Event StorageBufferPassedRowsMaxThreshold; extern const Event StorageBufferPassedBytesMaxThreshold; + extern const Event StorageBufferPassedTimeFlushThreshold; + extern const Event StorageBufferPassedRowsFlushThreshold; + extern const Event StorageBufferPassedBytesFlushThreshold; extern const Event StorageBufferLayerLockReadersWaitMilliseconds; extern const Event StorageBufferLayerLockWritersWaitMilliseconds; } @@ -103,6 +106,7 @@ StorageBuffer::StorageBuffer( size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, + const Thresholds & flush_thresholds_, const StorageID & destination_id_, bool allow_materialized_) : IStorage(table_id_) @@ -110,6 +114,7 @@ StorageBuffer::StorageBuffer( , num_shards(num_shards_), buffers(num_shards_) , min_thresholds(min_thresholds_) , max_thresholds(max_thresholds_) + , flush_thresholds(flush_thresholds_) , destination_id(destination_id_) , allow_materialized(allow_materialized_) , log(&Poco::Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")")) @@ -602,7 +607,7 @@ private: { buffer.data = sorted_block.cloneEmpty(); } - else if (storage.checkThresholds(buffer, current_time, sorted_block.rows(), sorted_block.bytes())) + else if (storage.checkThresholds(buffer, /* direct= */true, current_time, sorted_block.rows(), sorted_block.bytes())) { /** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer. * This also protects against unlimited consumption of RAM, since if it is impossible to write to the table, @@ -713,7 +718,7 @@ bool StorageBuffer::supportsPrewhere() const return false; } -bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const +bool StorageBuffer::checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows, size_t additional_bytes) const { time_t time_passed = 0; if (buffer.first_write_time) @@ -722,11 +727,11 @@ bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t rows = buffer.data.rows() + additional_rows; size_t bytes = buffer.data.bytes() + additional_bytes; - return checkThresholdsImpl(rows, bytes, time_passed); + return checkThresholdsImpl(direct, rows, bytes, time_passed); } -bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const +bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const { if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes) { @@ -752,6 +757,27 @@ bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_p return true; } + if (!direct) + { + if (flush_thresholds.time && time_passed > flush_thresholds.time) + { + ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeFlushThreshold); + return true; + } + + if (flush_thresholds.rows && rows > flush_thresholds.rows) + { + ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsFlushThreshold); + return true; + } + + if (flush_thresholds.bytes && bytes > flush_thresholds.bytes) + { + ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesFlushThreshold); + return true; + } + } + return false; } @@ -785,7 +811,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc if (check_thresholds) { - if (!checkThresholdsImpl(rows, bytes, time_passed)) + if (!checkThresholdsImpl(/* direct= */false, rows, bytes, time_passed)) return; } else @@ -1040,16 +1066,17 @@ void registerStorageBuffer(StorageFactory & factory) * * db, table - in which table to put data from buffer. * num_buckets - level of parallelism. - * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer. + * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer, + * flush_time, flush_rows, flush_bytes - conditions for flushing. */ factory.registerStorage("Buffer", [](const StorageFactory::Arguments & args) { ASTs & engine_args = args.engine_args; - if (engine_args.size() != 9) - throw Exception("Storage Buffer requires 9 parameters: " - " destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.", + if (engine_args.size() < 9 || engine_args.size() > 12) + throw Exception("Storage Buffer requires from 9 to 12 parameters: " + " destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes[, flush_time, flush_rows, flush_bytes].", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); // Table and database name arguments accept expressions, evaluate them. @@ -1058,7 +1085,7 @@ void registerStorageBuffer(StorageFactory & factory) // After we evaluated all expressions, check that all arguments are // literals. - for (size_t i = 0; i < 9; i++) + for (size_t i = 0; i < engine_args.size(); i++) { if (!typeid_cast(engine_args[i].get())) { @@ -1068,17 +1095,29 @@ void registerStorageBuffer(StorageFactory & factory) } } - String destination_database = engine_args[0]->as().value.safeGet(); - String destination_table = engine_args[1]->as().value.safeGet(); + size_t i = 0; - UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber(), engine_args[2]->as().value); + String destination_database = engine_args[i++]->as().value.safeGet(); + String destination_table = engine_args[i++]->as().value.safeGet(); - Int64 min_time = applyVisitor(FieldVisitorConvertToNumber(), engine_args[3]->as().value); - Int64 max_time = applyVisitor(FieldVisitorConvertToNumber(), engine_args[4]->as().value); - UInt64 min_rows = applyVisitor(FieldVisitorConvertToNumber(), engine_args[5]->as().value); - UInt64 max_rows = applyVisitor(FieldVisitorConvertToNumber(), engine_args[6]->as().value); - UInt64 min_bytes = applyVisitor(FieldVisitorConvertToNumber(), engine_args[7]->as().value); - UInt64 max_bytes = applyVisitor(FieldVisitorConvertToNumber(), engine_args[8]->as().value); + UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber(), engine_args[i++]->as().value); + + StorageBuffer::Thresholds min; + StorageBuffer::Thresholds max; + StorageBuffer::Thresholds flush; + + min.time = applyVisitor(FieldVisitorConvertToNumber(), engine_args[i++]->as().value); + max.time = applyVisitor(FieldVisitorConvertToNumber(), engine_args[i++]->as().value); + min.rows = applyVisitor(FieldVisitorConvertToNumber(), engine_args[i++]->as().value); + max.rows = applyVisitor(FieldVisitorConvertToNumber(), engine_args[i++]->as().value); + min.bytes = applyVisitor(FieldVisitorConvertToNumber(), engine_args[i++]->as().value); + max.bytes = applyVisitor(FieldVisitorConvertToNumber(), engine_args[i++]->as().value); + if (engine_args.size() > i) + flush.time = applyVisitor(FieldVisitorConvertToNumber(), engine_args[i++]->as().value); + if (engine_args.size() > i) + flush.rows = applyVisitor(FieldVisitorConvertToNumber(), engine_args[i++]->as().value); + if (engine_args.size() > i) + flush.bytes = applyVisitor(FieldVisitorConvertToNumber(), engine_args[i++]->as().value); /// If destination_id is not set, do not write data from the buffer, but simply empty the buffer. StorageID destination_id = StorageID::createEmpty(); @@ -1094,8 +1133,7 @@ void registerStorageBuffer(StorageFactory & factory) args.constraints, args.getContext(), num_buckets, - StorageBuffer::Thresholds{min_time, min_rows, min_bytes}, - StorageBuffer::Thresholds{max_time, max_rows, max_bytes}, + min, max, flush, destination_id, static_cast(args.getLocalContext()->getSettingsRef().insert_allow_materialized_columns)); }, diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h index b29bbf179f4..1747c024a74 100644 --- a/src/Storages/StorageBuffer.h +++ b/src/Storages/StorageBuffer.h @@ -35,6 +35,10 @@ namespace DB * Thresholds can be exceeded. For example, if max_rows = 1 000 000, the buffer already had 500 000 rows, * and a part of 800 000 rows is added, then there will be 1 300 000 rows in the buffer, and then such a block will be written to the subordinate table. * + * There are also separate thresholds for flush, those thresholds are checked only for non-direct flush. + * This maybe useful if you do not want to add extra latency for INSERT queries, + * so you can set max_rows=1e6 and flush_rows=500e3, then each 500e3 rows buffer will be flushed in background only. + * * When you destroy a Buffer table, all remaining data is flushed to the subordinate table. * The data in the buffer is not replicated, not logged to disk, not indexed. With a rough restart of the server, the data is lost. */ @@ -45,12 +49,11 @@ friend class BufferSource; friend class BufferBlockOutputStream; public: - /// Thresholds. struct Thresholds { - time_t time; /// The number of seconds from the insertion of the first row into the block. - size_t rows; /// The number of rows in the block. - size_t bytes; /// The number of (uncompressed) bytes in the block. + time_t time = 0; /// The number of seconds from the insertion of the first row into the block. + size_t rows = 0; /// The number of rows in the block. + size_t bytes = 0; /// The number of (uncompressed) bytes in the block. }; std::string getName() const override { return "Buffer"; } @@ -135,6 +138,7 @@ private: const Thresholds min_thresholds; const Thresholds max_thresholds; + const Thresholds flush_thresholds; StorageID destination_id; bool allow_materialized; @@ -153,8 +157,8 @@ private: /// are exceeded. If reset_block_structure is set - clears inner block /// structure inside buffer (useful in OPTIMIZE and ALTER). void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false, bool reset_block_structure = false); - bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const; - bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const; + bool checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const; + bool checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const; /// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`. void writeBlockToDestination(const Block & block, StoragePtr table); @@ -177,6 +181,7 @@ protected: size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, + const Thresholds & flush_thresholds_, const StorageID & destination_id, bool allow_materialized_); }; diff --git a/tests/queries/0_stateless/01811_storage_buffer_flush_parameters.reference b/tests/queries/0_stateless/01811_storage_buffer_flush_parameters.reference new file mode 100644 index 00000000000..209e3ef4b62 --- /dev/null +++ b/tests/queries/0_stateless/01811_storage_buffer_flush_parameters.reference @@ -0,0 +1 @@ +20 diff --git a/tests/queries/0_stateless/01811_storage_buffer_flush_parameters.sql b/tests/queries/0_stateless/01811_storage_buffer_flush_parameters.sql new file mode 100644 index 00000000000..dac68ad4ae8 --- /dev/null +++ b/tests/queries/0_stateless/01811_storage_buffer_flush_parameters.sql @@ -0,0 +1,22 @@ +drop table if exists data_01811; +drop table if exists buffer_01811; + +create table data_01811 (key Int) Engine=Memory(); +/* Buffer with flush_rows=1000 */ +create table buffer_01811 (key Int) Engine=Buffer(currentDatabase(), data_01811, + /* num_layers= */ 1, + /* min_time= */ 1, /* max_time= */ 86400, + /* min_rows= */ 1e9, /* max_rows= */ 1e6, + /* min_bytes= */ 0, /* max_bytes= */ 4e6, + /* flush_time= */ 86400, /* flush_rows= */ 10, /* flush_bytes= */0 +); + +insert into buffer_01811 select * from numbers(10); +insert into buffer_01811 select * from numbers(10); + +-- wait for background buffer flush +select sleep(3) format Null; +select count() from data_01811; + +drop table buffer_01811; +drop table data_01811; diff --git a/tests/queries/0_stateless/01817_storage_buffer_parameters.reference b/tests/queries/0_stateless/01817_storage_buffer_parameters.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01817_storage_buffer_parameters.sql b/tests/queries/0_stateless/01817_storage_buffer_parameters.sql new file mode 100644 index 00000000000..84727bc5d6b --- /dev/null +++ b/tests/queries/0_stateless/01817_storage_buffer_parameters.sql @@ -0,0 +1,42 @@ +drop table if exists data_01817; +drop table if exists buffer_01817; + +create table data_01817 (key Int) Engine=Null(); + +-- w/ flush_* +create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817, + /* num_layers= */ 1, + /* min_time= */ 1, /* max_time= */ 86400, + /* min_rows= */ 1e9, /* max_rows= */ 1e6, + /* min_bytes= */ 0, /* max_bytes= */ 4e6, + /* flush_time= */ 86400, /* flush_rows= */ 10, /* flush_bytes= */0 +); +drop table buffer_01817; + +-- w/o flush_* +create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817, + /* num_layers= */ 1, + /* min_time= */ 1, /* max_time= */ 86400, + /* min_rows= */ 1e9, /* max_rows= */ 1e6, + /* min_bytes= */ 0, /* max_bytes= */ 4e6 +); +drop table buffer_01817; + +-- not enough args +create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817, + /* num_layers= */ 1, + /* min_time= */ 1, /* max_time= */ 86400, + /* min_rows= */ 1e9, /* max_rows= */ 1e6, + /* min_bytes= */ 0 /* max_bytes= 4e6 */ +); -- { serverError 42 } +-- too much args +create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817, + /* num_layers= */ 1, + /* min_time= */ 1, /* max_time= */ 86400, + /* min_rows= */ 1e9, /* max_rows= */ 1e6, + /* min_bytes= */ 0, /* max_bytes= */ 4e6, + /* flush_time= */ 86400, /* flush_rows= */ 10, /* flush_bytes= */0, + 0 +); -- { serverError 42 } + +drop table data_01817;