Fix the hang on dropping Kafka table when there is no mat. views

This commit is contained in:
Ivan Lezhankin 2019-05-27 20:25:34 +03:00
parent d2fd7a449f
commit 13212c9b01
2 changed files with 11 additions and 2 deletions

View File

@ -3,6 +3,16 @@
namespace DB
{
using namespace std::chrono_literals;
ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
{
/// NOTE: see https://github.com/edenhill/librdkafka/issues/2077
consumer->unsubscribe();
consumer->unassign();
while(consumer->get_consumer_queue().next_event(1s));
}
void ReadBufferFromKafkaConsumer::commit()
{
if (messages.empty() || current == messages.begin())
@ -20,8 +30,6 @@ void ReadBufferFromKafkaConsumer::subscribe(const Names & topics)
// If we're doing a manual select then it's better to get something after a wait, then immediate nothing.
if (consumer->get_subscription().empty())
{
using namespace std::chrono_literals;
consumer->pause(); // don't accidentally read any messages
consumer->subscribe(topics);
consumer->poll(5s);

View File

@ -27,6 +27,7 @@ public:
, current(messages.begin())
{
}
~ReadBufferFromKafkaConsumer() override;
void commit(); // Commit all processed messages.
void subscribe(const Names & topics); // Subscribe internal consumer to topics.