Add ability to flush buffer only in background for StorageBuffer

Add 3 new engine arguments:
- flush_time
- flush_rows
- flush_bytes

That will be checked only for background flush, this maybe useful if
INSERT latency is "crucial".
This commit is contained in:
Azat Khuzhin 2021-04-12 09:04:38 +03:00
parent bd49e696c4
commit 19e0439629
8 changed files with 147 additions and 30 deletions

View File

@ -18,11 +18,17 @@ Engine parameters:
- `num_layers` Parallelism layer. Physically, the table will be represented as `num_layers` of independent buffers. Recommended value: 16.
- `min_time`, `max_time`, `min_rows`, `max_rows`, `min_bytes`, and `max_bytes` Conditions for flushing data from the buffer.
Optional engine parameters:
- `flush_time`, `flush_rows`, `flush_bytes` Conditions for flushing data from the buffer, that will happen only in background (ommited or zero means no `flush*` parameters).
Data is flushed from the buffer and written to the destination table if all the `min*` conditions or at least one `max*` condition are met.
- `min_time`, `max_time` Condition for the time in seconds from the moment of the first write to the buffer.
- `min_rows`, `max_rows` Condition for the number of rows in the buffer.
- `min_bytes`, `max_bytes` Condition for the number of bytes in the buffer.
Also if at least one `flush*` condition are met flush initiated in background, this is different from `max*`, since `flush*` allows you to configure background flushes separately to avoid adding latency for `INSERT` (into `Buffer`) queries.
- `min_time`, `max_time`, `flush_time` Condition for the time in seconds from the moment of the first write to the buffer.
- `min_rows`, `max_rows`, `flush_rows` Condition for the number of rows in the buffer.
- `min_bytes`, `max_bytes`, `flush_bytes` Condition for the number of bytes in the buffer.
During the write operation, data is inserted to a `num_layers` number of random buffers. Or, if the data part to insert is large enough (greater than `max_rows` or `max_bytes`), it is written directly to the destination table, omitting the buffer.

View File

@ -146,6 +146,9 @@
M(StorageBufferPassedTimeMaxThreshold, "") \
M(StorageBufferPassedRowsMaxThreshold, "") \
M(StorageBufferPassedBytesMaxThreshold, "") \
M(StorageBufferPassedTimeFlushThreshold, "") \
M(StorageBufferPassedRowsFlushThreshold, "") \
M(StorageBufferPassedBytesFlushThreshold, "") \
M(StorageBufferLayerLockReadersWaitMilliseconds, "Time for waiting for Buffer layer during reading") \
M(StorageBufferLayerLockWritersWaitMilliseconds, "Time for waiting free Buffer layer to write to (can be used to tune Buffer layers)") \
\

View File

