Distributed: Add ability to delay/throttle INSERT until pending data is reduced

Add two new settings for the Distributed engine:
- bytes_to_delay_insert
- max_delay_to_insert

If at the beginning of an INSERT there is too much pending data (more
than bytes_to_delay_insert), the INSERT will wait until the pending data
shrinks, but for no more than max_delay_to_insert seconds.

If after this there is still too much pending data, an exception will be
thrown.

Also, new profile events were added (by analogy with the MergeTree ones):
- DistributedDelayedInserts (although you can use system.errors instead,
  but still)
- DistributedRejectedInserts
- DistributedDelayedInsertsMilliseconds
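
A minimal sketch of how the new settings fit together (table and cluster
names here are hypothetical):

``` sql
CREATE TABLE dist (key Int)
ENGINE = Distributed(cluster, default, local)
SETTINGS bytes_to_delay_insert = 1000000, max_delay_to_insert = 5;

-- If more than ~1 MB of compressed data is already pending for async send,
-- this INSERT waits up to 5 seconds for the queue to shrink; if it is still
-- over the limit after that, the INSERT throws.
INSERT INTO dist VALUES (1);
```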
commit 6965ac26c3
parent 15f7459cae
Author: Azat Khuzhin
Date: 2021-01-27 21:43:41 +03:00

9 changed files with 163 additions and 9 deletions


@@ -31,12 +31,11 @@ Also it accepts the following settings:
- `fsync_directories` - do the `fsync` for directories. Guarantees that the OS refreshed directory metadata after operations related to asynchronous inserts on Distributed table (after insert, after sending the data to shard, etc).
-- `bytes_to_throw_insert` - if more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw.
+- `bytes_to_throw_insert` - if more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw. Default 0.
+- `bytes_to_delay_insert` - if more than this number of compressed bytes will be pending for async INSERT, the query will be delayed. 0 - do not delay. Default 0.
+- `max_delay_to_insert` - max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send. Default 60.
-See also:
-- [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting
-- [prefer_localhost_replica](../../../operations/settings/settings.md#settings-prefer-localhost-replica) setting
!!! note "Note"
@@ -46,6 +45,12 @@ Also it accepts the following settings:
- May significantly decrease the inserts' performance
- Affect writing the data stored inside Distributed table folder into the **node which accepted your insert**. If you need to have guarantees of writing data to underlying MergeTree tables - see durability settings (`...fsync...`) in `system.merge_tree_settings`
+For **Insert limit settings** (`..._insert`) see also:
+- [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting
+- [prefer_localhost_replica](../../../operations/settings/settings.md#settings-prefer-localhost-replica) setting
+- `bytes_to_throw_insert` is handled before `bytes_to_delay_insert`, so you should not set it to a value less than `bytes_to_delay_insert`
Example:
``` sql
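-- The hunk cuts the example off here; a hedged sketch of how it could
-- continue (the hits_all/hits/logs names are hypothetical): delay INSERTs
-- once 10 MiB of compressed data is pending for async send, wait at most
-- 30 seconds, and throw once 20 MiB is pending. bytes_to_throw_insert
-- stays above bytes_to_delay_insert since it is checked first.
CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits)
SETTINGS
    bytes_to_delay_insert = 10485760,
    max_delay_to_insert = 30,
    bytes_to_throw_insert = 20971520;
```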


@@ -68,6 +68,9 @@
M(DelayedInserts, "Number of times the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
M(RejectedInserts, "Number of times the INSERT of a block to a MergeTree table was rejected with 'Too many parts' exception due to high number of active data parts for partition.") \
M(DelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
+M(DistributedDelayedInserts, "Number of times the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \
+M(DistributedRejectedInserts, "Number of times the INSERT of a block to a Distributed table was rejected with 'Too many bytes' exception due to high number of pending bytes.") \
+M(DistributedDelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \
M(DuplicatedInsertedBlocks, "Number of times the INSERTed block to a ReplicatedMergeTree table was deduplicated.") \
\
M(ZooKeeperInit, "") \
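
The new counters land in the standard `system.events` table; a quick sketch
for watching them, alongside the `system.errors` counter the commit message
mentions:

``` sql
-- Throttling/rejection counters added by this commit.
SELECT event, value
FROM system.events
WHERE event LIKE 'Distributed%Inserts%';

-- Rejections are also visible via the error counter.
SELECT name, value
FROM system.errors
WHERE name = 'DISTRIBUTED_TOO_MANY_PENDING_BYTES';
```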


@@ -628,7 +628,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
    /// and keep monitor thread out from reading incomplete data
    std::string first_file_tmp_path;

-    storage.throwInsertIfNeeded();
+    storage.delayInsertOrThrowIfNeeded();

    auto reservation = storage.getStoragePolicy()->reserveAndCheck(block.bytes());
    const auto disk = reservation->getDisk();


@@ -19,6 +19,8 @@ class ASTStorage;
M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for async INSERT only) after all part operations (writes, renames, etc.).", 0) \
/** Inserts settings. */ \
M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw.", 0) \
+M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, the query will be delayed. 0 - do not delay.", 0) \
+M(UInt64, max_delay_to_insert, 60, "Max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send.", 0) \
DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)


@@ -14,6 +14,7 @@
#include <Columns/ColumnConst.h>
#include <Common/Macros.h>
+#include <Common/ProfileEvents.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
@@ -70,6 +71,13 @@ const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS = 2;
const UInt64 DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION = 2;
}

+namespace ProfileEvents
+{
+    extern const Event DistributedRejectedInserts;
+    extern const Event DistributedDelayedInserts;
+    extern const Event DistributedDelayedInsertsMilliseconds;
+}
+
namespace DB
{
@@ -87,6 +95,7 @@ namespace ErrorCodes
    extern const int INVALID_SHARD_ID;
    extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
    extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
+    extern const int ARGUMENT_OUT_OF_BOUND;
}
namespace ActionLocks
@@ -977,19 +986,53 @@ void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
    relative_data_path = new_path_to_table_data;
}

-void StorageDistributed::throwInsertIfNeeded() const
+void StorageDistributed::delayInsertOrThrowIfNeeded() const
{
-    if (!distributed_settings.bytes_to_throw_insert)
+    if (!distributed_settings.bytes_to_throw_insert &&
+        !distributed_settings.bytes_to_delay_insert)
        return;

    UInt64 total_bytes = *totalBytes(global_context.getSettingsRef());

-    if (total_bytes > distributed_settings.bytes_to_throw_insert)
+    if (distributed_settings.bytes_to_throw_insert && total_bytes > distributed_settings.bytes_to_throw_insert)
    {
+        ProfileEvents::increment(ProfileEvents::DistributedRejectedInserts);
        throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES,
            "Too many bytes pending for async INSERT: {} (bytes_to_throw_insert={})",
            formatReadableSizeWithBinarySuffix(total_bytes),
            formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_throw_insert));
    }

+    if (distributed_settings.bytes_to_delay_insert && total_bytes > distributed_settings.bytes_to_delay_insert)
+    {
+        /// Step is 5% of the delay and minimal one second.
+        /// NOTE: max_delay_to_insert is in seconds, and step is in ms.
+        const size_t step_ms = std::max<double>(1'000., double(distributed_settings.max_delay_to_insert) * 1'000 * 0.05);
+        UInt64 delayed_ms = 0;
+
+        do
+        {
+            delayed_ms += step_ms;
+            std::this_thread::sleep_for(std::chrono::milliseconds(step_ms));
+        } while (*totalBytes(global_context.getSettingsRef()) > distributed_settings.bytes_to_delay_insert
+            && delayed_ms < distributed_settings.max_delay_to_insert * 1000);
+
+        ProfileEvents::increment(ProfileEvents::DistributedDelayedInserts);
+        ProfileEvents::increment(ProfileEvents::DistributedDelayedInsertsMilliseconds, delayed_ms);
+
+        UInt64 new_total_bytes = *totalBytes(global_context.getSettingsRef());
+        LOG_INFO(log, "Too many bytes pending for async INSERT: was {}, now {}, INSERT was delayed for {} ms",
+            formatReadableSizeWithBinarySuffix(total_bytes),
+            formatReadableSizeWithBinarySuffix(new_total_bytes),
+            delayed_ms);
+
+        if (new_total_bytes > distributed_settings.bytes_to_delay_insert)
+        {
+            ProfileEvents::increment(ProfileEvents::DistributedRejectedInserts);
+            throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES,
+                "Too many bytes pending for async INSERT: {} (bytes_to_delay_insert={})",
+                formatReadableSizeWithBinarySuffix(new_total_bytes),
+                formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_delay_insert));
+        }
+    }
}

void registerStorageDistributed(StorageFactory & factory)
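
For reference, the pending backlog that `totalBytes()` measures here can be
inspected from SQL; a sketch, assuming the `system.distribution_queue` table
available in servers of this vintage:

``` sql
SELECT database, table, is_blocked, data_files, data_compressed_bytes
FROM system.distribution_queue;
```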
@@ -1057,6 +1100,17 @@ void registerStorageDistributed(StorageFactory & factory)
        distributed_settings.loadFromQuery(*args.storage_def);
    }

+    if (distributed_settings.max_delay_to_insert < 1)
+        throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
+            "max_delay_to_insert cannot be less than 1");
+
+    if (distributed_settings.bytes_to_throw_insert && distributed_settings.bytes_to_delay_insert &&
+        distributed_settings.bytes_to_throw_insert <= distributed_settings.bytes_to_delay_insert)
+    {
+        throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
+            "bytes_to_throw_insert cannot be less than or equal to bytes_to_delay_insert (since it is handled first)");
+    }
+
    return StorageDistributed::create(
        args.table_id, args.columns, args.constraints,
        remote_database, remote_table, cluster_name,
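
A sketch of these CREATE-time checks firing (hypothetical table names;
`test_shard_localhost` as in the test below):

``` sql
-- Throws ARGUMENT_OUT_OF_BOUND: bytes_to_throw_insert is handled first,
-- so it must be strictly greater than bytes_to_delay_insert.
CREATE TABLE dist_bad (key Int)
ENGINE = Distributed(test_shard_localhost, currentDatabase(), data)
SETTINGS bytes_to_delay_insert = 100, bytes_to_throw_insert = 100;
```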


@@ -174,7 +174,7 @@ private:
    const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }

-    void throwInsertIfNeeded() const;
+    void delayInsertOrThrowIfNeeded() const;

private:
    String remote_database;


@@ -0,0 +1,3 @@
max_delay_to_insert will throw
max_delay_to_insert will succeed
flushed


@@ -0,0 +1,86 @@
#!/usr/bin/env bash
# NOTE: $SECONDS has one-second accuracy, so we need some delta, hence -1 in the time conditions.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
max_delay_to_insert=5
${CLICKHOUSE_CLIENT} -nq "
drop table if exists dist_01675;
drop table if exists data_01675;
"
${CLICKHOUSE_CLIENT} -nq "
create table data_01675 (key Int) engine=Null();
create table dist_01675 (key Int) engine=Distributed(test_shard_localhost, currentDatabase(), data_01675) settings bytes_to_delay_insert=1, max_delay_to_insert=$max_delay_to_insert;
system stop distributed sends dist_01675;
"
#
# Case 1: max_delay_to_insert will throw.
#
echo "max_delay_to_insert will throw"
start_seconds=$SECONDS
${CLICKHOUSE_CLIENT} --testmode -nq "
-- first batch is always OK, since there are no pending bytes yet
insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0;
-- second will fail because of bytes_to_delay_insert=1 and max_delay_to_insert=5,
-- while distributed sends are stopped.
--
-- (the previous block definitely takes more, since it has a header)
insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0; -- { serverError 574 }
system flush distributed dist_01675;
"
end_seconds=$SECONDS
if (( (end_seconds-start_seconds)<(max_delay_to_insert-1) )); then
echo "max_delay_to_insert was not satisfied ($end_seconds-$start_seconds)"
fi
#
# Case 2: max_delay_to_insert will eventually succeed.
#
echo "max_delay_to_insert will succeed"
max_delay_to_insert=10
${CLICKHOUSE_CLIENT} -nq "
drop table dist_01675;
create table dist_01675 (key Int) engine=Distributed(test_shard_localhost, currentDatabase(), data_01675) settings bytes_to_delay_insert=1, max_delay_to_insert=$max_delay_to_insert;
system stop distributed sends dist_01675;
"
flush_delay=4
function flush_distributed_worker()
{
sleep $flush_delay
${CLICKHOUSE_CLIENT} -q "system flush distributed dist_01675"
echo flushed
}
flush_distributed_worker &
start_seconds=$SECONDS
${CLICKHOUSE_CLIENT} --testmode -nq "
-- first batch is always OK, since there are no pending bytes yet
insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0;
-- second will succeed, due to SYSTEM FLUSH DISTRIBUTED running in the background.
insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0;
"
end_seconds=$SECONDS
wait
if (( (end_seconds-start_seconds)<(flush_delay-1) )); then
echo "max_delay_to_insert was not wait flush_delay ($end_seconds-$start_seconds)"
fi
if (( (end_seconds-start_seconds)>=(max_delay_to_insert-1) )); then
echo "max_delay_to_insert was overcommited ($end_seconds-$start_seconds)"
fi
${CLICKHOUSE_CLIENT} -nq "
drop table dist_01675;
drop table data_01675;
"


@@ -200,6 +200,7 @@
01659_test_base64Decode_mysql_compatibility
01670_distributed_bytes_to_throw_insert
01674_htm_xml_coarse_parse
+01675_distributed_bytes_to_delay_insert
01675_data_type_coroutine
01676_clickhouse_client_autocomplete
01671_aggregate_function_group_bitmap_data