After #11599 it is possible that messages of the
ReadBufferFromKafkaConsumer will be cleaned-up right in
read_kafka_message callback (from KafkaBlockInputStream) if the stop
flag isset (i.e. DROP TABLE is waiting the consumer), and if
read_kafka_message already processed some rows it will not return 0 and
the loop after will try to get current topic from the buffer, which uses
messages in the underlying and this will got SIGSEGV:
12:14:56.173262 [ 55421 ] {f7930856-d478-4e41-af56-24ce7b693e95} <Debug> executeQuery: (from 0.0.0.0:0, user: ) DROP TABLE IF EXISTS data.queue
12:14:56.173285 [ 55421 ] {f7930856-d478-4e41-af56-24ce7b693e95} <Trace> StorageKafka (newly_queue): Waiting for cleanup
12:14:56.180016 [ 55390 ] {} <Trace> BaseDaemon: Received signal 11
12:14:56.180267 [ 4914 ] {} <Fatal> BaseDaemon: ########################################
12:14:56.181879 [ 4914 ] {} <Fatal> BaseDaemon: (version 20.6.1.1, build id: 4CE0298F08583658) (from thread 55468) (no query) Received signal Segmentation fault (11)
12:14:56.181900 [ 4914 ] {} <Fatal> BaseDaemon: Address: 0x8 Access: read. Address not mapped to object.
12:14:56.181909 [ 4914 ] {} <Fatal> BaseDaemon: Stack trace:
12:14:56.184676 [ 4914 ] {} <Fatal> BaseDaemon: 3. /ch/contrib/cppkafka/include/cppkafka/message.h:111: DB::KafkaBlockInputStream::readImpl() @ 0xe343f1c in /usr/lib/debug/usr/bin/clickhouse
12:14:56.185553 [ 4914 ] {} <Fatal> BaseDaemon: 4. /ch/contrib/libcxx/include/vector:1003: DB::IBlockInputStream::read() @ 0xd9d95bd in /usr/lib/debug/usr/bin/clickhouse
12:14:56.188238 [ 4914 ] {} <Fatal> BaseDaemon: 5. /ch/src/DataStreams/copyData.cpp:26: DB::copyData() @ 0xd9f712a in /usr/lib/debug/usr/bin/clickhouse
12:14:56.188780 [ 4914 ] {} <Fatal> BaseDaemon: 6. /ch/contrib/libcxx/include/vector:1532: DB::StorageKafka::streamToViews() @ 0xe335e73 in /usr/lib/debug/usr/bin/clickhouse
12:14:56.189331 [ 4914 ] {} <Fatal> BaseDaemon: 7. /ch/src/Storages/Kafka/StorageKafka.cpp:491: DB::StorageKafka::threadFunc() @ 0xe336738 in /usr/lib/debug/usr/bin/clickhouse
55421 thread (shows that it still waiting for deactivation):
5 std::__1::lock_guard<>::lock_guard () at ../contrib/libcxx/include/__mutex_base:90
6 DB::BackgroundSchedulePoolTaskInfo::deactivate (this=0x7fc7e4465f20) at ../src/Core/BackgroundSchedulePool.cpp:59
7 DB::StorageKafka::shutdown (this=0x7fc7e45e4600) at ../contrib/libcxx/include/memory:3821
And just in case thread where read_kafka_message is called:
0 DB::ReadBufferFromKafkaConsumer::nextImpl (this=0x7fd4901d4118) at ../contrib/libcxx/include/atomic:1491
1 DB::ReadBuffer::next (this=0x7fd4901d4118) at ../src/IO/ReadBuffer.h:59
2 DB::ReadBuffer::eof (this=0x7fd4901d4118) at ../src/IO/ReadBuffer.h:81
3 DB::skipWhitespaceIfAny (buf=...) at ../src/IO/ReadHelpers.h:945
4 DB::JSONEachRowRowInputFormat::readRow (ext=..., columns=..., this=0x7fd499a7a020) at ../src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp:222
5 DB::JSONEachRowRowInputFormat::readRow (this=0x7fd499a7a020, columns=..., ext=...) at ../src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp:218
6 DB::IRowInputFormat::generate (this=0x7fd499a7a020) at ../src/Processors/Formats/IRowInputFormat.cpp:64
7 DB::ISource::work (this=0x7fd499a7a020) at ../src/Processors/ISource.cpp:48
8 DB::KafkaBlockInputStream::<lambda()>::operator()(void) const () at ../contrib/libcxx/include/memory:3826
9 DB::KafkaBlockInputStream::readImpl (this=0x7fd46e718820) at ../contrib/libcxx/include/new:340
Cc: @filimonov
Add formats tests, fixes for JSONCompactEachRowWithNamesAndTypes, TSVWithNamesAndTypes. Some CR fixes
Add sanitizing for kafka_max_block_size and kafka_poll_max_batch_size
* Added flag for safer rdkafka destruction, but more testing detected another hang case, which has no straigt solutions and can be workarounded currenly only by draining the consumer queue, so destructor is back
* After review fixes
* After review fixes2
* After review fixes3
First of all it is nice to have part of the table name in it.
And secondly, librdkafka uses pthread_setname_np(), but due to
glibc-compatibility it is a noop in clickhouse sources:
# librdkafka uses rdk: prefix for thread names by default
$ grep rdk /proc/$(pgrep clickhouse-serv)/task/*/comm
# just in case:
$ grep rdk /proc/*/task/*/comm
$ grep rdk /proc/*/comm
(gdb) disas pthread_setname_np
Dump of assembler code for function pthread_setname_np:
0x000000000c603250 <+0>: xor %eax,%eax
0x000000000c603252 <+2>: retq
End of assembler dump.