Distributed: Add ability to limit amount of pending bytes for async INSERT

Right now, with distributed_directory_monitor_batch_inserts=1 and
insert_distributed_sync=0, an INSERT into a Distributed table stores the
blocks that should be sent to the remote (and, in case of
prefer_localhost_replica=0, to the localhost too) on the local
filesystem, and sends them in the background.

However there is no limit for this storage, and if the remote is
unavailable (or some other error), these pending blocks may take
significant space, and this is not always desired behaviour.

Add a new Distributed setting, bytes_to_throw_insert, that sets the
limit on how many pending bytes are allowed; if the limit is
exceeded, an exception will be thrown.

The default is 0 (do not throw), to avoid surprises.
This commit is contained in:
Azat Khuzhin 2021-01-26 21:45:37 +03:00
parent ce09b7ff89
commit b5a5778589
9 changed files with 46 additions and 0 deletions

View File

@ -31,6 +31,13 @@ Also it accept the following settings:
- `fsync_directories` - do the `fsync` for directories. Guarantees that the OS refreshed directory metadata after operations related to asynchronous inserts on Distributed table (after insert, after sending the data to shard, etc). - `fsync_directories` - do the `fsync` for directories. Guarantees that the OS refreshed directory metadata after operations related to asynchronous inserts on Distributed table (after insert, after sending the data to shard, etc).
- `bytes_to_throw_insert` - if more than this number of compressed bytes is pending for async INSERT, an exception will be thrown. 0 - do not throw.
See also:
- [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting
- [prefer_localhost_replica](../../../operations/settings/settings.md#settings-prefer-localhost-replica) setting
!!! note "Note" !!! note "Note"
**Durability settings** (`fsync_...`): **Durability settings** (`fsync_...`):

View File

@ -540,6 +540,7 @@
M(571, DATABASE_REPLICATION_FAILED) \ M(571, DATABASE_REPLICATION_FAILED) \
M(572, TOO_MANY_QUERY_PLAN_OPTIMIZATIONS) \ M(572, TOO_MANY_QUERY_PLAN_OPTIMIZATIONS) \
M(573, EPOLL_ERROR) \ M(573, EPOLL_ERROR) \
M(574, DISTRIBUTED_TOO_MANY_PENDING_BYTES) \
\ \
M(999, KEEPER_EXCEPTION) \ M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \ M(1000, POCO_EXCEPTION) \

View File

@ -628,6 +628,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
/// and keep monitor thread out from reading incomplete data /// and keep monitor thread out from reading incomplete data
std::string first_file_tmp_path; std::string first_file_tmp_path;
storage.throwInsertIfNeeded();
auto reservation = storage.getStoragePolicy()->reserveAndCheck(block.bytes()); auto reservation = storage.getStoragePolicy()->reserveAndCheck(block.bytes());
const auto disk = reservation->getDisk(); const auto disk = reservation->getDisk();
auto disk_path = disk->getPath(); auto disk_path = disk->getPath();

View File

@ -17,6 +17,8 @@ class ASTStorage;
#define LIST_OF_DISTRIBUTED_SETTINGS(M) \ #define LIST_OF_DISTRIBUTED_SETTINGS(M) \
M(Bool, fsync_after_insert, false, "Do fsync for every inserted. Will decreases performance of inserts (only for async INSERT, i.e. insert_distributed_sync=false)", 0) \ M(Bool, fsync_after_insert, false, "Do fsync for every inserted. Will decreases performance of inserts (only for async INSERT, i.e. insert_distributed_sync=false)", 0) \
M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for async INSERT only) after all part operations (writes, renames, etc.).", 0) \ M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for async INSERT only) after all part operations (writes, renames, etc.).", 0) \
/** Inserts settings. */ \
M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw.", 0) \
DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS) DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)

View File

@ -18,6 +18,7 @@
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <Common/formatReadable.h>
#include <Parsers/ASTDropQuery.h> #include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
@ -85,6 +86,7 @@ namespace ErrorCodes
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS; extern const int UNABLE_TO_SKIP_UNUSED_SHARDS;
extern const int INVALID_SHARD_ID; extern const int INVALID_SHARD_ID;
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN; extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
} }
namespace ActionLocks namespace ActionLocks
@ -975,6 +977,21 @@ void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
relative_data_path = new_path_to_table_data; relative_data_path = new_path_to_table_data;
} }
void StorageDistributed::throwInsertIfNeeded() const
{
if (!distributed_settings.bytes_to_throw_insert)
return;
/// TODO: update the counters
UInt64 total_bytes = *totalBytes(global_context.getSettingsRef());
if (total_bytes > distributed_settings.bytes_to_throw_insert)
{
throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES,
"Too many bytes pending for async INSERT: {} (bytes_to_throw_insert={})",
formatReadableSizeWithBinarySuffix(total_bytes),
formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_throw_insert));
}
}
void registerStorageDistributed(StorageFactory & factory) void registerStorageDistributed(StorageFactory & factory)
{ {

View File

@ -174,6 +174,8 @@ private:
const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; } const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }
void throwInsertIfNeeded() const;
private: private:
String remote_database; String remote_database;
String remote_table; String remote_table;

View File

@ -0,0 +1,14 @@
-- Checks that the Distributed setting bytes_to_throw_insert makes an async INSERT
-- fail with server error 574 once the pending bytes exceed the limit.
drop table if exists dist_01670;
drop table if exists data_01670;
create table data_01670 (key Int) engine=Null();
create table dist_01670 (key Int) engine=Distributed(test_shard_localhost, currentDatabase(), data_01670) settings bytes_to_throw_insert=1;
-- stop background sends, presumably so that the inserted blocks stay pending
system stop distributed sends dist_01670;
-- the first batch is always OK, since there are no pending bytes yet
insert into dist_01670 select * from numbers(1) settings prefer_localhost_replica=0;
-- the second one will fail because of bytes_to_throw_insert=1
-- (the previous block definitely takes more than that, since it has a header)
insert into dist_01670 select * from numbers(1) settings prefer_localhost_replica=0; -- { serverError 574 }
system flush distributed dist_01670;
drop table dist_01670;
drop table data_01670;

View File

@ -197,6 +197,7 @@
01181_db_atomic_drop_on_cluster 01181_db_atomic_drop_on_cluster
01658_test_base64Encode_mysql_compatibility 01658_test_base64Encode_mysql_compatibility
01659_test_base64Decode_mysql_compatibility 01659_test_base64Decode_mysql_compatibility
01670_distributed_bytes_to_throw_insert
01674_htm_xml_coarse_parse 01674_htm_xml_coarse_parse
01675_data_type_coroutine 01675_data_type_coroutine
01676_clickhouse_client_autocomplete 01676_clickhouse_client_autocomplete