Create new consumers on-demand

and give them some time for graceful assignment.
Ivan Lezhankin 2019-01-30 20:41:06 +03:00
parent 681266abd0
commit 39e2563f93
3 changed files with 38 additions and 60 deletions

Storages/Kafka/KafkaBlockInputStream.cpp

@@ -1,92 +1,69 @@
 #include <Storages/Kafka/KafkaBlockInputStream.h>
 
 #include <Formats/FormatFactory.h>
+#include <Storages/Kafka/StorageKafka.h>
+#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
 
 namespace DB
 {
 
-namespace ErrorCodes
-{
-    extern const int TIMEOUT_EXCEEDED;
-} // namespace ErrorCodes
-
 KafkaBlockInputStream::KafkaBlockInputStream(
     StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_)
     : storage(storage_), context(context_), max_block_size(max_block_size_)
 {
-    // Always skip unknown fields regardless of the context (JSON or TSKV)
-    context.setSetting("input_format_skip_unknown_fields", 1u);
-
-    // We don't use ratio since the number of Kafka messages may vary from stream to stream.
-    // Thus, ratio is meaningless.
+    context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
     context.setSetting("input_format_allow_errors_ratio", 0.);
     context.setSetting("input_format_allow_errors_num", storage.skip_broken);
 
-    if (schema.size() > 0)
+    if (!schema.empty())
         context.setSetting("format_schema", schema);
 }
 
 KafkaBlockInputStream::~KafkaBlockInputStream()
 {
-    if (!hasClaimed())
+    if (!claimed)
         return;
 
-    // An error was thrown during the stream or it did not finish successfully
-    // The read offsets weren't committed, so consumer must rejoin the group from the original starting point
-    if (!finalized)
+    if (broken)
     {
-        LOG_TRACE(storage.log, "KafkaBlockInputStream did not finish successfully, unsubscribing from assignments and rejoining");
+        LOG_TRACE(storage.log, "Re-joining claimed consumer after failure");
         consumer->unsubscribe();
         consumer->subscribe(storage.topics);
     }
 
     // Return consumer for another reader
     storage.pushConsumer(consumer);
 }
 
-String KafkaBlockInputStream::getName() const
-{
-    return storage.getName();
-}
-
-Block KafkaBlockInputStream::readImpl()
-{
-    if (!hasClaimed())
-        return {};
-
-    return children.back()->read();
-}
-
-Block KafkaBlockInputStream::getHeader() const
-{
-    return storage.getSampleBlock();
-}
-
 void KafkaBlockInputStream::readPrefixImpl()
 {
-    if (!hasClaimed())
-    {
-        // Create a formatted reader on Kafka messages
-        LOG_TRACE(storage.log, "Creating formatted reader");
-        consumer = storage.tryClaimConsumer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
-        if (consumer == nullptr)
-            throw Exception("Failed to claim consumer: ", ErrorCodes::TIMEOUT_EXCEEDED);
+    consumer = storage.tryClaimConsumer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
+    claimed = !!consumer;
 
-        buffer = std::make_unique<DelimitedReadBuffer>(new ReadBufferFromKafkaConsumer(consumer, storage.log, max_block_size), storage.row_delimiter);
-        addChild(FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size));
-    }
+    if (!consumer)
+        consumer = std::make_shared<cppkafka::Consumer>(storage.createConsumerConfiguration());
+
+    // While we wait for an assignment after subscription, we'll poll zero messages anyway.
+    // If we're doing a manual select then it's better to get something after a wait than immediate nothing.
+    if (consumer->get_subscription().empty())
+    {
+        using namespace std::chrono_literals;
+
+        consumer->pause(); // don't accidentally read any messages
+        consumer->subscribe(storage.topics);
+        consumer->poll(5s);
+        consumer->resume();
+    }
 
-    // Start reading data
-    finalized = false;
+    buffer = std::make_unique<DelimitedReadBuffer>(
+        new ReadBufferFromKafkaConsumer(consumer, storage.log, max_block_size), storage.row_delimiter);
+    addChild(FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size));
+
+    broken = true;
 }
 
 void KafkaBlockInputStream::readSuffixImpl()
 {
-    if (hasClaimed())
-        buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();
+    buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();
 
-    // Mark as successfully finished
-    finalized = true;
+    broken = false;
 }
 
 } // namespace DB
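
The pause/subscribe/poll/resume sequence in readPrefixImpl() is the "graceful assignment" part of this commit: polling right after subscribing returns nothing until the group rebalance completes, so the stream pauses the consumer, spends one long poll letting the assignment arrive, and then resumes. Below is a minimal standalone sketch of the same pattern with cppkafka; the broker address, group id, and topic name are placeholders, not values from this commit.

#include <chrono>
#include <iostream>
#include <cppkafka/configuration.h>
#include <cppkafka/consumer.h>

int main()
{
    using namespace std::chrono_literals;

    // Placeholder connection settings; not taken from this commit.
    cppkafka::Configuration config = {
        {"metadata.broker.list", "localhost:9092"},
        {"group.id", "example_group"},
        {"enable.auto.commit", "false"},
    };

    cppkafka::Consumer consumer(config);

    // Subscribe while paused: the first polls after a subscription would
    // return nothing until the group rebalances, so wait out the assignment
    // before reading for real.
    consumer.pause();
    consumer.subscribe({"example_topic"});
    consumer.poll(5s); // drives the group join; nothing is consumed while paused
    consumer.resume();

    // By now an assignment has likely arrived, so a manual read can return
    // data immediately instead of an empty first result.
    if (auto message = consumer.poll(1s))
        if (!message.get_error())
            std::cout << message.get_payload() << std::endl;
}

The 5s figure mirrors the poll timeout used in the commit; in practice the wait only needs to outlive the group rebalance.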

Storages/Kafka/KafkaBlockInputStream.h

@@ -3,11 +3,11 @@
 #include <DataStreams/IBlockInputStream.h>
 #include <IO/DelimitedReadBuffer.h>
 #include <Interpreters/Context.h>
-#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
-#include <Storages/Kafka/StorageKafka.h>
 
 namespace DB
 {
 
+class StorageKafka;
+
 class KafkaBlockInputStream : public IBlockInputStream
 {
@@ -15,9 +15,10 @@ public:
     KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_);
     ~KafkaBlockInputStream() override;
 
-    String getName() const override;
-    Block readImpl() override;
-    Block getHeader() const override;
+    String getName() const override { return storage.getName(); }
+    Block readImpl() override { return children.back()->read(); }
+    Block getHeader() const override { return storage.getSampleBlock(); }
 
     void readPrefixImpl() override;
     void readSuffixImpl() override;
@@ -28,10 +29,7 @@ private:
     ConsumerPtr consumer;
     std::unique_ptr<DelimitedReadBuffer> buffer;
-    bool finalized = false;
-
-    // Return true if consumer has been claimed by the stream
-    bool hasClaimed() { return consumer != nullptr; }
+    bool broken = true, claimed = false;
 };
 
 } // namespace DB
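
A note on the new flags: broken starts out true and is only cleared at the very end of readSuffixImpl(), after the offsets are committed, so any exception or early exit leaves it set and the destructor knows to rewind the consumer by re-subscribing. A minimal sketch of that "assume failure until committed" idiom, with hypothetical names that are not from ClickHouse:

#include <iostream>

struct Reader
{
    ~Reader()
    {
        // Reached on every path: normal completion, early return, or unwind.
        if (broken)
            std::cout << "rewinding: offsets were never committed\n";
    }

    void finish()
    {
        // commit offsets here; if that throws, `broken` stays true
        broken = false;
    }

    // Failure is the default state; success must be proven explicitly.
    bool broken = true;
};

int main()
{
    Reader ok;
    ok.finish();      // destructor will see broken == false

    Reader abandoned; // finish() never called, so its destructor rewinds
}

Compared to the old finalized flag, the inverted default means no error path has to remember to mark the stream as broken; forgetting to reach the success point fails safe.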

Storages/Kafka/StorageKafka.cpp

@@ -177,6 +177,9 @@ cppkafka::Configuration StorageKafka::createConsumerConfiguration()
     conf.set("client.id", VERSION_FULL);
 
     // If no offset stored for this group, read all messages from the start
     conf.set("auto.offset.reset", "smallest");
+
+    // We manually commit offsets after a stream successfully finishes
+    conf.set("enable.auto.commit", "false");