@ -40,6 +40,9 @@ namespace ProfileEvents
extern const Event StorageBufferPassedTimeMaxThreshold;
extern const Event StorageBufferPassedRowsMaxThreshold;
extern const Event StorageBufferPassedBytesMaxThreshold;
extern const Event StorageBufferPassedTimeFlushThreshold;
extern const Event StorageBufferPassedRowsFlushThreshold;
extern const Event StorageBufferPassedBytesFlushThreshold;
extern const Event StorageBufferLayerLockReadersWaitMilliseconds;
extern const Event StorageBufferLayerLockWritersWaitMilliseconds;
}
@ -103,6 +106,7 @@ StorageBuffer::StorageBuffer(
size_t num_shards_,
const Thresholds & min_thresholds_,
const Thresholds & max_thresholds_,
const Thresholds & flush_thresholds_,
const StorageID & destination_id_,
bool allow_materialized_)
: IStorage(table_id_)
@ -110,6 +114,7 @@ StorageBuffer::StorageBuffer(
, num_shards(num_shards_), buffers(num_shards_)
, min_thresholds(min_thresholds_)
, max_thresholds(max_thresholds_)
, flush_thresholds(flush_thresholds_)
, destination_id(destination_id_)
, allow_materialized(allow_materialized_)
, log(&Poco::Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
@ -602,7 +607,7 @@ private:
{
buffer.data = sorted_block.cloneEmpty();
}
else if (storage.checkThresholds(buffer, current_time, sorted_block.rows(), sorted_block.bytes()))
else if (storage.checkThresholds(buffer, /* direct= */true, current_time, sorted_block.rows(), sorted_block.bytes()))
{
/** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
* This also protects against unlimited consumption of RAM, since if it is impossible to write to the table,
@ -713,7 +718,7 @@ bool StorageBuffer::supportsPrewhere() const
return false;
}
bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
bool StorageBuffer::checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows, size_t additional_bytes) const
{
time_t time_passed = 0;
if (buffer.first_write_time)
@ -722,11 +727,11 @@ bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time,
size_t rows = buffer.data.rows() + additional_rows;
size_t bytes = buffer.data.bytes() + additional_bytes;
return checkThresholdsImpl(rows, bytes, time_passed);
return checkThresholdsImpl(direct, rows, bytes, time_passed);
}
bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const
bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const
{
if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
{
@ -752,6 +757,27 @@ bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_p
return true;
}
if (!direct)
{
if (flush_thresholds.time && time_passed > flush_thresholds.time)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeFlushThreshold);
return true;
}
if (flush_thresholds.rows && rows > flush_thresholds.rows)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsFlushThreshold);
return true;
}
if (flush_thresholds.bytes && bytes > flush_thresholds.bytes)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesFlushThreshold);
return true;
}
}
return false;
}
@ -785,7 +811,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
if (check_thresholds)
{
if (!checkThresholdsImpl(rows, bytes, time_passed))
if (!checkThresholdsImpl(/* direct= */false, rows, bytes, time_passed))
return;
}
else
@ -1040,16 +1066,17 @@ void registerStorageBuffer(StorageFactory & factory)
*
* db, table - in which table to put data from buffer.
* num_buckets - level of parallelism.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer,
* flush_time, flush_rows, flush_bytes - conditions for flushing.
*/
factory.registerStorage("Buffer", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 9)
throw Exception("Storage Buffer requires 9 parameters: "
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
if (engine_args.size() < 9 || engine_args.size() > 12)
throw Exception("Storage Buffer requires from 9 to 12 parameters: "
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes[, flush_time, flush_rows, flush_bytes].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
// Table and database name arguments accept expressions, evaluate them.
@ -1058,7 +1085,7 @@ void registerStorageBuffer(StorageFactory & factory)
// After we evaluated all expressions, check that all arguments are
// literals.
for (size_t i = 0; i < 9; i++)
for (size_t i = 0; i < engine_args.size(); i++)
{
if (!typeid_cast<ASTLiteral *>(engine_args[i].get()))
{
@ -1068,17 +1095,29 @@ void registerStorageBuffer(StorageFactory & factory)
}
}
String destination_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
String destination_table = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
size_t i = 0;
UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[2]->as<ASTLiteral &>().value);
String destination_database = engine_args[i++]->as<ASTLiteral &>().value.safeGet<String>();
String destination_table = engine_args[i++]->as<ASTLiteral &>().value.safeGet<String>();
Int64 min_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[3]->as<ASTLiteral &>().value);
Int64 max_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[4]->as<ASTLiteral &>().value);
UInt64 min_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[5]->as<ASTLiteral &>().value);
UInt64 max_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[6]->as<ASTLiteral &>().value);
UInt64 min_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[7]->as<ASTLiteral &>().value);
UInt64 max_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[8]->as<ASTLiteral &>().value);
UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
StorageBuffer::Thresholds min;
StorageBuffer::Thresholds max;
StorageBuffer::Thresholds flush;
min.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
min.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
min.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
/// If destination_id is not set, do not write data from the buffer, but simply empty the buffer.
StorageID destination_id = StorageID::createEmpty();
@ -1094,8 +1133,7 @@ void registerStorageBuffer(StorageFactory & factory)
args.constraints,
args.getContext(),
num_buckets,
StorageBuffer::Thresholds{min_time, min_rows, min_bytes},
StorageBuffer::Thresholds{max_time, max_rows, max_bytes},
min, max, flush,
destination_id,
static_cast<bool>(args.getLocalContext()->getSettingsRef().insert_allow_materialized_columns));
},

View File

