Split StorageKafka.cpp on smaller files per class

This commit is contained in:
Ivan Lezhankin 2019-01-21 17:02:03 +03:00
parent a8524b6e36
commit c8e605327d
5 changed files with 266 additions and 200 deletions

View File

@ -0,0 +1,101 @@
#include <Storages/Kafka/KafkaBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Storages/Kafka/StorageKafka.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
} // namespace ErrorCodes
KafkaBlockInputStream::KafkaBlockInputStream(
StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_)
: storage(storage_), context(context_), max_block_size(max_block_size_)
{
// Always skip unknown fields regardless of the context (JSON or TSKV)
context.setSetting("input_format_skip_unknown_fields", 1u);
// We don't use ratio since the number of Kafka messages may vary from stream to stream.
// Thus, ratio is meaningless.
context.setSetting("input_format_allow_errors_ratio", 1.);
context.setSetting("input_format_allow_errors_num", storage.skip_broken);
if (schema.size() > 0)
context.setSetting("format_schema", schema);
}
KafkaBlockInputStream::~KafkaBlockInputStream()
{
if (!hasClaimed())
return;
// An error was thrown during the stream or it did not finish successfully
// The read offsets weren't comitted, so consumer must rejoin the group from the original starting point
if (!finalized)
{
LOG_TRACE(storage.log, "KafkaBlockInputStream did not finish successfully, unsubscribing from assignments and rejoining");
consumer->unsubscribe();
consumer->subscribe(storage.topics);
}
// Return consumer for another reader
storage.pushConsumer(consumer);
consumer = nullptr;
}
String KafkaBlockInputStream::getName() const
{
return storage.getName();
}
Block KafkaBlockInputStream::readImpl()
{
if (isCancelledOrThrowIfKilled() || !hasClaimed())
return {};
if (!reader)
throw Exception("Logical error: reader is not initialized", ErrorCodes::LOGICAL_ERROR);
return reader->read();
}
Block KafkaBlockInputStream::getHeader() const
{
return storage.getSampleBlock();
}
void KafkaBlockInputStream::readPrefixImpl()
{
if (!hasClaimed())
{
// Create a formatted reader on Kafka messages
LOG_TRACE(storage.log, "Creating formatted reader");
consumer = storage.tryClaimConsumer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
if (consumer == nullptr)
throw Exception("Failed to claim consumer: ", ErrorCodes::TIMEOUT_EXCEEDED);
read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer, storage.log, storage.row_delimiter);
reader = FormatFactory::instance().getInput(storage.format_name, *read_buf, storage.getSampleBlock(), context, max_block_size);
}
// Start reading data
finalized = false;
reader->readPrefix();
}
void KafkaBlockInputStream::readSuffixImpl()
{
if (hasClaimed())
{
reader->readSuffix();
// Store offsets read in this stream
read_buf->commit();
}
// Mark as successfully finished
finalized = true;
}
} // namespace DB

View File

@ -0,0 +1,37 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
namespace DB
{
class StorageKafka;
class KafkaBlockInputStream : public IProfilingBlockInputStream
{
public:
KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_);
~KafkaBlockInputStream() override;
String getName() const override;
Block readImpl() override;
Block getHeader() const override;
void readPrefixImpl() override;
void readSuffixImpl() override;
private:
StorageKafka & storage;
ConsumerPtr consumer;
Context context;
size_t max_block_size;
Block sample_block;
std::unique_ptr<ReadBufferFromKafkaConsumer> read_buf;
BlockInputStreamPtr reader;
bool finalized = false;
// Return true if consumer has been claimed by the stream
bool hasClaimed() { return consumer != nullptr; }
};
} // namespace DB

View File

