Before this patch (and after #22208) the INSERT may fail with "Cannot
schedule a task" because the pool in DistributedBlockOutputStream
already throws exception and simply fail in writeSuffix().
INSERT into Distributed with insert_distributed_sync=1 stores the
distributed batches on the disk for sending in background.
But types may be a little bit different for the Distributed and it's
underlying table, so the initiator need to know whether conversion is
required or not.
Before this patch those on disk distributed batches contains header,
which includes dumpStructure() for the block in that batch, however it
checks not only names and types and plus dumpStructure() is a debug
method.
So instead of storing string representation for the block header we
should store empty block in the file header (note, that we cannot store
the empty block not in header, since this will require reading all
blocks from file, due to some trickery of the readers interface).
Note, that this patch also contains tiny refactoring:
- s/header/distributed_header/
v1: dumpNamesAndTypes()
v2: dump empty block into the batch itself
v3: move empty block into the header
Add two new settings for the Distributed engine:
- bytes_to_delay_insert
- max_delay_to_insert
If at the beginning of INSERT there will be too much pending data, more
then bytes_to_delay_insert, then the INSERT will wait until it will be
shrinked, and not more then max_delay_to_insert seconds.
If after this there will be still too much pending, it will throw an
exception.
Also new profile events were added (by analogy to the MergeTree):
- DistributedDelayedInserts (although you can use system.errors instead
of this, but still)
- DistributedRejectedInserts
- DistributedDelayedInsertsMilliseconds
Previous patch fixes the inaccuracy, but it's done using iterating over
directory on each request (to system.distribution_queue or to check
bytes_to_throw_insert), and like previous patch alredy stated, it may
have pretty huge overhead (especially when you have lots of distributed
files pending).
This patch remove that recalculation (but it will still be done, and
if there is different, there will be a log message), and replace it with
proper account at INSERT time (and after file has been sent, or marked
as broken).
So now system.distribution_queue will show accurate statistics, so tests
does not requires sleep anymore.
But note that with too much distributed pending this will iterate over
all directories.
Right now with distributed_directory_monitor_batch_inserts=1 and
insert_distributed_sync=0 INSERT into Distributed table will store
blocks that should be sent to remote (and in case of
prefer_localhost_replica=0 to the localhost too) on the local
filesystem, and sent it in background.
However there is no limit for this storage, and if the remote is
unavailable (or some other error), these pending blocks may take
significant space, and this is not always desired behaviour.
Add new Distributed setting - bytes_to_throw_insert, that will set the
limit for how much pending bytes is allowed, if the limit will be
reached an exception will be throw.
By default was set to 0, to avoid surprises.
It is possible to get corruption (even though it is very unlikely, and
initially it wasn't corruption) just before the data block goes in the
file on disk, and in case of batching, it will break the packets, since
it will write the packet type but will not write any data after.
- the sender will got ATTEMPT_TO_READ_AFTER_EOF (added in
946c275dfb) when the client just go
away, i.e. server had been restarted, and this is incorrect to mark the
file as broken in this case.
- since #18853 the file will be checked on the sender locally, and
in case the file was truncated CANNOT_READ_ALL_DATA will be thrown.
But before #18853 the sender will not receive
ATTEMPT_TO_READ_AFTER_EOF from the client in case of file was truncated
on the sender, since the client will just wait for more data, IOW just hang.
- and I don't see how ATTEMPT_TO_READ_AFTER_EOF can be received while
reading local file.
Before this patch the DirectoryMonitor was checking the compressed file
by reading it one more time (since w/o this receiver may stuck on
truncated file), while this is ineffective and we can just check the
checksums before sending.
But note that this may decrease batch size that is used for sending over
network.
Before this patch StorageDistributedDirectoryMonitor reading .bin files
in batch mode, just to calculate number of bytes/rows, this is very
ineffective, let's just store them in the header (rows/bytes).
This is already done for distributed_directory_monitor_batch_inserts=1,
so let's do the same for the non batched mode, since otherwise in case
the file will be truncated the receiver will just stuck (since it will
wait for the block, but the sender will not send it).
Two new settings (by analogy with MergeTree family) has been added:
- `fsync_after_insert` - Do fsync for every inserted. Will decreases
performance of inserts.
- `fsync_tmp_directory` - Do fsync for temporary directory (that is used
for async INSERT only) after all part operations (writes, renames,
etc.).
Refs: #17380 (p1)
* Distributed insertion to one random shard
* add some tests
* add some documentation
* Respect shards' weights
* fine locking
Co-authored-by: Ivan Lezhankin <ilezhankin@yandex-team.ru>