2019-01-21 14:02:03 +00:00
|
|
|
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>

#include <exception>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
2019-04-22 13:23:05 +00:00
|
|
|
|
2019-05-27 17:25:34 +00:00
|
|
|
using namespace std::chrono_literals;
|
|
|
|
|
|
|
|
/// Unsubscribes and unassigns the consumer, then drains its event queue.
///
/// Failures are logged instead of propagated: a destructor is implicitly noexcept,
/// so an exception escaping from the consumer calls below would terminate the process.
ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
{
    try
    {
        /// NOTE: see https://github.com/edenhill/librdkafka/issues/2077
        consumer->unsubscribe();
        consumer->unassign();

        /// Keep pulling events (waiting up to 1s for each) until the queue returns an empty one.
        while (consumer->get_consumer_queue().next_event(1s));
    }
    catch (const std::exception & e)
    {
        LOG_ERROR(log, "Exception from ReadBufferFromKafkaConsumer destructor: " << e.what());
    }
    catch (...)
    {
        LOG_ERROR(log, "Unknown exception from ReadBufferFromKafkaConsumer destructor");
    }
}
|
|
|
|
|
2019-01-25 12:48:59 +00:00
|
|
|
void ReadBufferFromKafkaConsumer::commit()
|
2019-01-21 14:02:03 +00:00
|
|
|
{
|
2019-06-17 16:27:18 +00:00
|
|
|
if (current != messages.end())
|
|
|
|
{
|
|
|
|
/// Since we can poll more messages than we already processed,
|
|
|
|
/// commit only processed messages.
|
|
|
|
consumer->async_commit(*current);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/// Commit everything we polled so far because either:
|
|
|
|
/// - read all polled messages (current == messages.end()),
|
|
|
|
/// - read nothing at all (messages.empty()),
|
|
|
|
/// - stalled.
|
|
|
|
consumer->async_commit();
|
|
|
|
}
|
2019-04-22 13:23:05 +00:00
|
|
|
|
2019-06-17 16:27:18 +00:00
|
|
|
const auto & offsets = consumer->get_offsets_committed(consumer->get_assignment());
|
|
|
|
for (const auto & topic_part : offsets)
|
|
|
|
{
|
|
|
|
LOG_TRACE(
|
|
|
|
log,
|
|
|
|
"Committed offset " << topic_part.get_offset() << " (topic: " << topic_part.get_topic()
|
|
|
|
<< ", partition: " << topic_part.get_partition() << ")");
|
|
|
|
}
|
2019-01-25 12:48:59 +00:00
|
|
|
}
|
2019-01-21 14:02:03 +00:00
|
|
|
|
2019-04-22 13:23:05 +00:00
|
|
|
void ReadBufferFromKafkaConsumer::subscribe(const Names & topics)
|
|
|
|
{
|
|
|
|
// While we wait for an assignment after subscribtion, we'll poll zero messages anyway.
|
|
|
|
// If we're doing a manual select then it's better to get something after a wait, then immediate nothing.
|
|
|
|
if (consumer->get_subscription().empty())
|
|
|
|
{
|
|
|
|
consumer->pause(); // don't accidentally read any messages
|
|
|
|
consumer->subscribe(topics);
|
|
|
|
consumer->poll(5s);
|
|
|
|
consumer->resume();
|
|
|
|
}
|
2019-05-15 16:11:50 +00:00
|
|
|
|
|
|
|
stalled = false;
|
2019-04-22 13:23:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void ReadBufferFromKafkaConsumer::unsubscribe()
|
|
|
|
{
|
|
|
|
LOG_TRACE(log, "Re-joining claimed consumer after failure");
|
|
|
|
consumer->unsubscribe();
|
|
|
|
}
|
|
|
|
|
2019-06-17 16:27:18 +00:00
|
|
|
/// Try to commit messages implicitly after we processed the previous batch.
|
2019-01-25 12:48:59 +00:00
|
|
|
bool ReadBufferFromKafkaConsumer::nextImpl()
|
|
|
|
{
|
2019-05-16 15:20:30 +00:00
|
|
|
/// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
|
2019-05-15 16:11:50 +00:00
|
|
|
/// If we failed to poll any message once - don't try again.
|
|
|
|
/// Otherwise, the |poll_timeout| expectations get flawn.
|
|
|
|
if (stalled)
|
|
|
|
return false;
|
|
|
|
|
2019-01-25 12:48:59 +00:00
|
|
|
if (current == messages.end())
|
2019-01-21 14:02:03 +00:00
|
|
|
{
|
2019-05-16 15:20:30 +00:00
|
|
|
if (intermediate_commit)
|
|
|
|
commit();
|
2019-06-21 14:29:10 +00:00
|
|
|
|
|
|
|
/// Don't drop old messages immediately, since we may need them for virtual columns.
|
|
|
|
auto new_messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
|
|
|
|
if (new_messages.empty())
|
|
|
|
{
|
|
|
|
LOG_TRACE(log, "Stalled");
|
|
|
|
stalled = true;
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
messages = std::move(new_messages);
|
2019-01-25 12:48:59 +00:00
|
|
|
current = messages.begin();
|
|
|
|
|
|
|
|
LOG_TRACE(log, "Polled batch of " << messages.size() << " messages");
|
2019-01-21 14:02:03 +00:00
|
|
|
}
|
2019-01-25 12:48:59 +00:00
|
|
|
|
|
|
|
if (auto err = current->get_error())
|
2019-01-21 14:02:03 +00:00
|
|
|
{
|
2019-01-25 12:48:59 +00:00
|
|
|
++current;
|
|
|
|
|
|
|
|
// TODO: should throw exception instead
|
2019-01-21 14:02:03 +00:00
|
|
|
LOG_ERROR(log, "Consumer error: " << err);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
// XXX: very fishy place with const casting.
|
2019-01-25 12:48:59 +00:00
|
|
|
auto new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
|
|
|
|
BufferBase::set(new_position, current->get_payload().get_size(), 0);
|
|
|
|
|
|
|
|
++current;
|
2019-01-23 11:00:43 +00:00
|
|
|
|
2019-01-21 14:02:03 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2019-04-22 13:23:05 +00:00
|
|
|
}
|