@ -35,6 +35,10 @@ namespace DB
* Thresholds can be exceeded. For example, if max_rows = 1 000 000, the buffer already had 500 000 rows,
* and a part of 800 000 rows is added, then there will be 1 300 000 rows in the buffer, and then such a block will be written to the subordinate table.
*
* There are also separate thresholds for flush, those thresholds are checked only for non-direct flush.
* This maybe useful if you do not want to add extra latency for INSERT queries,
* so you can set max_rows=1e6 and flush_rows=500e3, then each 500e3 rows buffer will be flushed in background only.
*
* When you destroy a Buffer table, all remaining data is flushed to the subordinate table.
* The data in the buffer is not replicated, not logged to disk, not indexed. With a rough restart of the server, the data is lost.
*/
@ -45,12 +49,11 @@ friend class BufferSource;
friend class BufferBlockOutputStream;
public:
/// Thresholds.
struct Thresholds
{
time_t time; /// The number of seconds from the insertion of the first row into the block.
size_t rows; /// The number of rows in the block.
size_t bytes; /// The number of (uncompressed) bytes in the block.
time_t time = 0; /// The number of seconds from the insertion of the first row into the block.
size_t rows = 0; /// The number of rows in the block.
size_t bytes = 0; /// The number of (uncompressed) bytes in the block.
};
std::string getName() const override { return "Buffer"; }
@ -135,6 +138,7 @@ private:
const Thresholds min_thresholds;
const Thresholds max_thresholds;
const Thresholds flush_thresholds;
StorageID destination_id;
bool allow_materialized;
@ -153,8 +157,8 @@ private:
/// are exceeded. If reset_block_structure is set - clears inner block
/// structure inside buffer (useful in OPTIMIZE and ALTER).
void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false, bool reset_block_structure = false);
bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const;
bool checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const;
/// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`.
void writeBlockToDestination(const Block & block, StoragePtr table);
@ -177,6 +181,7 @@ protected:
size_t num_shards_,
const Thresholds & min_thresholds_,
const Thresholds & max_thresholds_,
const Thresholds & flush_thresholds_,
const StorageID & destination_id,
bool allow_materialized_);
};

View File

@ -0,0 +1,22 @@
drop table if exists data_01811;
drop table if exists buffer_01811;
create table data_01811 (key Int) Engine=Memory();
/* Buffer with flush_rows=1000 */
create table buffer_01811 (key Int) Engine=Buffer(currentDatabase(), data_01811,
/* num_layers= */ 1,
/* min_time= */ 1, /* max_time= */ 86400,
/* min_rows= */ 1e9, /* max_rows= */ 1e6,
/* min_bytes= */ 0, /* max_bytes= */ 4e6,
/* flush_time= */ 86400, /* flush_rows= */ 10, /* flush_bytes= */0
);
insert into buffer_01811 select * from numbers(10);
insert into buffer_01811 select * from numbers(10);
-- wait for background buffer flush
select sleep(3) format Null;
select count() from data_01811;
drop table buffer_01811;
drop table data_01811;

View File

@ -0,0 +1,42 @@
drop table if exists data_01817;
drop table if exists buffer_01817;
create table data_01817 (key Int) Engine=Null();
-- w/ flush_*
create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817,
/* num_layers= */ 1,
/* min_time= */ 1, /* max_time= */ 86400,
/* min_rows= */ 1e9, /* max_rows= */ 1e6,
/* min_bytes= */ 0, /* max_bytes= */ 4e6,
/* flush_time= */ 86400, /* flush_rows= */ 10, /* flush_bytes= */0
);
drop table buffer_01817;
-- w/o flush_*
create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817,
/* num_layers= */ 1,
/* min_time= */ 1, /* max_time= */ 86400,
/* min_rows= */ 1e9, /* max_rows= */ 1e6,
/* min_bytes= */ 0, /* max_bytes= */ 4e6
);
drop table buffer_01817;
-- not enough args
create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817,
/* num_layers= */ 1,
/* min_time= */ 1, /* max_time= */ 86400,
/* min_rows= */ 1e9, /* max_rows= */ 1e6,
/* min_bytes= */ 0 /* max_bytes= 4e6 */
); -- { serverError 42 }
-- too much args
create table buffer_01817 (key Int) Engine=Buffer(currentDatabase(), data_01817,
/* num_layers= */ 1,
/* min_time= */ 1, /* max_time= */ 86400,
/* min_rows= */ 1e9, /* max_rows= */ 1e6,
/* min_bytes= */ 0, /* max_bytes= */ 4e6,
/* flush_time= */ 86400, /* flush_rows= */ 10, /* flush_bytes= */0,
0
); -- { serverError 42 }
drop table data_01817;