mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 05:22:17 +00:00
kafka: fix SIGSEGV on DROP TABLE
After #11599 it is possible that messages of the ReadBufferFromKafkaConsumer will be cleaned-up right in read_kafka_message callback (from KafkaBlockInputStream) if the stop flag isset (i.e. DROP TABLE is waiting the consumer), and if read_kafka_message already processed some rows it will not return 0 and the loop after will try to get current topic from the buffer, which uses messages in the underlying and this will got SIGSEGV: 12:14:56.173262 [ 55421 ] {f7930856-d478-4e41-af56-24ce7b693e95} <Debug> executeQuery: (from 0.0.0.0:0, user: ) DROP TABLE IF EXISTS data.queue 12:14:56.173285 [ 55421 ] {f7930856-d478-4e41-af56-24ce7b693e95} <Trace> StorageKafka (newly_queue): Waiting for cleanup 12:14:56.180016 [ 55390 ] {} <Trace> BaseDaemon: Received signal 11 12:14:56.180267 [ 4914 ] {} <Fatal> BaseDaemon: ######################################## 12:14:56.181879 [ 4914 ] {} <Fatal> BaseDaemon: (version 20.6.1.1, build id: 4CE0298F08583658) (from thread 55468) (no query) Received signal Segmentation fault (11) 12:14:56.181900 [ 4914 ] {} <Fatal> BaseDaemon: Address: 0x8 Access: read. Address not mapped to object. 12:14:56.181909 [ 4914 ] {} <Fatal> BaseDaemon: Stack trace: 12:14:56.184676 [ 4914 ] {} <Fatal> BaseDaemon: 3. /ch/contrib/cppkafka/include/cppkafka/message.h:111: DB::KafkaBlockInputStream::readImpl() @ 0xe343f1c in /usr/lib/debug/usr/bin/clickhouse 12:14:56.185553 [ 4914 ] {} <Fatal> BaseDaemon: 4. /ch/contrib/libcxx/include/vector:1003: DB::IBlockInputStream::read() @ 0xd9d95bd in /usr/lib/debug/usr/bin/clickhouse 12:14:56.188238 [ 4914 ] {} <Fatal> BaseDaemon: 5. /ch/src/DataStreams/copyData.cpp:26: DB::copyData() @ 0xd9f712a in /usr/lib/debug/usr/bin/clickhouse 12:14:56.188780 [ 4914 ] {} <Fatal> BaseDaemon: 6. /ch/contrib/libcxx/include/vector:1532: DB::StorageKafka::streamToViews() @ 0xe335e73 in /usr/lib/debug/usr/bin/clickhouse 12:14:56.189331 [ 4914 ] {} <Fatal> BaseDaemon: 7. /ch/src/Storages/Kafka/StorageKafka.cpp:491: DB::StorageKafka::threadFunc() @ 0xe336738 in /usr/lib/debug/usr/bin/clickhouse 55421 thread (shows that it still waiting for deactivation): 5 std::__1::lock_guard<>::lock_guard () at ../contrib/libcxx/include/__mutex_base:90 6 DB::BackgroundSchedulePoolTaskInfo::deactivate (this=0x7fc7e4465f20) at ../src/Core/BackgroundSchedulePool.cpp:59 7 DB::StorageKafka::shutdown (this=0x7fc7e45e4600) at ../contrib/libcxx/include/memory:3821 And just in case thread where read_kafka_message is called: 0 DB::ReadBufferFromKafkaConsumer::nextImpl (this=0x7fd4901d4118) at ../contrib/libcxx/include/atomic:1491 1 DB::ReadBuffer::next (this=0x7fd4901d4118) at ../src/IO/ReadBuffer.h:59 2 DB::ReadBuffer::eof (this=0x7fd4901d4118) at ../src/IO/ReadBuffer.h:81 3 DB::skipWhitespaceIfAny (buf=...) at ../src/IO/ReadHelpers.h:945 4 DB::JSONEachRowRowInputFormat::readRow (ext=..., columns=..., this=0x7fd499a7a020) at ../src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp:222 5 DB::JSONEachRowRowInputFormat::readRow (this=0x7fd499a7a020, columns=..., ext=...) at ../src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp:218 6 DB::IRowInputFormat::generate (this=0x7fd499a7a020) at ../src/Processors/Formats/IRowInputFormat.cpp:64 7 DB::ISource::work (this=0x7fd499a7a020) at ../src/Processors/ISource.cpp:48 8 DB::KafkaBlockInputStream::<lambda()>::operator()(void) const () at ../contrib/libcxx/include/memory:3826 9 DB::KafkaBlockInputStream::readImpl (this=0x7fd46e718820) at ../contrib/libcxx/include/new:340 Cc: @filimonov
This commit is contained in:
parent
457f56be0c
commit
6104872cae
@ -420,12 +420,9 @@ void ReadBufferFromKafkaConsumer::resetIfStopped()
|
||||
/// Do commit messages implicitly after we processed the previous batch.
|
||||
bool ReadBufferFromKafkaConsumer::nextImpl()
|
||||
{
|
||||
|
||||
/// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
|
||||
/// If we failed to poll any message once - don't try again.
|
||||
/// Otherwise, the |poll_timeout| expectations get flawn.
|
||||
resetIfStopped();
|
||||
|
||||
if (!allowed || !hasMorePolledMessages())
|
||||
return false;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user