ReadBufferFromKafkaConsumer does not handle the case when there is a
message with an error at a non-first position in the current batch, since
it goes through the messages in the batch after poll and stops on the
first valid message.
But later it can try to use that message as if it were valid:
- while storing the offset
- while getting the topic name
- ...
And besides, the message itself is also invalid (you can see this in the
gdb traces below).
So just filter out messages with an error after poll.
SIGSEGV was with the following stacktrace:
(gdb) bt
3 0x0000000010f05b4d in rd_kafka_offset_store (app_rkt=0x0, partition=0, offset=0) at ../contrib/librdkafka/src/rdkafka_offset.c:656
4 0x0000000010e69657 in cppkafka::Consumer::store_offset (this=0x7f2015210820, msg=...) at ../contrib/cppkafka/include/cppkafka/message.h:225
5 0x000000000e68f208 in DB::ReadBufferFromKafkaConsumer::storeLastReadMessageOffset (this=0x7f206a136618) at ../contrib/libcxx/include/iterator:1508
6 0x000000000e68b207 in DB::KafkaBlockInputStream::readImpl (this=0x7f202c689020) at ../src/Storages/Kafka/KafkaBlockInputStream.cpp:150
7 0x000000000dd1178d in DB::IBlockInputStream::read (this=this@entry=0x7f202c689020) at ../src/DataStreams/IBlockInputStream.cpp:60
8 0x000000000dd34c0a in DB::copyDataImpl<> () at ../src/DataStreams/copyData.cpp:21
9 DB::copyData () at ../src/DataStreams/copyData.cpp:62
10 0x000000000e67c8f2 in DB::StorageKafka::streamToViews () at ../contrib/libcxx/include/memory:3823
11 0x000000000e67d218 in DB::StorageKafka::threadFunc () at ../src/Storages/Kafka/StorageKafka.cpp:488
And some information from it:
(gdb) p this.current.__i
$14 = (std::__1::__wrap_iter<cppkafka::Message const*>::iterator_type) 0x7f1ca8f58660
# current-1
(gdb) p $14-1
$15 = (const cppkafka::Message *) 0x7f1ca8f58600
(gdb) p $16.handle_
$17 = {__ptr_ = {<std::__1::__compressed_pair_elem<rd_kafka_message_s*, 0, false>> = { __value_ = 0x7f203577f938}, ...}
(gdb) p *(rd_kafka_message_s*)0x7f203577f938
$24 = {err = RD_KAFKA_RESP_ERR__TRANSPORT, rkt = 0x0, partition = 0, payload = 0x7f202f0339c0, len = 63, key = 0x0, key_len = 0, offset = 0, _private = 0x7f203577f8c0}
# current
(gdb) p $14-0
$28 = (const cppkafka::Message *) 0x7f1ca8f58660
(gdb) p $28.handle_.__ptr_
$29 = {<std::__1::__compressed_pair_elem<rd_kafka_message_s*, 0, false>> = { __value_ = 0x7f184f129bf0}, ...}
(gdb) p *(rd_kafka_message_s*)0x7f184f129bf0
$30 = {err = RD_KAFKA_RESP_ERR_NO_ERROR, rkt = 0x7f1ed44fe000, partition = 1, payload = 0x7f1fc9bc6036, len = 242, key = 0x0, key_len = 0, offset = 2394853582209,
# current+1
(gdb) p (*($14+1)).handle_.__ptr_
$44 = {<std::__1::__compressed_pair_elem<rd_kafka_message_s*, 0, false>> = { __value_ = 0x7f184f129d30}, ...}
(gdb) p *(rd_kafka_message_s*)0x7f184f129d30
$45 = {err = RD_KAFKA_RESP_ERR_NO_ERROR, rkt = 0x7f1ed44fe000, partition = 1, payload = 0x7f1fc9bc612f, len = 31, key = 0x0, key_len = 0, offset = 2394853582210,
_private = 0x7f184f129cc0}
# distance from the beginning
(gdb) p messages.__end_-messages.__begin_
$34 = 65536
(gdb) p ($14-0)-messages.__begin_
$37 = 8965
(gdb) p ($14-1)-messages.__begin_
$38 = 8964
# parsing info
(gdb) p allowed
$39 = false
(gdb) p new_rows
$40 = 1
(gdb) p total_rows
$41 = 8964
# current buffer is invalid
(gdb) p *buffer.__ptr_
$50 = {<DB::ReadBuffer> = {<DB::BufferBase> = {pos = 0x7f202f0339c0 "FindCoordinator response error: Local: Broker transport failure", bytes = 47904863385, working_buffer = {
begin_pos = 0x7f202f0339c0 "FindCoordinator response error: Local: Broker transport failure",
end_pos = 0x7f202f0339c0 "FindCoordinator response error: Local: Broker transport failure"}, internal_buffer = {
v0: check message errors in ReadBufferFromKafkaConsumer::nextImpl() (but
this may lead to using those messages later and SIGSEGV again, doh).
v2: skip messages with an error after poll (see the sketch below).
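A minimal sketch of the v2 approach, assuming cppkafka's Consumer::poll_batch() and Message::get_error() (the actual ReadBufferFromKafkaConsumer code may differ):
#include <algorithm>
#include <cstddef>
#include <vector>
#include <cppkafka/consumer.h>
#include <cppkafka/message.h>
// Hypothetical helper: poll a batch and drop messages that carry an error
// (e.g. RD_KAFKA_RESP_ERR__TRANSPORT as in the trace above), so later code
// never stores offsets or reads the topic name from an invalid message.
std::vector<cppkafka::Message> pollValidMessages(cppkafka::Consumer & consumer, size_t batch_size)
{
    auto messages = consumer.poll_batch(batch_size);
    messages.erase(
        std::remove_if(messages.begin(), messages.end(),
            [](const cppkafka::Message & message)
            {
                // get_error() is non-empty for broker/transport errors.
                return static_cast<bool>(message.get_error());
            }),
        messages.end());
    return messages;
}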
MOTIVATION:
- remove double-conversion external dependency
- remove flatc (but flatbuffers is still required; arrow just ships with
  the generated files and that's it)
CHANGED:
- remove pre-generated headers, they are shipped with arrow
- remove flatc (see above)
NOTES (see the test changes):
- snappy errors are reported as unsupported compression.
gtest reports:
[ RUN ] GoogleTestVerification.UninstantiatedParameterizedTestSuite<CodecTestPerformance>
../src/Compression/tests/gtest_compressionCodec.cpp:590: Failure
Parameterized test suite CodecTestPerformance is defined via TEST_P, but never instantiated. None of the test cases will run. Either no INSTANTIATE_TEST_SUITE_P is provided or the only ones provided expand to nothing.
Ideally, TEST_P definitions should only ever be included as part of binaries that intend to use them. (As opposed to, for example, being placed in a library that may be linked in to get other utilities.)
To suppress this error for this test suite, insert the following line (in a non-header) in the namespace it is defined in:
GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(CodecTestPerformance);
[ FAILED ] GoogleTestVerification.UninstantiatedParameterizedTestSuite<CodecTestPerformance> (0 ms)
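If leaving CodecTestPerformance uninstantiated is intentional, the suppression gtest suggests would look roughly like this in gtest_compressionCodec.cpp (a sketch; the exact placement within the file is an assumption):
#include <gtest/gtest.h>
// In the same (non-header) translation unit and namespace where the
// CodecTestPerformance suite is defined via TEST_P:
GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(CodecTestPerformance);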
There are still some issues with memory tracking, this time with per-user
tracking:
executeQuery: Code: 241, e.displayText() = DB::Exception: Memory limit (for user) exceeded: would use 437.72 GiB (attempt to allocate chunk of 4200926 bytes), maximum: 437.72 GiB (version 20.6.1.1) (from 10.7.140.7:31318)
Although the server is mostly idle:
SELECT formatReadableSize(memory_usage)
FROM system.processes
┌─formatReadableSize(memory_usage)─┐
│ 289.28 MiB │
│ 155.75 MiB │
│ 0.00 B │
└──────────────────────────────────┘
Refs: https://github.com/ClickHouse/ClickHouse/pull/10496/files#r450206865
Cc: @alexey-milovidov
boost::format does not compile under gcc10:
from ../src/Compression/tests/gtest_compressionCodec.cpp:14:
/usr/include/boost/format/alt_sstream_impl.hpp: In instantiation of ‘boost::io::basic_altstringbuf<Ch, Tr, Alloc>::int_type boost::io::basic_altstringbuf<Ch, Tr, Alloc>::overflow(boost::io::basic_altstringbuf<Ch, Tr, Alloc>::int_type) [with Ch = char; Tr = std::char_traits<char>; Alloc = std::allocator<char>; boost::io::basic_altstringbuf<Ch, Tr, Alloc>::int_type = int]’:
/usr/include/boost/format/alt_sstream_impl.hpp:227:9: required from here
/usr/include/boost/format/alt_sstream_impl.hpp:261:45: error: no matching function for call to ‘std::allocator<char>::allocate(std::size_t&, char*)’
261 | newptr = alloc_.allocate(new_size, is_allocated_? oldptr : 0);
(although this is the system-wide boost, it is pretty recent: 1.72)
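My guess at the root cause (an assumption; the output only shows the missing overload): the hinted std::allocator::allocate(size, hint) overload was deprecated in C++17 and removed in C++20, so it disappears when gcc10 builds in C++20 mode, and boost 1.72's alt_sstream_impl.hpp still calls it. A minimal reproduction:
#include <memory>

int main()
{
    std::allocator<char> alloc;
    char * ptr = alloc.allocate(16);        // OK: single-argument overload
    // The hinted overload matches the failing call in alt_sstream_impl.hpp:261;
    // deprecated in C++17, removed in C++20, so it fails under -std=c++20:
    // char * other = alloc.allocate(16, ptr);
    alloc.deallocate(ptr, 16);
    return 0;
}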
This patch changes the place where the dictionary is loaded (it now happens
during syntax analysis), but I guess this is fine, since it will be loaded
anyway.
Fixes: #10342
* simplify reading in order of sorting key
* add perf test for reading many parts
* Revert "simplify reading in order of sorting key"
This reverts commit 7267d7c46e.
* add threshold for preliminary merge for reading in order
* better threshold
* limit threads in test
After #11599 it is possible that the messages of the
ReadBufferFromKafkaConsumer will be cleaned up right in the
read_kafka_message callback (from KafkaBlockInputStream) if the stop
flag is set (i.e. DROP TABLE is waiting for the consumer), and if
read_kafka_message has already processed some rows it will not return 0,
so the loop afterwards will try to get the current topic from the buffer,
which uses the underlying messages, and this results in a SIGSEGV:
12:14:56.173262 [ 55421 ] {f7930856-d478-4e41-af56-24ce7b693e95} <Debug> executeQuery: (from 0.0.0.0:0, user: ) DROP TABLE IF EXISTS data.queue
12:14:56.173285 [ 55421 ] {f7930856-d478-4e41-af56-24ce7b693e95} <Trace> StorageKafka (newly_queue): Waiting for cleanup
12:14:56.180016 [ 55390 ] {} <Trace> BaseDaemon: Received signal 11
12:14:56.180267 [ 4914 ] {} <Fatal> BaseDaemon: ########################################
12:14:56.181879 [ 4914 ] {} <Fatal> BaseDaemon: (version 20.6.1.1, build id: 4CE0298F08583658) (from thread 55468) (no query) Received signal Segmentation fault (11)
12:14:56.181900 [ 4914 ] {} <Fatal> BaseDaemon: Address: 0x8 Access: read. Address not mapped to object.
12:14:56.181909 [ 4914 ] {} <Fatal> BaseDaemon: Stack trace:
12:14:56.184676 [ 4914 ] {} <Fatal> BaseDaemon: 3. /ch/contrib/cppkafka/include/cppkafka/message.h:111: DB::KafkaBlockInputStream::readImpl() @ 0xe343f1c in /usr/lib/debug/usr/bin/clickhouse
12:14:56.185553 [ 4914 ] {} <Fatal> BaseDaemon: 4. /ch/contrib/libcxx/include/vector:1003: DB::IBlockInputStream::read() @ 0xd9d95bd in /usr/lib/debug/usr/bin/clickhouse
12:14:56.188238 [ 4914 ] {} <Fatal> BaseDaemon: 5. /ch/src/DataStreams/copyData.cpp:26: DB::copyData() @ 0xd9f712a in /usr/lib/debug/usr/bin/clickhouse
12:14:56.188780 [ 4914 ] {} <Fatal> BaseDaemon: 6. /ch/contrib/libcxx/include/vector:1532: DB::StorageKafka::streamToViews() @ 0xe335e73 in /usr/lib/debug/usr/bin/clickhouse
12:14:56.189331 [ 4914 ] {} <Fatal> BaseDaemon: 7. /ch/src/Storages/Kafka/StorageKafka.cpp:491: DB::StorageKafka::threadFunc() @ 0xe336738 in /usr/lib/debug/usr/bin/clickhouse
Thread 55421 (shows that it is still waiting for deactivation):
5 std::__1::lock_guard<>::lock_guard () at ../contrib/libcxx/include/__mutex_base:90
6 DB::BackgroundSchedulePoolTaskInfo::deactivate (this=0x7fc7e4465f20) at ../src/Core/BackgroundSchedulePool.cpp:59
7 DB::StorageKafka::shutdown (this=0x7fc7e45e4600) at ../contrib/libcxx/include/memory:3821
And, just in case, the thread where read_kafka_message is called:
0 DB::ReadBufferFromKafkaConsumer::nextImpl (this=0x7fd4901d4118) at ../contrib/libcxx/include/atomic:1491
1 DB::ReadBuffer::next (this=0x7fd4901d4118) at ../src/IO/ReadBuffer.h:59
2 DB::ReadBuffer::eof (this=0x7fd4901d4118) at ../src/IO/ReadBuffer.h:81
3 DB::skipWhitespaceIfAny (buf=...) at ../src/IO/ReadHelpers.h:945
4 DB::JSONEachRowRowInputFormat::readRow (ext=..., columns=..., this=0x7fd499a7a020) at ../src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp:222
5 DB::JSONEachRowRowInputFormat::readRow (this=0x7fd499a7a020, columns=..., ext=...) at ../src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp:218
6 DB::IRowInputFormat::generate (this=0x7fd499a7a020) at ../src/Processors/Formats/IRowInputFormat.cpp:64
7 DB::ISource::work (this=0x7fd499a7a020) at ../src/Processors/ISource.cpp:48
8 DB::KafkaBlockInputStream::<lambda()>::operator()(void) const () at ../contrib/libcxx/include/memory:3826
9 DB::KafkaBlockInputStream::readImpl (this=0x7fd46e718820) at ../contrib/libcxx/include/new:340
Cc: @filimonov
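A self-contained illustration of the hazard with hypothetical names (not the actual ClickHouse classes): the cleanup path releases the message batch while a cached iterator into it is used afterwards, exactly the kind of access that produced the trace above:
#include <iostream>
#include <string>
#include <vector>

// Toy stand-in for the buffer: holds a message batch and a "current" iterator.
struct ToyBuffer
{
    std::vector<std::string> messages;
    std::vector<std::string>::const_iterator current;

    // What the stop/cleanup path effectively does inside the callback.
    void cleanup() { messages.clear(); }

    // Unsafe after cleanup(): 'current' dangles.
    const std::string & currentTopic() const { return *current; }
};

int main()
{
    ToyBuffer buffer;
    buffer.messages = {"topic-a", "topic-b"};
    buffer.current = buffer.messages.begin();

    buffer.cleanup();  // stop flag was set: the batch is gone

    // Guard before touching the current message; without it this is UB/SIGSEGV.
    if (!buffer.messages.empty())
        std::cout << buffer.currentTopic() << '\n';
    else
        std::cout << "batch was cleaned up, skip\n";
    return 0;
}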
Possible approach for fixing #10574
The problem is that prepared sets are built correctly: they form a hash map
of key -> set, where the key is a hash of the AST plus a list of data types
(when we have a list of tuples of literals).
However, when the key is built from the index to check whether a matching
prepared set exists, it looks for the data types of the primary key (see
how data_types is populated), and because the primary key has only one
field (v in my example) it cannot find the prepared set.
The patch looks for any prepared set whose data types match for the subset
of fields found in the primary key; we are not interested in the other
fields anyway for the purpose of primary key pruning.
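A minimal sketch of the matching idea with hypothetical names (not the actual KeyCondition/PreparedSets code): accept a prepared set whose data types agree with the index on the primary-key positions only:
#include <cstddef>
#include <string>
#include <vector>

// Simplified model: the prepared set key carries the types of all tuple
// elements from the IN right-hand side; the index side only knows the types
// of the primary-key columns and their positions inside that tuple.
using DataTypes = std::vector<std::string>;

bool matchesOnPrimaryKeySubset(const DataTypes & set_types,
                               const DataTypes & index_types,
                               const std::vector<size_t> & pk_positions)
{
    if (index_types.size() != pk_positions.size())
        return false;
    for (size_t i = 0; i < pk_positions.size(); ++i)
    {
        const size_t pos = pk_positions[i];
        // Only the fields that are part of the primary key need to match;
        // the other tuple elements are irrelevant for primary key pruning.
        if (pos >= set_types.size() || set_types[pos] != index_types[i])
            return false;
    }
    return true;
}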