So now system.distribution_queue will show accurate statistics, so tests
does not requires sleep anymore.
But note that with too much distributed pending this will iterate over
all directories.
Right now with distributed_directory_monitor_batch_inserts=1 and
insert_distributed_sync=0 INSERT into Distributed table will store
blocks that should be sent to remote (and in case of
prefer_localhost_replica=0 to the localhost too) on the local
filesystem, and sent it in background.
However there is no limit for this storage, and if the remote is
unavailable (or some other error), these pending blocks may take
significant space, and this is not always desired behaviour.
Add new Distributed setting - bytes_to_throw_insert, that will set the
limit for how much pending bytes is allowed, if the limit will be
reached an exception will be throw.
By default was set to 0, to avoid surprises.
Right now SYSTEM FLUSH DISTRIBUTED will block:
- INSERT into this Distributed table (requireDirectoryMonitor())
- SELECT * FROM system.distribution_queue
* add the query data deduplication excluding duplicated parts in MergeTree family engines.
query deduplication is based on parts' UUID which should be enabled first with merge_tree setting
assign_part_uuids=1
allow_experimental_query_deduplication setting is to enable part deduplication, default ot false.
data part UUID is a mechanism of giving a data part a unique identifier.
Having UUID and deduplication mechanism provides a potential of moving parts
between shards preserving data consistency on a read path:
duplicated UUIDs will cause root executor to retry query against on of the replica explicitly
asking to exclude encountered duplicated fingerprints during a distributed query execution.
NOTE: this implementation don't provide any knobs to lock part and hence its UUID. Any mutations/merge will
update part's UUID.
* add _part_uuid virtual column, allowing to use UUIDs in predicates.
Signed-off-by: Aleksei Semiglazov <asemiglazov@cloudflare.com>
address comments
Two new settings (by analogy with MergeTree family) has been added:
- `fsync_after_insert` - Do fsync for every inserted. Will decreases
performance of inserts.
- `fsync_tmp_directory` - Do fsync for temporary directory (that is used
for async INSERT only) after all part operations (writes, renames,
etc.).
Refs: #17380 (p1)
* Distributed insertion to one random shard
* add some tests
* add some documentation
* Respect shards' weights
* fine locking
Co-authored-by: Ivan Lezhankin <ilezhankin@yandex-team.ru>
<details>
```
drop() on T1275:
0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966
1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...)
at ../contrib/libcxx/include/new:340
2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...)
at ../contrib/libcxx/include/memory:3826
3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
StorageDistributedDirectoryMonitor on T166:
0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=...,
pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81
1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474
2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...)
at ../src/Storages/StorageDistributed.cpp:682
3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634
4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=...,
shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539
5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...)
at ../contrib/libcxx/include/vector:1546
6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141
7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135
8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...)
at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157
9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...)
at ../contrib/libcxx/include/memory:3826
10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...)
at ../contrib/libcxx/include/memory:3826
11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...)
at ../contrib/libcxx/include/memory:3826
12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...)
at ../src/Storages/StorageBuffer.cpp:747
13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=...,
check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false)
at ../src/Storages/StorageBuffer.cpp:661
14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000)
at ../src/Storages/StorageBuffer.cpp:605
shutdown() on T1275:
0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612
1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...)
at ../src/Storages/TableLockHolder.h:12
2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
```
</details>
* Use only |name_parts| as primary name source
* Restore legacy logic for table restoration
* Fix build
* Fix tests
* Add pytest server config
* Fix tests
* Fixes due to review