After review fixes

This commit is contained in:
Mikhail Filimonov 2020-05-14 12:15:38 +02:00
parent a1013f1e71
commit 6499d702f9
2 changed files with 14 additions and 9 deletions

View File

@ -235,14 +235,19 @@ ProducerBufferPtr StorageKafka::createWriteBuffer(const Block & header)
ConsumerBufferPtr StorageKafka::createReadBuffer()
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
conf.set("auto.offset.reset", "smallest"); // If no offset stored for this group, read all messages from the start
updateConfiguration(conf);
// those settings should not be changed by users.
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once.
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
updateConfiguration(conf);
// Create a consumer and subscribe to topics
auto consumer = std::make_shared<cppkafka::Consumer>(conf);

View File

@ -238,17 +238,17 @@ def test_kafka_consumer_hang(kafka_cluster):
DROP TABLE test.view;
''')
# we expect no hanging drop queries
# original problem appearance was a sequence of the following messages in kafka logs:
# BROKERFAIL -> |ASSIGN| -> REBALANCE_IN_PROGRESS -> "waiting for rebalance_cb" (repeated forever)
# so it was waiting forever while the application will execute queued rebalance callback
# now we drain all queued callbacks (visible as 'Rebalance initiated' after 'Waiting for cleanup')
instance.exec_in_container(["bash", "-c", "tail -n 500 /var/log/clickhouse-server/clickhouse-server.log | grep 'Waiting for cleanup' -A 500 | grep -q 'Rebalance initiated. Revoking partitions'"])
# from a user perspective: we expect no hanging 'drop' queries
# 'dr'||'op' to avoid self matching
assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
# log = '/var/log/clickhouse-server/stderr.log'
# instance.exec_in_container(['grep', '-q', 'BROKERFAIL', log])
# instance.exec_in_container(['grep', '-q', '|ASSIGN|', log])
# instance.exec_in_container(['grep', '-q', 'Heartbeat failed: REBALANCE_IN_PROGRESS: group is rebalancing', log])
# instance.exec_in_container(['grep', '-q', 'Group "consumer_hang": waiting for rebalance_cb', log])
@pytest.mark.timeout(180)
def test_kafka_csv_with_delimiter(kafka_cluster):
instance.query('''