@ -0,0 +1,62 @@
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
namespace DB
{
namespace
{
/// How long to wait for a single message (applies to each individual message)
const auto READ_POLL_MS = 500;
} // namespace
bool ReadBufferFromKafkaConsumer::nextImpl()
{
if (current_pending)
{
// XXX: very fishy place with const casting.
BufferBase::set(
reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
current_pending = false;
return true;
}
// Process next buffered message
auto message = consumer->poll(std::chrono::milliseconds(READ_POLL_MS));
if (!message)
return false;
if (message.is_eof())
{
// Reached EOF while reading current batch, skip it.
LOG_TRACE(log, "EOF reached for partition " << message.get_partition() << " offset " << message.get_offset());
return nextImpl();
}
else if (auto err = message.get_error())
{
LOG_ERROR(log, "Consumer error: " << err);
return false;
}
++read_messages;
// Now we've received a new message. Check if we need to produce a delimiter
if (row_delimiter != '\0' && current)
{
BufferBase::set(&row_delimiter, 1, 0);
current = std::move(message);
current_pending = true;
return true;
}
// Consume message and mark the topic/partition offset
// The offsets will be committed in the readSuffix() method after the block is completed
// If an exception is thrown before that would occur, the client will rejoin without committing offsets
current = std::move(message);
// XXX: very fishy place with const casting.
BufferBase::set(
reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
return true;
}
} // namespace DB

View File

@ -0,0 +1,44 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <common/logger_useful.h>
#include <cppkafka/cppkafka.h>
namespace DB
{
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
class ReadBufferFromKafkaConsumer : public ReadBuffer
{
public:
ReadBufferFromKafkaConsumer(ConsumerPtr consumer_, Poco::Logger * log_, char row_delimiter_)
: ReadBuffer(nullptr, 0), consumer(consumer_), log(log_), row_delimiter(row_delimiter_)
{
if (row_delimiter != '\0')
LOG_TRACE(log, "Row delimiter is: " << row_delimiter);
}
/// Commit messages read with this consumer
void commit()
{
LOG_TRACE(log, "Committing " << read_messages << " messages");
if (read_messages == 0)
return;
consumer->async_commit();
read_messages = 0;
}
private:
ConsumerPtr consumer;
cppkafka::Message current;
bool current_pending = false; /// We've fetched "current" message and need to process it on the next iteration.
Poco::Logger * log;
size_t read_messages = 0;
char row_delimiter;
bool nextImpl() override;
};
} // namespace DB

View File

@ -6,9 +6,6 @@
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBuffer.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
@ -16,6 +13,7 @@
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/KafkaBlockInputStream.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <boost/algorithm/string/replace.hpp>
@ -42,210 +40,31 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int TIMEOUT_EXCEEDED;
}
using namespace Poco::Util;
/// How long to wait for a single message (applies to each individual message)
static const auto READ_POLL_MS = 500;
static const auto CLEANUP_TIMEOUT_MS = 3000;
/// Configuration prefix
static const String CONFIG_PREFIX = "kafka";
class ReadBufferFromKafkaConsumer : public ReadBuffer
namespace
{
ConsumerPtr consumer;
cppkafka::Message current;
bool current_pending = false; /// We've fetched "current" message and need to process it on the next iteration.
Poco::Logger * log;
size_t read_messages = 0;
char row_delimiter;
const auto RESCHEDULE_MS = 500;
const auto CLEANUP_TIMEOUT_MS = 3000;
bool nextImpl() override
/// Configuration prefix
const String CONFIG_PREFIX = "kafka";
void loadFromConfig(cppkafka::Configuration & conf, const Poco::Util::AbstractConfiguration & config, const std::string & path)
{
if (current_pending)
Poco::Util::AbstractConfiguration::Keys keys;
std::vector<char> errstr(512);
config.keys(path, keys);
for (const auto & key : keys)
{
// XXX: very fishy place with const casting.
BufferBase::set(reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
current_pending = false;
return true;
const String key_path = path + "." + key;
const String key_name = boost::replace_all_copy(key, "_", ".");
conf.set(key_name, config.getString(key_path));
}
// Process next buffered message
auto message = consumer->poll(std::chrono::milliseconds(READ_POLL_MS));
if (!message)
return false;
if (message.is_eof())
{
// Reached EOF while reading current batch, skip it.
LOG_TRACE(log, "EOF reached for partition " << message.get_partition() << " offset " << message.get_offset());
return nextImpl();
}
else if (auto err = message.get_error())
{
LOG_ERROR(log, "Consumer error: " << err);
return false;
}
++read_messages;
// Now we've received a new message. Check if we need to produce a delimiter
if (row_delimiter != '\0' && current)
{
BufferBase::set(&row_delimiter, 1, 0);
current = std::move(message);
current_pending = true;
return true;
}
// Consume message and mark the topic/partition offset
// The offsets will be committed in the readSuffix() method after the block is completed
// If an exception is thrown before that would occur, the client will rejoin without committing offsets
current = std::move(message);
// XXX: very fishy place with const casting.
BufferBase::set(reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
return true;
}
public:
ReadBufferFromKafkaConsumer(ConsumerPtr consumer_, Poco::Logger * log_, char row_delimiter_)
: ReadBuffer(nullptr, 0), consumer(consumer_), log(log_), row_delimiter(row_delimiter_)
{
if (row_delimiter != '\0')
LOG_TRACE(log, "Row delimiter is: " << row_delimiter);
}
/// Commit messages read with this consumer
void commit()
{
LOG_TRACE(log, "Committing " << read_messages << " messages");
if (read_messages == 0)
return;
consumer->async_commit();
read_messages = 0;
}
};
class KafkaBlockInputStream : public IBlockInputStream
{
public:
KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_)
: storage(storage_), context(context_), max_block_size(max_block_size_)
{
// Always skip unknown fields regardless of the context (JSON or TSKV)
context.setSetting("input_format_skip_unknown_fields", 1u);
// We don't use ratio since the number of Kafka messages may vary from stream to stream.
// Thus, ratio is meaningless.
context.setSetting("input_format_allow_errors_ratio", 1.);
context.setSetting("input_format_allow_errors_num", storage.skip_broken);
if (schema.size() > 0)
context.setSetting("format_schema", schema);
}
~KafkaBlockInputStream() override
{
if (!hasClaimed())
return;
// An error was thrown during the stream or it did not finish successfully
// The read offsets weren't committed, so consumer must rejoin the group from the original starting point
if (!finalized)
{
LOG_TRACE(storage.log, "KafkaBlockInputStream did not finish successfully, unsubscribing from assignments and rejoining");
consumer->unsubscribe();
consumer->subscribe(storage.topics);
}
// Return consumer for another reader
storage.pushConsumer(consumer);
consumer = nullptr;
}
String getName() const override
{
return storage.getName();
}
Block readImpl() override
{
if (isCancelledOrThrowIfKilled() || !hasClaimed())
return {};
if (!reader)
throw Exception("Logical error: reader is not initialized", ErrorCodes::LOGICAL_ERROR);
return reader->read();
}
Block getHeader() const override { return storage.getSampleBlock(); }
void readPrefixImpl() override
{
if (!hasClaimed())
{
// Create a formatted reader on Kafka messages
LOG_TRACE(storage.log, "Creating formatted reader");
consumer = storage.tryClaimConsumer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
if (consumer == nullptr)
throw Exception("Failed to claim consumer: ", ErrorCodes::TIMEOUT_EXCEEDED);
read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer, storage.log, storage.row_delimiter);
reader = FormatFactory::instance().getInput(storage.format_name, *read_buf, storage.getSampleBlock(), context, max_block_size);
}
// Start reading data
finalized = false;
reader->readPrefix();
}
void readSuffixImpl() override
{
if (hasClaimed())
{
reader->readSuffix();
// Store offsets read in this stream
read_buf->commit();
}
// Mark as successfully finished
finalized = true;
}
private:
StorageKafka & storage;
ConsumerPtr consumer;
Context context;
size_t max_block_size;
Block sample_block;
std::unique_ptr<ReadBufferFromKafkaConsumer> read_buf;
BlockInputStreamPtr reader;
bool finalized = false;
// Return true if consumer has been claimed by the stream
bool hasClaimed() { return consumer != nullptr; }
};
static void loadFromConfig(cppkafka::Configuration & conf, const AbstractConfiguration & config, const std::string & path)
{
AbstractConfiguration::Keys keys;
std::vector<char> errstr(512);
config.keys(path, keys);
for (const auto & key : keys)
{
const String key_path = path + "." + key;
const String key_name = boost::replace_all_copy(key, "_", ".");
conf.set(key_name, config.getString(key_path));
}
}
} // namespace
StorageKafka::StorageKafka(
const std::string & table_name_,
@ -361,6 +180,9 @@ cppkafka::Configuration StorageKafka::createConsumerConfiguration()
// We manually commit offsets after a stream successfully finished
conf.set("enable.auto.commit", "false");
// for debug logs inside rdkafka
conf.set("debug", "consumer,cgrp,topic,fetch");
// Update consumer configuration from the configuration
const auto & config = global_context.getConfigRef();
if (config.has(CONFIG_PREFIX))
@ -461,7 +283,7 @@ void StorageKafka::streamThread()
// Wait for attached views
if (!stream_cancelled)
task->scheduleAfter(READ_POLL_MS);
task->scheduleAfter(RESCHEDULE_MS);
}