Merge pull request #5080 from abyss7/issue-4736

Do not drop Kafka Consumer buffers after deletion of stream
Commit b1cf026b5b authored by alexey-milovidov, 2019-04-22 19:18:33 +03:00, committed via GitHub
12 changed files with 383 additions and 93 deletions
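
The change replaces the per-stream consumer lifecycle with a pool of reusable read buffers owned by StorageKafka: each buffer wraps a cppkafka::Consumer, is claimed by a KafkaBlockInputStream for the duration of a read, and is pushed back instead of being destroyed when the stream goes away, so the Kafka subscription survives the deletion of the stream. The following standalone sketch is only an illustration of that claim/push pattern, not code from the PR; it mirrors the semaphore-plus-mutex logic of StorageKafka::tryClaimBuffer() and pushBuffer() shown in the diff below, with the class and member names invented for the example.

#include <memory>
#include <mutex>
#include <vector>
#include <Poco/Semaphore.h>

template <typename Buffer>
class BufferPool
{
public:
    using BufferPtr = std::shared_ptr<Buffer>;

    /// The semaphore counts currently available buffers; capacity must be > 0.
    explicit BufferPool(size_t capacity) : semaphore(0, static_cast<int>(capacity)) {}

    /// Analogue of StorageKafka::pushBuffer(): return a buffer to the pool and wake one waiter.
    void push(BufferPtr buffer)
    {
        std::lock_guard lock(mutex);
        buffers.push_back(std::move(buffer));
        semaphore.set();
    }

    /// Analogue of StorageKafka::tryClaimBuffer(): wait up to wait_ms for a free buffer
    /// (a negative value means wait forever), then take the last one from the list.
    BufferPtr tryClaim(long wait_ms)
    {
        if (wait_ms >= 0)
        {
            if (!semaphore.tryWait(wait_ms))
                return nullptr;
        }
        else
            semaphore.wait();

        std::lock_guard lock(mutex);
        auto buffer = buffers.back();
        buffers.pop_back();
        return buffer;
    }

private:
    Poco::Semaphore semaphore;
    std::mutex mutex;
    std::vector<BufferPtr> buffers;
};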

dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp

@ -24,36 +24,21 @@ KafkaBlockInputStream::~KafkaBlockInputStream()
return;
if (broken)
{
LOG_TRACE(storage.log, "Re-joining claimed consumer after failure");
consumer->unsubscribe();
}
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->unsubscribe();
storage.pushConsumer(consumer);
storage.pushBuffer(buffer);
}
void KafkaBlockInputStream::readPrefixImpl()
{
consumer = storage.tryClaimConsumer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
claimed = !!consumer;
buffer = storage.tryClaimBuffer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
claimed = !!buffer;
if (!consumer)
consumer = std::make_shared<cppkafka::Consumer>(storage.createConsumerConfiguration());
if (!buffer)
buffer = storage.createBuffer();
// While we wait for an assignment after subscription, we'll poll zero messages anyway.
// If we're doing a manual select then it's better to get something after a wait than an immediate nothing.
if (consumer->get_subscription().empty())
{
using namespace std::chrono_literals;
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.topics);
consumer->pause(); // don't accidentally read any messages
consumer->subscribe(storage.topics);
consumer->poll(5s);
consumer->resume();
}
buffer = std::make_unique<DelimitedReadBuffer>(
new ReadBufferFromKafkaConsumer(consumer, storage.log, max_block_size), storage.row_delimiter);
addChild(FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size));
broken = true;
@ -66,4 +51,4 @@ void KafkaBlockInputStream::readSuffixImpl()
broken = false;
}
} // namespace DB
}
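
Read together, the added lines in the hunks above give the new KafkaBlockInputStream flow, roughly the following (a sketch assembled from the diff, not a verbatim copy of the file; unchanged surrounding code is elided):

KafkaBlockInputStream::~KafkaBlockInputStream()
{
    /// Unchanged guard clauses from the context above are elided here.
    if (broken)
        buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->unsubscribe();

    storage.pushBuffer(buffer); /// Return the buffer to the pool instead of dropping it.
}

void KafkaBlockInputStream::readPrefixImpl()
{
    /// Reuse a pooled buffer if one frees up in time, otherwise create a fresh one.
    buffer = storage.tryClaimBuffer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
    claimed = !!buffer;

    if (!buffer)
        buffer = storage.createBuffer();

    /// Subscription (pause / subscribe / poll / resume) now lives inside ReadBufferFromKafkaConsumer.
    buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.topics);

    addChild(FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size));

    broken = true;
}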

dbms/src/Storages/Kafka/KafkaBlockInputStream.h

@ -1,7 +1,6 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <IO/DelimitedReadBuffer.h>
#include <Interpreters/Context.h>
#include <Storages/Kafka/StorageKafka.h>
@ -27,8 +26,7 @@ private:
Context context;
UInt64 max_block_size;
ConsumerPtr consumer;
std::unique_ptr<DelimitedReadBuffer> buffer;
BufferPtr buffer;
bool broken = true, claimed = false;
};

dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp

@ -2,10 +2,11 @@
namespace DB
{
namespace
{
const auto READ_POLL_MS = 500; /// How long to wait for a batch of messages.
} // namespace
}
void ReadBufferFromKafkaConsumer::commit()
{
@ -13,10 +14,32 @@ void ReadBufferFromKafkaConsumer::commit()
return;
auto & previous = *std::prev(current);
LOG_TRACE(log, "Committing message with offset " << previous.get_offset());
consumer->async_commit(previous);
}
void ReadBufferFromKafkaConsumer::subscribe(const Names & topics)
{
// While we wait for an assignment after subscription, we'll poll zero messages anyway.
// If we're doing a manual select then it's better to get something after a wait than an immediate nothing.
if (consumer->get_subscription().empty())
{
using namespace std::chrono_literals;
consumer->pause(); // don't accidentally read any messages
consumer->subscribe(topics);
consumer->poll(5s);
consumer->resume();
}
}
void ReadBufferFromKafkaConsumer::unsubscribe()
{
LOG_TRACE(log, "Re-joining claimed consumer after failure");
consumer->unsubscribe();
}
/// Do commit messages implicitly after we processed the previous batch.
bool ReadBufferFromKafkaConsumer::nextImpl()
{
@ -50,4 +73,4 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
return true;
}
} // namespace DB
}

dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h

@ -1,14 +1,18 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <Core/Names.h>
#include <IO/DelimitedReadBuffer.h>
#include <common/logger_useful.h>
#include <cppkafka/cppkafka.h>
namespace DB
{
using BufferPtr = std::shared_ptr<DelimitedReadBuffer>;
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
class ReadBufferFromKafkaConsumer : public ReadBuffer
{
public:
@ -17,8 +21,9 @@ public:
{
}
// Commit all processed messages.
void commit();
void commit(); // Commit all processed messages.
void subscribe(const Names & topics); // Subscribe internal consumer to topics.
void unsubscribe(); // Unsubscribe internal consumer in case of failure.
private:
using Messages = std::vector<cppkafka::Message>;
@ -33,4 +38,4 @@ private:
bool nextImpl() override;
};
} // namespace DB
}

dbms/src/Storages/Kafka/StorageKafka.cpp

@ -81,7 +81,7 @@ StorageKafka::StorageKafka(
row_delimiter(row_delimiter_),
schema_name(global_context.getMacros()->expand(schema_name_)),
num_consumers(num_consumers_), max_block_size(max_block_size_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
semaphore(0, num_consumers_), mutex(), consumers(),
semaphore(0, num_consumers_),
skip_broken(skip_broken_)
{
task = global_context.getSchedulePool().createTask(log->name(), [this]{ streamThread(); });
@ -124,12 +124,8 @@ void StorageKafka::startup()
{
for (size_t i = 0; i < num_consumers; ++i)
{
// Create a consumer and subscribe to topics
auto consumer = std::make_shared<cppkafka::Consumer>(createConsumerConfiguration());
consumer->subscribe(topics);
// Make consumer available
pushConsumer(consumer);
// Make buffer available
pushBuffer(createBuffer());
++num_created_consumers;
}
@ -146,8 +142,8 @@ void StorageKafka::shutdown()
// Close all consumers
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto consumer = claimConsumer();
// FIXME: not sure if really close consumers here, and if we really need to close them here.
auto buffer = claimBuffer();
// FIXME: not sure if we really close consumers here, and if we really need to close them here.
}
LOG_TRACE(log, "Waiting for cleanup");
@ -203,14 +199,29 @@ cppkafka::Configuration StorageKafka::createConsumerConfiguration()
return conf;
}
ConsumerPtr StorageKafka::claimConsumer()
BufferPtr StorageKafka::createBuffer()
{
return tryClaimConsumer(-1L);
// Create a consumer and subscribe to topics
auto consumer = std::make_shared<cppkafka::Consumer>(createConsumerConfiguration());
consumer->subscribe(topics);
// Limit the number of batched messages to allow early cancellations
const Settings & settings = global_context.getSettingsRef();
size_t batch_size = max_block_size;
if (!batch_size)
batch_size = settings.max_block_size.value;
return std::make_shared<DelimitedReadBuffer>(new ReadBufferFromKafkaConsumer(consumer, log, batch_size), row_delimiter);
}
ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms)
BufferPtr StorageKafka::claimBuffer()
{
// Wait for the first free consumer
return tryClaimBuffer(-1L);
}
BufferPtr StorageKafka::tryClaimBuffer(long wait_ms)
{
// Wait for the first free buffer
if (wait_ms >= 0)
{
if (!semaphore.tryWait(wait_ms))
@ -219,17 +230,17 @@ ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms)
else
semaphore.wait();
// Take the first available consumer from the list
// Take the first available buffer from the list
std::lock_guard lock(mutex);
auto consumer = consumers.back();
consumers.pop_back();
return consumer;
auto buffer = buffers.back();
buffers.pop_back();
return buffer;
}
void StorageKafka::pushConsumer(ConsumerPtr consumer)
void StorageKafka::pushBuffer(BufferPtr buffer)
{
std::lock_guard lock(mutex);
consumers.push_back(consumer);
buffers.push_back(buffer);
semaphore.set();
}
@ -303,7 +314,6 @@ bool StorageKafka::streamToViews()
insert->table = table_name;
insert->no_destination = true; // Only insert into dependent views
// Limit the number of batched messages to allow early cancellations
const Settings & settings = global_context.getSettingsRef();
size_t block_size = max_block_size;
if (block_size == 0)

dbms/src/Storages/Kafka/StorageKafka.h

@ -4,6 +4,7 @@
#include <Core/NamesAndTypes.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/IStorage.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Poco/Event.h>
#include <Poco/Semaphore.h>
#include <ext/shared_ptr_helper.h>
@ -14,15 +15,13 @@
namespace DB
{
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
/** Implements a Kafka queue table engine that can be used as a persistent queue / buffer,
* or as a basic building block for creating pipelines with a continuous insertion / ETL.
*/
class StorageKafka : public ext::shared_ptr_helper<StorageKafka>, public IStorage
{
friend class KafkaBlockInputStream;
friend class KafkaBlockOutputStream;
friend class KafkaBlockInputStream;
friend class KafkaBlockOutputStream;
public:
std::string getName() const override { return "Kafka"; }
@ -40,7 +39,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name) override
void rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;
@ -74,7 +73,7 @@ private:
// Consumer list
Poco::Semaphore semaphore;
std::mutex mutex;
std::vector<ConsumerPtr> consumers; /// Available consumers
std::vector<BufferPtr> buffers; /// available buffers for Kafka consumers
size_t skip_broken;
@ -83,9 +82,10 @@ private:
std::atomic<bool> stream_cancelled{false};
cppkafka::Configuration createConsumerConfiguration();
ConsumerPtr claimConsumer();
ConsumerPtr tryClaimConsumer(long wait_ms);
void pushConsumer(ConsumerPtr c);
BufferPtr createBuffer();
BufferPtr claimBuffer();
BufferPtr tryClaimBuffer(long wait_ms);
void pushBuffer(BufferPtr buf);
void streamThread();
bool streamToViews();

dbms/tests/integration/helpers/docker_compose_kafka.yml

@ -12,7 +12,7 @@ services:
- label:disable
kafka1:
image: confluentinc/cp-kafka:4.1.0
image: confluentinc/cp-kafka:5.2.0
hostname: kafka1
ports:
- "9092:9092"

dbms/tests/integration/test_storage_kafka/test.py

@ -7,7 +7,8 @@ from helpers.test_tools import TSV
import json
import subprocess
from kafka import KafkaProducer
import kafka.errors
from kafka import KafkaAdminClient, KafkaProducer
from google.protobuf.internal.encoder import _VarintBytes
"""
@ -62,22 +63,11 @@ def wait_kafka_is_available(max_retries=50):
def kafka_produce(topic, messages):
p = subprocess.Popen(('docker',
'exec',
'-i',
kafka_id,
'/usr/bin/kafka-console-producer',
'--broker-list',
'INSIDE://localhost:9092',
'--topic',
topic,
'--sync',
'--message-send-max-retries',
'100'),
stdin=subprocess.PIPE)
p.communicate(messages)
p.stdin.close()
print("Produced {} messages for topic {}".format(len(messages.splitlines()), topic))
producer = KafkaProducer(bootstrap_servers="localhost:9092")
for message in messages:
producer.send(topic=topic, value=message)
producer.flush()
print ("Produced {} messages for topic {}".format(len(messages), topic))
def kafka_produce_protobuf_messages(topic, start_index, num_messages):
@ -141,9 +131,9 @@ def test_kafka_settings_old_syntax(kafka_cluster):
# Don't insert malformed messages since old settings syntax
# doesn't support skipping of broken messages.
messages = ''
messages = []
for i in range(50):
messages += json.dumps({'key': i, 'value': i}) + '\n'
messages.append(json.dumps({'key': i, 'value': i}))
kafka_produce('old', messages)
result = ''
@ -167,18 +157,18 @@ def test_kafka_settings_new_syntax(kafka_cluster):
kafka_skip_broken_messages = 1;
''')
messages = ''
messages = []
for i in range(25):
messages += json.dumps({'key': i, 'value': i}) + '\n'
messages.append(json.dumps({'key': i, 'value': i}))
kafka_produce('new', messages)
# Insert couple of malformed messages.
kafka_produce('new', '}{very_broken_message,\n')
kafka_produce('new', '}another{very_broken_message,\n')
kafka_produce('new', ['}{very_broken_message,'])
kafka_produce('new', ['}another{very_broken_message,'])
messages = ''
messages = []
for i in range(25, 50):
messages += json.dumps({'key': i, 'value': i}) + '\n'
messages.append(json.dumps({'key': i, 'value': i}))
kafka_produce('new', messages)
result = ''
@ -201,9 +191,9 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
kafka_row_delimiter = '\\n';
''')
messages = ''
messages = []
for i in range(50):
messages += '{i}, {i}\n'.format(i=i)
messages.append('{i}, {i}'.format(i=i))
kafka_produce('csv', messages)
result = ''
@ -226,9 +216,9 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
kafka_row_delimiter = '\\n';
''')
messages = ''
messages = []
for i in range(50):
messages += '{i}\t{i}\n'.format(i=i)
messages.append('{i}\t{i}'.format(i=i))
kafka_produce('tsv', messages)
result = ''
@ -239,6 +229,35 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
kafka_check_result(result, True)
def test_kafka_json_without_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow';
''')
messages = ''
for i in range(25):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('json', [messages])
messages = ''
for i in range(25, 50):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('json', [messages])
result = ''
for i in range(50):
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
kafka_check_result(result, True)
def test_kafka_protobuf(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value String)
@ -282,9 +301,9 @@ def test_kafka_materialized_view(kafka_cluster):
SELECT * FROM test.kafka;
''')
messages = ''
messages = []
for i in range(50):
messages += json.dumps({'key': i, 'value': i}) + '\n'
messages.append(json.dumps({'key': i, 'value': i}))
kafka_produce('json', messages)
for i in range(20):
@ -300,6 +319,52 @@ def test_kafka_materialized_view(kafka_cluster):
''')
def test_kafka_flush_on_big_message(kafka_cluster):
# Create batches of messages of size ~100KB
kafka_messages = 10000
batch_messages = 1000
messages = [json.dumps({'key': i, 'value': 'x' * 100}) * batch_messages for i in range(kafka_messages)]
kafka_produce('flush', messages)
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'flush',
kafka_group_name = 'flush',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 10;
CREATE TABLE test.view (key UInt64, value String)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka;
''')
client = KafkaAdminClient(bootstrap_servers="localhost:9092")
received = False
while not received:
try:
offsets = client.list_consumer_group_offsets('flush')
for topic, offset in offsets.items():
if topic.topic == 'flush' and offset.offset == kafka_messages:
received = True
break
except kafka.errors.GroupCoordinatorNotAvailableError:
continue
for _ in range(20):
time.sleep(1)
result = instance.query('SELECT count() FROM test.view')
if int(result) == kafka_messages*batch_messages:
break
assert int(result) == kafka_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result)
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")

utils/kafka/consume.py (new executable file)

@ -0,0 +1,39 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# `pip install …`
import kafka # … kafka-python
import argparse
from pprint import pprint
def main():
parser = argparse.ArgumentParser(description='Kafka Producer client')
parser.add_argument('--server', type=str, metavar='HOST', default='localhost',
help='Kafka bootstrap-server address')
parser.add_argument('--port', type=int, metavar='PORT', default=9092,
help='Kafka bootstrap-server port')
parser.add_argument('--client', type=str, default='ch-kafka-python',
help='custom client id for this producer')
parser.add_argument('--topic', type=str, required=True,
help='name of Kafka topic to store in')
parser.add_argument('--group', type=str, required=True,
help='name of the consumer group')
args = parser.parse_args()
config = {
'bootstrap_servers': f'{args.server}:{args.port}',
'client_id': args.client,
'group_id': args.group,
}
client = kafka.KafkaConsumer(**config)
client.subscribe([args.topic])
pprint(client.poll(10000))
client.unsubscribe()
client.close()
if __name__ == "__main__":
exit(main())

utils/kafka/manage.py (new executable file)

@ -0,0 +1,41 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# `pip install …`
import kafka # … kafka-python
import argparse
def main():
parser = argparse.ArgumentParser(description='Kafka Topic manager')
parser.add_argument('--server', type=str, metavar='HOST', default='localhost',
help='Kafka bootstrap-server address')
parser.add_argument('--port', type=int, metavar='PORT', default=9092,
help='Kafka bootstrap-server port')
parser.add_argument('--client', type=str, default='ch-kafka-python',
help='custom client id for this producer')
commands = parser.add_mutually_exclusive_group()
commands.add_argument('--create', type=str, metavar='TOPIC', nargs='+',
help='create new topic(s) in the cluster')
commands.add_argument('--delete', type=str, metavar='TOPIC', nargs='+',
help='delete existing topic(s) from the cluster')
args = parser.parse_args()
config = {
'bootstrap_servers': f'{args.server}:{args.port}',
'client_id': args.client,
}
client = kafka.KafkaAdminClient(**config)
if args.create:
print(client.create_topics(args.create))
elif args.delete:
print(client.delete_topics(args.delete))
client.close()
if __name__ == "__main__":
exit(main())

utils/kafka/produce.py (new executable file)

@ -0,0 +1,72 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# `pip install …`
import kafka # … kafka-python
import argparse
from concurrent.futures import ThreadPoolExecutor
import enum
import multiprocessing
import sys
class Sync(enum.Enum):
NONE = 'none'
LEAD = 'leader'
ALL = 'all'
def __str__(self):
return self.value
def convert(self):
values = {
str(Sync.NONE): '0',
str(Sync.LEAD): '1',
str(Sync.ALL): 'all',
}
return values[self.value]
def main():
parser = argparse.ArgumentParser(description='Produce a single message taken from input')
parser.add_argument('--server', type=str, metavar='HOST', default='localhost',
help='Kafka bootstrap-server address')
parser.add_argument('--port', type=int, metavar='PORT', default=9092,
help='Kafka bootstrap-server port')
parser.add_argument('--client', type=str, default='ch-kafka-python',
help='custom client id for this producer')
parser.add_argument('--topic', type=str, required=True,
help='name of Kafka topic to store in')
parser.add_argument('--retries', type=int, default=0,
help='number of retries to send on failure')
parser.add_argument('--multiply', type=int, default=1,
help='multiplies incoming string many times')
parser.add_argument('--repeat', type=int, default=1,
help='send same (multiplied) message many times')
args = parser.parse_args()
config = {
'bootstrap_servers': f'{args.server}:{args.port}',
'client_id': args.client,
'retries': args.retries,
}
client = kafka.KafkaProducer(**config)
message = sys.stdin.buffer.read() * args.multiply
def send(num):
client.send(topic=args.topic, value=message)
print(f'iteration {num}: sent a message multiplied {args.multiply} times')
pool = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
for num in range(args.repeat):
pool.submit(send, num)
pool.shutdown()
client.flush()
client.close()
if __name__ == "__main__":
exit(main())

utils/kafka/status.py (new executable file)

@ -0,0 +1,52 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# `pip install …`
import kafka # … kafka-python
import argparse
from pprint import pprint
def main():
parser = argparse.ArgumentParser(description='Kafka client to get groups and topics status')
parser.add_argument('--server', type=str, metavar='HOST', default='localhost',
help='Kafka bootstrap-server address')
parser.add_argument('--port', type=int, metavar='PORT', default=9092,
help='Kafka bootstrap-server port')
parser.add_argument('--client', type=str, default='ch-kafka-python',
help='custom client id for this producer')
args = parser.parse_args()
config = {
'bootstrap_servers': f'{args.server}:{args.port}',
'client_id': args.client,
}
client = kafka.KafkaAdminClient(**config)
consumer = kafka.KafkaConsumer(**config)
cluster = client._client.cluster
topics = cluster.topics()
for topic in topics:
print(f'Topic "{topic}":', end='')
for partition in cluster.partitions_for_topic(topic):
tp = kafka.TopicPartition(topic, partition)
print(f' {partition} (begin: {consumer.beginning_offsets([tp])[tp]}, end: {consumer.end_offsets([tp])[tp]})', end='')
print()
groups = client.list_consumer_groups()
for group in groups:
print(f'Group "{group[0]}" ({group[1]}):')
consumer = kafka.KafkaConsumer(**config, group_id=group[0])
offsets = client.list_consumer_group_offsets(group[0])
for topic, offset in offsets.items():
print(f'\t{topic.topic}[{topic.partition}]: {consumer.beginning_offsets([topic])[topic]}, {offset.offset}, {consumer.end_offsets([topic])[topic]}')
consumer.close()
client.close()
if __name__ == "__main__":
exit(main())