Recently I saw the following, the client executed long distributed query
and terminated the connection, and in this case query cancellation will
be done from PullingAsyncPipelineExecutor dtor, but during cancellation
one of nodes sent ECONNRESET, and this leads to an exception from
PullingAsyncPipelineExecutor::cancel(), and this leads to a deadlock
when multiple threads waits each others, because cancel() for
LazyOutputFormat wasn't called.
Here is as relevant portion of logs:
2023.01.04 08:26:09.236208 [ 37968 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} <Debug> executeQuery: (from 10.61.13.253:44266, user: default) TooLongDistributedQueryToPost
...
2023.01.04 08:26:09.262424 [ 37968 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} <Trace> MergeTreeInOrderSelectProcessor: Reading 1 ranges in order from part 9_330_538_18, approx. 61440 rows starting from 0
2023.01.04 08:26:09.266399 [ 26788 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} <Trace> Connection (s4.ch:9000): Connecting. Database: (not specified). User: default
2023.01.04 08:26:09.266849 [ 26788 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} <Trace> Connection (s4.ch:9000): Connected to ClickHouse server version 22.10.1.
2023.01.04 08:26:09.267165 [ 26788 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} <Debug> Connection (s4.ch:9000): Sent data for 2 scalars, total 2 rows in 3.1587e-05 sec., 62635 rows/sec., 68.00 B (2.03 MiB/sec.), compressed 0.4594594594594595 times to 148.00 B (4.41 MiB/sec.)
2023.01.04 08:39:13.047170 [ 37968 ] {f2ed6149-146d-4a3d-874a-b0b751c7b567} <Error> PullingAsyncPipelineExecutor: Code: 210. DB::NetException: Connection reset by peer, while writing to socket (10.7.142.115:9000). (NETWORK_ERROR), Stack trace (when copying this message, always include the lines below):
0. ./.build/./contrib/libcxx/include/exception:133: Poco::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int) @ 0x1818234c in /usr/lib/debug/usr/bin/clickhouse.debug
1. ./.build/./src/Common/Exception.cpp:69: DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0x1004fbda in /usr/lib/debug/usr/bin/clickhouse.debug
2. ./.build/./src/Common/NetException.h:12: DB::WriteBufferFromPocoSocket::nextImpl() @ 0x14e352f3 in /usr/lib/debug/usr/bin/clickhouse.debug
3. ./.build/./src/IO/BufferBase.h:39: DB::Connection::sendCancel() @ 0x15c21e6b in /usr/lib/debug/usr/bin/clickhouse.debug
4. ./.build/./src/Client/MultiplexedConnections.cpp:0: DB::MultiplexedConnections::sendCancel() @ 0x15c4d5b7 in /usr/lib/debug/usr/bin/clickhouse.debug
5. ./.build/./src/QueryPipeline/RemoteQueryExecutor.cpp:627: DB::RemoteQueryExecutor::tryCancel(char const*, std::__1::unique_ptr<DB::RemoteQueryExecutorReadContext, std::__1::default_delete<DB::RemoteQueryExecutorReadContext> >*) @ 0x14446c09 in /usr/lib/debug/usr/bin/clickhouse.debug
6. ./.build/./contrib/libcxx/include/__iterator/wrap_iter.h💯 DB::ExecutingGraph::cancel() @ 0x15d2c0de in /usr/lib/debug/usr/bin/clickhouse.debug
7. ./.build/./contrib/libcxx/include/__memory/unique_ptr.h:300: DB::PullingAsyncPipelineExecutor::cancel() @ 0x15d32055 in /usr/lib/debug/usr/bin/clickhouse.debug
8. ./.build/./contrib/libcxx/include/__memory/unique_ptr.h:312: DB::PullingAsyncPipelineExecutor::~PullingAsyncPipelineExecutor() @ 0x15d31f4f in /usr/lib/debug/usr/bin/clickhouse.debug
9. ./.build/./src/Server/TCPHandler.cpp:0: DB::TCPHandler::processOrdinaryQueryWithProcessors() @ 0x15cde919 in /usr/lib/debug/usr/bin/clickhouse.debug
10. ./.build/./src/Server/TCPHandler.cpp:0: DB::TCPHandler::runImpl() @ 0x15cd8554 in /usr/lib/debug/usr/bin/clickhouse.debug
11. ./.build/./src/Server/TCPHandler.cpp:1904: DB::TCPHandler::run() @ 0x15ce6479 in /usr/lib/debug/usr/bin/clickhouse.debug
12. ./.build/./contrib/poco/Net/src/TCPServerConnection.cpp:57: Poco::Net::TCPServerConnection::start() @ 0x18074f07 in /usr/lib/debug/usr/bin/clickhouse.debug
13. ./.build/./contrib/libcxx/include/__memory/unique_ptr.h:54: Poco::Net::TCPServerDispatcher::run() @ 0x180753ed in /usr/lib/debug/usr/bin/clickhouse.debug
14. ./.build/./contrib/poco/Foundation/src/ThreadPool.cpp:213: Poco::PooledThread::run() @ 0x181e3807 in /usr/lib/debug/usr/bin/clickhouse.debug
15. ./.build/./contrib/poco/Foundation/include/Poco/SharedPtr.h:156: Poco::ThreadImpl::runnableEntry(void*) @ 0x181e1483 in /usr/lib/debug/usr/bin/clickhouse.debug
16. ? @ 0x7ffff7e55fd4 in ?
17. ? @ 0x7ffff7ed666c in ?
(version 22.10.1.1)
And here is the state of the threads:
<details>
<summary>system.stack_trace</summary>
```sql
SELECT
arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), trace), '\n') AS sym
FROM system.stack_trace
WHERE query_id = 'f2ed6149-146d-4a3d-874a-b0b751c7b567'
SETTINGS allow_introspection_functions=1
Row 1:
──────
sym:
pthread_cond_wait
std::__1::condition_variable::wait(std::__1::unique_lock<std::__1::mutex>&)
bool ConcurrentBoundedQueue<DB::Chunk>::emplaceImpl<DB::Chunk>(std::__1::optional<unsigned long>, DB::Chunk&&)
DB::IOutputFormat::work()
DB::ExecutionThreadContext::executeTask()
DB::PipelineExecutor::executeStepImpl(unsigned long, std::__1::atomic<bool>*)
Row 2:
──────
sym:
pthread_cond_wait
Poco::EventImpl::waitImpl()
DB::PipelineExecutor::joinThreads()
DB::PipelineExecutor::executeImpl(unsigned long)
DB::PipelineExecutor::execute(unsigned long)
Row 3:
──────
sym:
pthread_cond_wait
Poco::EventImpl::waitImpl()
DB::PullingAsyncPipelineExecutor::Data::~Data()
DB::PullingAsyncPipelineExecutor::~PullingAsyncPipelineExecutor()
DB::TCPHandler::processOrdinaryQueryWithProcessors()
DB::TCPHandler::runImpl()
DB::TCPHandler::run()
Poco::Net::TCPServerConnection::start()
Poco::Net::TCPServerDispatcher::run()
Poco::PooledThread::run()
Poco::ThreadImpl::runnableEntry(void*)
```
</details>
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Reasons:
1. The original Gorilla paper proposed a compression schema for pairs of
time stamps and double-precision FP values. ClickHouse's Gorilla
codec only implements compression of the latter and it does not
impose any data type restrictions.
- Data types != Float* or (U)Int* (e.g. Decimal, Point etc.) are
definitely not supposed to be used with Gorilla.
- (U)Int* types are debatable. The paper only considers
integers-stored-as-FP-values, a practical use case for which
Gorilla works well. Standalone integers are not considered which
makes them at least suspicious.
2. Achieve consistency with FPC, another specialized floating-point
timeseries codec, which rejects non-float data.
3. On practical datasets, ZSTD is often "good enough" (**) so it should
be okay to disincentive non-ZSTD codecs a little bit. If needed,
Delta and DoubleDelta codecs are viable alternative for slowly
changing (time-series-like) integer sequences.
Since on-prem and hosted users may still have Gorilla-compressed
non-float data, this combination is only deprecated for now. No warning
or error will be emitted. Users are encouraged to migrate
Gorilla-compressed non-float data to an alternative codec. It is planned
to treat Gorilla-compressed non-float columns as "suspicious" six months
after this commit (i.e. in v23.6). Even then, it will still be possible
to set "allow_suspicious_codecs = true" and read and write
Gorilla-compressed non-float data.
(*) Sec. 4.1.2, "Gorilla restricts the value element in its tuple to a
double floating point type.", https://doi.org/10.14778/2824032.2824078
(**) https://clickhouse.com/blog/optimize-clickhouse-codecs-compression-schema
Recently I noticed that clickhouse compiled with ASan does not work with
newer glibc 2.36+, before I though that this was only about compiling
with old but using new, however that was not correct, ASan simply does
not work with glibc 2.36+.
Here is a simple reproducer [1]:
$ cat > test-asan.cpp <<EOL
#include <pthread.h>
int main()
{
// something broken in ASan in interceptor for __pthread_mutex_lock
// and only since glibc 2.36, and for pthread_mutex_lock everything is OK
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
return __pthread_mutex_lock(&mutex);
}
EOL
$ clang -g3 -o test-asan test-asan.cpp -fsanitize=address
$ ./test-asan
AddressSanitizer:DEADLYSIGNAL
=================================================================
==15659==ERROR: AddressSanitizer: SEGV on unknown address 0x000000000000 (pc 0x000000000000 bp 0x7fffffffccb0 sp 0x7fffffffcb98 T0)
==15659==Hint: pc points to the zero page.
==15659==The signal is caused by a READ memory access.
==15659==Hint: address points to the zero page.
#0 0x0 (<unknown module>)
#1 0x7ffff7cda28f (/usr/lib/libc.so.6+0x2328f) (BuildId: 1e94beb079e278ac4f2c8bce1f53091548ea1584)
AddressSanitizer can not provide additional info.
SUMMARY: AddressSanitizer: SEGV (<unknown module>)
==15659==ABORTING
[1]: https://gist.github.com/azat/af073e57a248e04488b21068643f079e
I've started observing glibc code, there was some changes in glibc, that
moves pthread functions out from libpthread.so.0 into libc.so.6
(somewhere between 2.31 and 2.35), but
the problem pops up only with 2.36, 2.35 works fine.
After this I've looked into changes between 2.35 and 2.36, and found
this patch [2] - "dlsym: Make RTLD_NEXT prefer default version
definition [BZ #14932]", that fixes this bug [3].
[2]: https://sourceware.org/git/?p=glibc.git;a=commit;h=efa7936e4c91b1c260d03614bb26858fbb8a0204
[3]: https://sourceware.org/bugzilla/show_bug.cgi?id=14932
The problem with using DL_LOOKUP_RETURN_NEWEST flag for RTLD_NEXT is
that it does not resolve hidden symbols (and __pthread_mutex_lock is
indeed hidden).
Here is a sample that will show the difference [4]:
$ cat > test-dlsym.c <<EOL
#define _GNU_SOURCE
#include <dlfcn.h>
#include <stdio.h>
int main()
{
void *p = dlsym(RTLD_NEXT, "__pthread_mutex_lock");
printf("__pthread_mutex_lock: %p (via RTLD_NEXT)\n", p);
return 0;
}
EOL
# glibc 2.35: __pthread_mutex_lock: 0x7ffff7e27f70 (via RTLD_NEXT)
# glibc 2.36: __pthread_mutex_lock: (nil) (via RTLD_NEXT)
[4]: https://gist.github.com/azat/3b5f2ae6011bef2ae86392cea7789eb7
But ThreadFuzzer uses internal symbols to wrap
pthread_mutex_lock/pthread_mutex_unlock, which are intercepted by ASan
and this leads to NULL dereference.
The fix was obvious - just use dlsym(RTLD_NEXT), however on older
glibc's this leads to endless recursion (see commits in the code). But
only for jemalloc [5], and even though sanitizers does not uses jemalloc
the code of ThreadFuzzer is generic and I don't want to guard it with
more preprocessors macros.
[5]: https://gist.github.com/azat/588d9c72c1e70fc13ebe113197883aa2
So we have to use RTLD_NEXT only for ASan.
There is also one more interesting issue, if you will compile with clang
that itself had been compiled with newer libc (i.e. 2.36), you will get
the following error:
$ podman run --privileged -v $PWD/.cmake-asan/programs:/root/bin -e PATH=/bin:/root/bin -e --rm -it ubuntu-dev-v3 clickhouse
==1==ERROR: AddressSanitizer failed to allocate 0x0 (0) bytes of SetAlternateSignalStack (error code: 22)
...
==1==End of process memory map.
AddressSanitizer: CHECK failed: sanitizer_common.cpp:53 "((0 && "unable to mmap")) != (0)" (0x0, 0x0) (tid=1)
<empty stack>
The problem is that since GLIBC_2.31, `SIGSTKSZ` is a call to
`getconf(_SC_MINSIGSTKSZ)`, but older glibc does not have it, so `-1`
will be returned and used as `SIGSTKSZ` instead.
The workaround to disable alternative stack:
$ podman run --privileged -v $PWD/.cmake-asan/programs:/root/bin -e PATH=/bin:/root/bin -e ASAN_OPTIONS=use_sigaltstack=0 --rm -it ubuntu-dev-v3 clickhouse client --version
ClickHouse client version 22.13.1.1.
Fixes: #43426
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
It does not give significant benefit, but now, you hashed/sparse_hashed
dictionaries can be filled in parallel (#40003), using sharded
dictionaries, and this should be used instead of PREALLOCATE.
Note, that dictionaries, that had been created with PREALLOCATE will
work, but simply ignore this attribute.
Fixes: #41985 (cc @alexey-milovidov)
Reverts: #23979 (cc @kitaisreal)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Before this patch it was possible to have multiple directory monitors
for the same directory, one from the INSERT context, another one on
storage startup().
Here are an example of logs for this scenario:
2022.12.07 12:12:27.552485 [ 39925 ] {a47fcb32-4f44-4dbd-94fe-0070d4ea0f6b} <Debug> DDLWorker: Executed query: DETACH TABLE inc.dist_urls_in
...
2022.12.07 12:12:33.228449 [ 4408 ] {20c761d3-a46d-417b-9fcd-89a8919dd1fe} <Debug> executeQuery: (from 0.0.0.0:0, user: ) /* ddl_entry=query-0000089229 */ ATTACH TABLE inc.dist_urls_in (stage: Complete)
... this is the DirectoryMonitor created from the context of INSERT for the old StoragePtr that had not been destroyed yet (becase of "was 1" this can be done only from the context of INSERT) ...
2022.12.07 12:12:35.556048 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Files set to 173 (was 1)
2022.12.07 12:12:35.556078 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Bytes set to 29750181 (was 71004)
2022.12.07 12:12:35.562716 [ 39536 ] {} <Trace> Connection (i13.ch:9000): Connected to ClickHouse server version 22.10.1.
2022.12.07 12:12:35.562750 [ 39536 ] {} <Debug> inc.dist_urls_in.DirectoryMonitor: Sending a batch of 10 files to i13.ch:9000 (0.00 rows, 0.00 B bytes).
... this is the DirectoryMonitor that created during ATTACH ...
2022.12.07 12:12:35.802080 [ 39265 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Files set to 173 (was 0)
2022.12.07 12:12:35.802107 [ 39265 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Bytes set to 29750181 (was 0)
2022.12.07 12:12:35.834216 [ 39265 ] {} <Debug> inc.dist_urls_in.DirectoryMonitor: Sending a batch of 10 files to i13.ch:9000 (0.00 rows, 0.00 B bytes).
...
2022.12.07 12:12:38.532627 [ 39536 ] {} <Trace> inc.dist_urls_in.DirectoryMonitor: Sent a batch of 10 files (took 2976 ms).
...
2022.12.07 12:12:38.601051 [ 39265 ] {} <Error> inc.dist_urls_in.DirectoryMonitor: std::exception. Code: 1001, type: std::__1::__fs::filesystem::filesystem_error, e.what() = filesystem error: in file_size: No such file or directory ["/data6/clickhouse/data/inc/dist_urls_in/shard13_replica1/66827403.bin"], Stack trace (when copying this message, always include the lines below):
...
2022.12.07 12:12:54.132837 [ 4408 ] {20c761d3-a46d-417b-9fcd-89a8919dd1fe} <Debug> DDLWorker: Executed query: ATTACH TABLE inc.dist_urls_in
And eventually both monitors (for a short period of time, one replaces
another) are trying to process the same batch (current_batch.txt), and
one of them fails because such file had been already removed.
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Implementation:
* Added ProfileEvents::ServerStartupTime.
* Recorded time from start of main till listening to sockets.
Testing:
* Added a test 02532_profileevents_server_startup_time.sql
Previously there was one (even though very unlikely) case when the dtor
can throw - logging code or ThreadPool::wait.
Just guard the dtor with try/catch and done with it.
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Support of sharded dictionary for updatable sources is questionable
since:
- sharded dictionary developed for hashed dictionary with a huge number
of keys
- updatable source requires storing the whole table in memory (due to
how reload works)
- also it is an open question will it have some benefits from the
updatable source or not, since using updatable source with a huge
number of changes in the source does not looks optimal and on the
other side if there are small amount of changes the you don't need
sharded dictionary at all
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Before it was possible for the desturctor to throw, in case of thread
allocation fails, rewrite it to trySchedule() and do sequential destroy
in this case.
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
In case of skewed distribution simple division by module will not give
you good distribution between shards and eventually this can lead to
performance the same as non-sharded dictionary (except for it will
occupy +1 thread for Block::scatter).
But if HashedDictionary::blockToAttributes() will not have calls to
HashedDictionary::getShard() this can be fixed by using a more complex
key-to-shard (getShard()) mapping. And actually you do not need to call
getShard() in blockToAttributes() you can simply use passed shard, and
that's it.
And by wrapping key with intHash64() in getShard() skewed distribution
can be fixed.
Note, that previously I tried similar approach but did not removed
getShard() from blockToAttributes(), that's why it failed.
And now it works almost as fast as with simple createBlockSelector(),
just 13.6% slower (18.75min vs 16.5min, with 16 threads).
Note, that I've also tried to add libdivide for this, but it does not
improves the performance.
I've also tried the approach without scatter, and it works 20% slower
then this one (22.5min VS 18.75min, with 16 threads).
v2: Use intHashCRC32() over intHash64() for HashedDictionary::getShard()
(with intHash64() it works very slower, almost 2x slower, there was
18min with 32 threads)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Previous patches in this series has a bottleneck in rehash(). This is
the most slowest operation when insert lots of rows into the hashtable
and eventually all that thread pool sometimes work as the most slowest
thread since we did not have any queue of blocks.
This patch adds such queue and now it scales linearly, so initialy with
1 thread I had ~4 hours for 10e9 elements (UInt64 key, UInt16 value),
after this patch it works in 16 minutes with 16 threads (well actually I
have to use 32 threads because of distribution of data in the source
table).
And now with 16 threads it works 16 times faster.
Also this patch adds more optimal block splitting for the non-complex
dictionaries, and usual block splitting for complex dictionaries.
But anyway this moves the overhead from the loading into the hashtable
threads out to the reader thread, and this is better, since reader does
not uses that much CPU.
v2: fix use-after-free on failed load (add missing wait in dtor)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Right now dictionaries (here I will talk about only
HASHED/SPARSE_HASHED/COMPLEX_KEY_HASHED/COMPLEX_KEY_SPARSE_HASHED)
can load data only in one thread, since it uses one hash table that
cannot be filled from multiple threads.
And in case you have very big dictionary (i.e. 10e9 elements), it can
take a awhile to load them, especially for SPARSE_HASHED variants (and
if you have such amount of elements there, you are likely use
SPARSE_HASHED, since it requires less memory), in my env it takes ~4
hours, which is enormous amount of time.
So this patch add support of shards for dictionaries, number of shards
determine how much hash tables will use this dictionary, also, and which
is more important, how much threads it can use to load the data.
And with 16 threads this works 2x faster, not perfect though, see the
follow up patches in this series.
v0: PARTITION BY
v1: SHARDS 1
v2: SHARDS(1)
v3: tried optimized mod - logical and, but it does not gain even 10%
v4: tried squashing more (max_block_size * shards), but it does not gain even 10% either
v5: move SHARDS into layout parameters (unknown simply ignored)
v6: tune params for perf tests (to avoid too long queries)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>