Don't use async producing in Kafka

This commit is contained in:
avogar 2022-12-30 16:40:04 +00:00
parent 894726bd8f
commit a74ffa70bc
2 changed files with 27 additions and 67 deletions

View File

@ -16,8 +16,6 @@ namespace ProfileEvents
namespace DB
{
static const auto BATCH = 1000;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
@ -25,7 +23,7 @@ namespace ErrorCodes
KafkaProducer::KafkaProducer(
ProducerPtr producer_, const std::string & topic_, std::chrono::milliseconds poll_timeout, std::atomic<bool> & shutdown_called_, const Block & header)
: producer(producer_), topic(topic_), timeout(poll_timeout), shutdown_called(shutdown_called_), payloads(BATCH)
: producer(producer_), topic(topic_), timeout(poll_timeout), shutdown_called(shutdown_called_)
{
if (header.has("_key"))
{
@ -48,72 +46,46 @@ KafkaProducer::KafkaProducer(
void KafkaProducer::produce(const String & message, size_t rows_in_message, const Columns & columns, size_t last_row)
{
ProfileEvents::increment(ProfileEvents::KafkaRowsWritten, rows_in_message);
Payload payload;
payload.message = message;
cppkafka::MessageBuilder builder(topic);
builder.payload(message);
// Note: if a message contains several rows, the key/timestamp value is taken from the last row of the block.
if (key_column_index)
{
const auto & key_column = assert_cast<const ColumnString &>(*columns[key_column_index.value()]);
payload.key = key_column.getDataAt(last_row).toString();
const auto key_data = key_column.getDataAt(last_row).toString();
builder.key(cppkafka::Buffer(key_data.data(), key_data.size()));
}
if (timestamp_column_index)
{
const auto & timestamp_column = assert_cast<const ColumnUInt32 &>(*columns[timestamp_column_index.value()]);
payload.timestamp = std::chrono::seconds{timestamp_column.getElement(last_row)};
const auto timestamp = std::chrono::seconds{timestamp_column.getElement(last_row)};
builder.timestamp(timestamp);
}
if (!payloads.push(std::move(payload)))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to payloads queue");
}
void KafkaProducer::startProducingTaskLoop()
{
cppkafka::MessageBuilder builder(topic);
while ((!payloads.isFinishedAndEmpty()) && !shutdown_called.load())
while (!shutdown_called)
{
Payload payload;
if (!payloads.pop(payload))
break;
builder.payload(payload.message);
if (payload.key)
builder.key(cppkafka::Buffer(payload.key->data(), payload.key->size()));
if (payload.timestamp)
builder.timestamp(*payload.timestamp);
while (true)
try
{
try
{
producer->produce(builder);
}
catch (cppkafka::HandleException & e)
{
if (e.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
{
producer->poll(timeout);
continue;
}
ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
throw;
}
ProfileEvents::increment(ProfileEvents::KafkaMessagesProduced);
break;
producer->produce(builder);
}
catch (cppkafka::HandleException & e)
{
if (e.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
{
producer->poll(timeout);
continue;
}
ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
throw;
}
ProfileEvents::increment(ProfileEvents::KafkaMessagesProduced);
break;
}
}
void KafkaProducer::stopProducingTask()
{
payloads.finish();
}
void KafkaProducer::finishImpl()
void KafkaProducer::finish()
{
// For an unknown reason we may hit an internal timeout when inserting for the first time.
while (true)

View File

@ -18,7 +18,7 @@ namespace DB
class Block;
using ProducerPtr = std::shared_ptr<cppkafka::Producer>;
class KafkaProducer : public AsynchronousMessageProducer
class KafkaProducer : public IMessageProducer
{
public:
KafkaProducer(
@ -30,21 +30,10 @@ public:
void produce(const String & message, size_t rows_in_message, const Columns & columns, size_t last_row) override;
void start(const ContextPtr &) override {}
void finish() override;
private:
void stopProducingTask() override;
void finishImpl() override;
String getProducingTaskName() const override { return "KafkaProducingTask"; }
void startProducingTaskLoop() override;
struct Payload
{
String message;
std::optional<String> key;
std::optional<std::chrono::milliseconds> timestamp;
};
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaProducers};
ProducerPtr producer;
@ -55,7 +44,6 @@ private:
std::optional<size_t> key_column_index;
std::optional<size_t> timestamp_column_index;
ConcurrentBoundedQueue<Payload> payloads;
};
}