#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>

#include <common/logger_useful.h>

#include <cppkafka/cppkafka.h>
#include <boost/algorithm/string/join.hpp>

namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_COMMIT_OFFSET;
}

using namespace std::chrono_literals;

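/// Upper bound on the total time nextImpl() will keep polling while it waits
/// for a partition assignment to arrive after subscribe() (see the poll loop below).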
const auto MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS = 15000;

ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
    ConsumerPtr consumer_,
    Poco::Logger * log_,
    size_t max_batch_size,
    size_t poll_timeout_,
    bool intermediate_commit_,
    const std::atomic<bool> & stopped_,
    const Names & _topics)
    : ReadBuffer(nullptr, 0)
    , consumer(consumer_)
    , log(log_)
    , batch_size(max_batch_size)
    , poll_timeout(poll_timeout_)
    , intermediate_commit(intermediate_commit_)
    , stopped(stopped_)
    , current(messages.begin())
    , topics(_topics)
{
    // called (synchronously, during poll) when we enter the consumer group
    consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
    {
        LOG_TRACE(log, "Topics/partitions assigned: " << topic_partitions);
        assignment = topic_partitions;
    });

    // called (synchronously, during poll) when we leave the consumer group
    consumer->set_revocation_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
    {
        // Rebalance is happening now, and now we have a chance to finish the work
        // with the topics/partitions we were working with before the rebalance.
        LOG_TRACE(log, "Rebalance initiated. Revoking partitions: " << topic_partitions);

        // We can not flush data to the target from this point (it is pulled, not pushed),
        // so the best we can do now is to:
        // 1) repeat the last commit in sync mode (an async one could still be in the queue;
        //    we need to be sure it is properly committed before the rebalance);
        // 2) stop / break the current reading:
        //    * clean buffered non-committed messages,
        //    * set the flag / flush.

        messages.clear();
        current = messages.begin();
        BufferBase::set(nullptr, 0, 0);

        rebalance_happened = true;
        assignment.clear();

        // for now we use the slower (but reliable) sync commit in the main loop, so there is no need to repeat it here
        // try
        // {
        //     consumer->commit();
        // }
        // catch (cppkafka::HandleException & e)
        // {
        //     LOG_WARNING(log, "Commit error: " << e.what());
        // }
    });

    consumer->set_rebalance_error_callback([this](cppkafka::Error err)
    {
        LOG_ERROR(log, "Rebalance error: " << err);
    });
}

ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
{
    /// NOTE: see https://github.com/edenhill/librdkafka/issues/2077

    try
    {
        if (!consumer->get_subscription().empty())
            consumer->unsubscribe();

        if (!assignment.empty())
            consumer->unassign();
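        // Drain whatever is still queued (rebalance events, etc.) so that librdkafka
        // processes the unsubscribe/unassign before the consumer is destroyed;
        // see the NOTE above for the hang this works around.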
        while (consumer->get_consumer_queue().next_event(100ms));
    }
    catch (const cppkafka::HandleException & e)
    {
        LOG_ERROR(log, "Exception from ReadBufferFromKafkaConsumer destructor: " << e.what());
    }
}

void ReadBufferFromKafkaConsumer::commit()
{
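    // Helper for logging: prints the position of each topic/partition, mapping
    // librdkafka's special negative offsets (BEGINNING, END, STORED, INVALID) to readable names.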
    auto print_offsets = [this](const char * prefix, const cppkafka::TopicPartitionList & offsets)
    {
        for (const auto & topic_part : offsets)
        {
            auto print_special_offset = [&topic_part]
            {
                switch (topic_part.get_offset())
                {
                    case cppkafka::TopicPartition::OFFSET_BEGINNING: return "BEGINNING";
                    case cppkafka::TopicPartition::OFFSET_END: return "END";
                    case cppkafka::TopicPartition::OFFSET_STORED: return "STORED";
                    case cppkafka::TopicPartition::OFFSET_INVALID: return "INVALID";
                    default: return "";
                }
            };

            if (topic_part.get_offset() < 0)
            {
                LOG_TRACE(
                    log,
                    prefix << " " << print_special_offset() << " (topic: " << topic_part.get_topic()
                           << ", partition: " << topic_part.get_partition() << ")");
            }
            else
            {
                LOG_TRACE(
                    log,
                    prefix << " " << topic_part.get_offset() << " (topic: " << topic_part.get_topic()
                           << ", partition: " << topic_part.get_partition() << ")");
            }
        }
    };

    print_offsets("Polled offset", consumer->get_offsets_position(consumer->get_assignment()));

    if (hasMorePolledMessages())
    {
        LOG_WARNING(log, "Logical error. Not all polled messages were processed.");
    }

    if (offsets_stored > 0)
    {
        // If we did an async commit here (which is faster),
        // we might need to repeat the commit in sync mode in the revocation callback,
        // but it seems the existing API doesn't allow us to do that
        // in a controlled manner (i.e. we wouldn't know the offsets to commit then).

        size_t max_retries = 5;
        bool committed = false;
        while (!committed && max_retries > 0)
        {
            try
            {
                // See https://github.com/edenhill/librdkafka/issues/1470
                // The broker may reject the commit if, during offsets.commit.timeout.ms (5000 by default),
                // there were not enough replicas available for the __consumer_offsets topic.
                // Other temporary issues, like client-server connectivity problems, are also possible.
                consumer->commit();
                committed = true;
                print_offsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
            }
            catch (const cppkafka::HandleException & e)
            {
                LOG_ERROR(log, "Exception during commit attempt: " << e.what());
            }
            --max_retries;
        }

        if (!committed)
        {
            // TODO: insert atomicity / transactions are needed here (the possibility to rollback, or 2-phase commits)
            throw Exception("All commit attempts failed. Last block was already written to target table(s), but was not committed to Kafka.", ErrorCodes::CANNOT_COMMIT_OFFSET);
        }
    }
    else
    {
        LOG_TRACE(log, "Nothing to commit.");
    }

    offsets_stored = 0;
    stalled = false;
}

void ReadBufferFromKafkaConsumer::subscribe()
{
    LOG_TRACE(log, "Already subscribed to topics: ["
        << boost::algorithm::join(consumer->get_subscription(), ", ")
        << "]");

    LOG_TRACE(log, "Already assigned to: " << assignment);

    size_t max_retries = 5;
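    // subscribe() can time out transiently (e.g. while the broker is temporarily
    // unreachable), so retry a bounded number of times before giving up.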

    while (consumer->get_subscription().empty())
    {
        --max_retries;

        try
        {
            consumer->subscribe(topics);
            // FIXME: if we fail to receive the "subscribe" response while polling and destroy the consumer now, we may hang up.
            //        see https://github.com/edenhill/librdkafka/issues/2077
        }
        catch (cppkafka::HandleException & e)
        {
            if (max_retries > 0 && e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
                continue;
            throw;
        }
    }

    stalled = false;
    rebalance_happened = false;
    offsets_stored = 0;
}

void ReadBufferFromKafkaConsumer::unsubscribe()
{
    LOG_TRACE(log, "Re-joining claimed consumer after failure");

    messages.clear();
    current = messages.begin();
    BufferBase::set(nullptr, 0, 0);

    // it should not throw an exception, as it is used in the destructor
    try
    {
        if (!consumer->get_subscription().empty())
            consumer->unsubscribe();
    }
    catch (const cppkafka::HandleException & e)
    {
        LOG_ERROR(log, "Exception from ReadBufferFromKafkaConsumer::unsubscribe: " << e.what());
    }
}

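/// True while there are buffered messages that nextImpl() has not yet returned.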
bool ReadBufferFromKafkaConsumer::hasMorePolledMessages() const
{
    return (!stalled) && (current != messages.end());
}

void ReadBufferFromKafkaConsumer::resetToLastCommitted(const char * msg)
{
    if (assignment.empty())
    {
        LOG_TRACE(log, "Not assigned. Can't reset to last committed position.");
        return;
    }
    auto committed_offset = consumer->get_offsets_committed(consumer->get_assignment());
    consumer->assign(committed_offset);
    LOG_TRACE(log, msg << " Returned to committed position: " << committed_offset);
}

/// Do commit messages implicitly after we processed the previous batch.
bool ReadBufferFromKafkaConsumer::nextImpl()
{
    /// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
    /// If we failed to poll any message once - don't try again.
    /// Otherwise, the |poll_timeout| expectations get broken.
    if (stalled || stopped || !allowed || rebalance_happened)
        return false;

    if (current == messages.end())
    {
        if (intermediate_commit)
            commit();

        size_t waited_for_assignment = 0;
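        // Poll until one of three things happens:
        //  * a rebalance invalidates what we polled - drop the batch and bail out;
        //  * nothing arrives - keep waiting for an assignment for a while, then stall;
        //  * a batch arrives - adopt it and leave the loop.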
        while (true)
        {
            /// Don't drop old messages immediately, since we may need them for virtual columns.
            auto new_messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));

            if (rebalance_happened)
            {
                if (!new_messages.empty())
                {
                    // we have polled something just after the rebalance.
                    // we will not use the current batch, so we need to return to the last committed position,
                    // otherwise we would continue polling from that position
                    resetToLastCommitted("Rewind last poll after rebalance.");
                }

                offsets_stored = 0;
                return false;
            }

            if (new_messages.empty())
            {
                // While we wait for an assignment after subscription, we'll poll zero messages anyway.
                // If we're doing a manual select, it's better to get something after a wait than nothing immediately.
                if (assignment.empty())
                {
                    waited_for_assignment += poll_timeout; // slightly inaccurate, but a rough calculation is ok.
                    if (waited_for_assignment < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
                    {
                        continue;
                    }
                    else
                    {
                        LOG_TRACE(log, "Can't get assignment");
                        stalled = true;
                        return false;
                    }
                }
                else
                {
                    LOG_TRACE(log, "Stalled");
                    stalled = true;
                    return false;
                }
            }
            else
            {
                messages = std::move(new_messages);
                current = messages.begin();
                LOG_TRACE(log, "Polled batch of " << messages.size() << " messages. Offset position: "
                    << consumer->get_offsets_position(consumer->get_assignment()));
                break;
            }
        }
    }

    if (auto err = current->get_error())
    {
        ++current;

        // TODO: should throw an exception instead
        LOG_ERROR(log, "Consumer error: " << err);
        return false;
    }

    // XXX: very fishy place with const casting.
    auto new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
    BufferBase::set(new_position, current->get_payload().get_size(), 0);
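    // Disallow further reads until the caller explicitly re-enables them
    // (nothing in this class sets `allowed` back to true, so the owner of the
    // buffer is expected to do it once the current message has been handled).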
    allowed = false;
    ++current;

    return true;
}

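// Marks the offset of the message just returned by nextImpl() to be included in
// the next commit(). Skipped when stalled or after a rebalance, since the stored
// offset would refer to a dropped batch or to partitions we may no longer own.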
void ReadBufferFromKafkaConsumer::storeLastReadMessageOffset()
{
    if (!stalled && !rebalance_happened)
    {
        consumer->store_offset(*(current - 1));
        ++offsets_stored;
    }
}

}