Right now SYSTEM FLUSH DISTRIBUTED will block:
- INSERT into this Distributed table (requireDirectoryMonitor())
- SELECT * FROM system.distribution_queue
* add the query data deduplication excluding duplicated parts in MergeTree family engines.
query deduplication is based on parts' UUID which should be enabled first with merge_tree setting
assign_part_uuids=1
allow_experimental_query_deduplication setting is to enable part deduplication, default ot false.
data part UUID is a mechanism of giving a data part a unique identifier.
Having UUID and deduplication mechanism provides a potential of moving parts
between shards preserving data consistency on a read path:
duplicated UUIDs will cause root executor to retry query against on of the replica explicitly
asking to exclude encountered duplicated fingerprints during a distributed query execution.
NOTE: this implementation don't provide any knobs to lock part and hence its UUID. Any mutations/merge will
update part's UUID.
* add _part_uuid virtual column, allowing to use UUIDs in predicates.
Signed-off-by: Aleksei Semiglazov <asemiglazov@cloudflare.com>
address comments
Two new settings (by analogy with MergeTree family) has been added:
- `fsync_after_insert` - Do fsync for every inserted. Will decreases
performance of inserts.
- `fsync_tmp_directory` - Do fsync for temporary directory (that is used
for async INSERT only) after all part operations (writes, renames,
etc.).
Refs: #17380 (p1)
* Distributed insertion to one random shard
* add some tests
* add some documentation
* Respect shards' weights
* fine locking
Co-authored-by: Ivan Lezhankin <ilezhankin@yandex-team.ru>
<details>
```
drop() on T1275:
0 DB::StorageDistributed::drop (this=0x7f9ed34f0000) at ../contrib/libcxx/include/__hash_table:966
1 0x000000000d557242 in DB::DatabaseOnDisk::dropTable (this=0x7f9fc22706d8, context=..., table_name=...)
at ../contrib/libcxx/include/new:340
2 0x000000000d6fcf7c in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f9e42560dc0, query=...)
at ../contrib/libcxx/include/memory:3826
3 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f9e42560dc0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
4 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
5 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
6 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
7 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
8 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
9 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
10 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
11 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
12 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
13 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
StorageDistributedDirectoryMonitor on T166:
0 DB::StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor (this=0x7f9ea7ab1400, storage_=..., path_=...,
pool_=..., monitor_blocker_=..., bg_pool_=...) at ../src/Storages/Distributed/DirectoryMonitor.cpp:81
1 0x000000000dbf684e in std::__1::make_unique<> () at ../contrib/libcxx/include/memory:3474
2 DB::StorageDistributed::requireDirectoryMonitor (this=0x7f9ed34f0000, disk=..., name=...)
at ../src/Storages/StorageDistributed.cpp:682
3 0x000000000de3d5fa in DB::DistributedBlockOutputStream::writeToShard (this=this@entry=0x7f9ed39c7418, block=..., dir_names=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:634
4 0x000000000de3e214 in DB::DistributedBlockOutputStream::writeAsyncImpl (this=this@entry=0x7f9ed39c7418, block=...,
shard_id=shard_id@entry=79) at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:539
5 0x000000000de3e47b in DB::DistributedBlockOutputStream::writeSplitAsync (this=this@entry=0x7f9ed39c7418, block=...)
at ../contrib/libcxx/include/vector:1546
6 0x000000000de3eab0 in DB::DistributedBlockOutputStream::writeAsync (block=..., this=0x7f9ed39c7418)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:141
7 DB::DistributedBlockOutputStream::write (this=0x7f9ed39c7418, block=...)
at ../src/Storages/Distributed/DistributedBlockOutputStream.cpp:135
8 0x000000000d73b376 in DB::PushingToViewsBlockOutputStream::write (this=this@entry=0x7f9ea7a8cf58, block=...)
at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:157
9 0x000000000d7853eb in DB::AddingDefaultBlockOutputStream::write (this=0x7f9ed383d118, block=...)
at ../contrib/libcxx/include/memory:3826
10 0x000000000d740790 in DB::SquashingBlockOutputStream::write (this=0x7f9ed383de18, block=...)
at ../contrib/libcxx/include/memory:3826
11 0x000000000d68c308 in DB::CountingBlockOutputStream::write (this=0x7f9ea7ac6d60, block=...)
at ../contrib/libcxx/include/memory:3826
12 0x000000000ddab449 in DB::StorageBuffer::writeBlockToDestination (this=this@entry=0x7f9fbd56a000, block=..., table=...)
at ../src/Storages/StorageBuffer.cpp:747
13 0x000000000ddabfa6 in DB::StorageBuffer::flushBuffer (this=this@entry=0x7f9fbd56a000, buffer=...,
check_thresholds=check_thresholds@entry=true, locked=locked@entry=false, reset_block_structure=reset_block_structure@entry=false)
at ../src/Storages/StorageBuffer.cpp:661
14 0x000000000ddac415 in DB::StorageBuffer::flushAllBuffers (reset_blocks_structure=false, check_thresholds=true, this=0x7f9fbd56a000)
at ../src/Storages/StorageBuffer.cpp:605
shutdown() on T1275:
0 DB::StorageDistributed::shutdown (this=0x7f9ed34f0000) at ../contrib/libcxx/include/atomic:1612
1 0x000000000d6fd938 in DB::InterpreterDropQuery::executeToTable (this=this@entry=0x7f98530c79a0, query=...)
at ../src/Storages/TableLockHolder.h:12
2 0x000000000d6ff5ee in DB::InterpreterDropQuery::execute (this=0x7f98530c79a0) at ../src/Interpreters/InterpreterDropQuery.cpp:50
3 0x000000000daa40c0 in DB::executeQueryImpl (begin=<optimized out>, end=<optimized out>, context=..., internal=<optimized out>,
stage=DB::QueryProcessingStage::Complete, has_query_tail=false, istr=0x0) at ../src/Interpreters/executeQuery.cpp:420
4 0x000000000daa59df in DB::executeQuery (query=..., context=..., internal=internal@entry=false, stage=<optimized out>,
may_have_embedded_data=<optimized out>) at ../contrib/libcxx/include/string:1487
5 0x000000000e1369e6 in DB::TCPHandler::runImpl (this=this@entry=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:254
6 0x000000000e1379c9 in DB::TCPHandler::run (this=0x7f9ddf3a9000) at ../src/Server/TCPHandler.cpp:1326
7 0x000000001086fac7 in Poco::Net::TCPServerConnection::start (this=this@entry=0x7f9ddf3a9000)
at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
8 0x000000001086ff2b in Poco::Net::TCPServerDispatcher::run (this=0x7f9e4eba5c00)
at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:114
9 0x00000000109dbe8e in Poco::PooledThread::run (this=0x7f9e4a2d2f80) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
10 0x00000000109d78f9 in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>)
at ../contrib/poco/Foundation/include/Poco/SharedPtr.h:401
11 0x00007f9fc3cccea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
12 0x00007f9fc3bebeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
```
</details>
* Use only |name_parts| as primary name source
* Restore legacy logic for table restoration
* Fix build
* Fix tests
* Add pytest server config
* Fix tests
* Fixes due to review
Possible values:
- 1 - Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.
- 2 - same as 1 but also apply ORDER BY and LIMIT stages
Previous set of QueryProcessingStage does not allow to do this.
But after WithMergeableStateAfterAggregation had been introduced the
following queries can be optimized too under
optimize_distributed_group_by_sharding_key:
- GROUP BY sharding_key LIMIT
- GROUP BY sharding_key LIMIT BY
- GROUP BY sharding_key ORDER BY
And right now it is still not supports:
- WITH TOTALS (looks like it can be supported)
- WITH ROLLUP (looks like it can be supported)
- WITH CUBE
- SETTINGS extremes=1 (looks like it can be supported)
But will be implemented separatelly.
vX: fixes
v2: fix WITH *
v3: fix extremes
v4: fix LIMIT OFFSET (and make a little bit cleaner)
v5: fix HAVING
v6: fix ORDER BY
v7: rebase against 20.7
v8: move out WithMergeableStateAfterAggregation
v9: add optimize_distributed_group_by_sharding_key into test names
Example of such functions is rand()
And this patch disables only optimize_skip_unused_shards, i.e. INSERT
code path does not changed, so it will work as before.
1. Moved Volume to separate file
2. Created IVolume interface and implemented current behaviour in implementation of new interface — VolumeJBOD
3. Replaced all old volume usages with new VolumeJBOD. Where it is unnecessary to have JBOD — left just IVolume.
4. Removed old Volume completely
5. Moved StoragePolicy to separated files
6. Moved DiskSelector to separated files
7. Removed DiskSpaceMonitor file
Before this patch it printed 3 times:
- from StorageDistributed::getProcessingStageImpl()
- from StorageDistributed::read()
- from StorageDistributed::getProcessingStageImpl() (from StorageDistributed::read() -> getSampleBlock())
(But this should be optimized)
I know at least one way to fool that optimization, by using as sharding
key something like `if(col1>0, col1, col2)` (although this is not common
sharding key I would say, but can be useful if this will work
correctly), so let's disable it by default.
StorageDistributed::shutdown() does not acquire the lock, that controls
access to the cluster_nodes_data, thus it does not synced with the
requireDirectoryMonitor(), hence some monitors can be untracked that
will trigger UAF (use-after-free) after DROP TABLE dist:
This is for the SIGSEGV from the DirectoryMonitor (with already destroyed storage):
0 0x0000000008e9f760 in std::__1::__cxx_atomic_load<int> (__order=std::__1::memory_order::seq_cst, __a=0x0)
1 std::__1::__atomic_base<int, false>::load (__m=std::__1::memory_order::seq_cst, this=0x0) <-- this is nullptr
2 std::__1::__atomic_base<int, false>::operator int (this=0x0)
3 DB::ActionBlocker::isCancelled (this=0x7f85e31c9bb8) at ../src/Common/ActionBlocker.h:18
4 DB::StorageDistributedDirectoryMonitor::run (this=0x7f85f93b2a00) at ../src/Storages/Distributed/DirectoryMonitor.cpp:140
After #8756 the problem with 1 thread for each (distributed table, disk)
for distributed sends became even worse (since there can be multiple
disks), so use predefined thread pool for this tasks, that can be
controlled with background_distributed_schedule_pool_size knob.
* "lock_acquire_timeout" controls for how long a query will continue to
acquire each lock on its argument tables
* "lock_acquire_timeout_for_background_operations" is a per-table
setting for storages of *MergeTree family