mirror of
synced 2024-11-21 23:21:59 +00:00
This commit is contained in:
@ -574,7 +574,7 @@ class ClickHouseCluster:
raise Exception("Can't wait Minio to start")
def wait_schema_registry_to_start(self, timeout=10):
sr_client = CachedSchemaRegistryClient('http://localhost:8081')
sr_client = CachedSchemaRegistryClient({"url":'http://localhost:8081'})
start = time.time()
while time.time() - start < timeout:
@ -1085,9 +1085,19 @@ class ClickHouseInstance:
def wait_for_log_line(self, regexp, filename='/var/log/clickhouse-server/clickhouse-server.log', timeout=30, repetitions=1, look_behind_lines=100):
start_time = time.time()
result = self.exec_in_container(
["bash", "-c", 'timeout {} tail -Fn{} "{}" | grep -Eqm {} {}'.format(timeout, look_behind_lines, filename, repetitions, shlex.quote(regexp))])
current_time = time.time()
print('Log line matching "{}" appeared in a {} seconds'.format(regexp, current_time - start_time))
["bash", "-c", 'timeout {} tail -Fn{} "{}" | grep -Em {} {}'.format(timeout, look_behind_lines, filename, repetitions, shlex.quote(regexp))])
# if repetitions>1 grep will return success even if not enough lines were collected,
if repetitions>1 and len(result.splitlines()) < repetitions:
print("wait_for_log_line: those lines were founded during {} sec.".format(timeout))
raise Exception("wait_for_log_line: Not enough repetitions: {} found, while {} expected".format(len(result.splitlines()), repetitions))
wait_duration = time.time() - start_time
print('Log line matching "{}" appeared in a {} seconds'.format(regexp, wait_duration))
return wait_duration
def file_exists(self, path):
return self.exec_in_container(
@ -183,7 +183,7 @@ def test_kafka_json_as_string(kafka_cluster):
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows")
def test_kafka_formats(kafka_cluster):
# data was dumped from clickhouse itself in a following manner
# clickhouse-client --format=Native --query='SELECT toInt64(number) as id, toUInt16( intDiv( id, 65536 ) ) as blockNo, reinterpretAsString(19777) as val1, toFloat32(0.5) as val2, toUInt8(1) as val3 from numbers(100) ORDER BY id' | xxd -ps | tr -d '\n' | sed 's/\(..\)/\\x\1/g'
@ -311,7 +311,7 @@ def test_kafka_formats(kafka_cluster):
# On empty message exception happens: Line "" doesn't match the regexp.: (at row 1)
# /src/Processors/Formats/Impl/RegexpRowInputFormat.cpp:140: DB::RegexpRowInputFormat::readRow(std::__1::vector<COW<DB::IColumn>::mutable_ptr<DB::IColumn>, std::__1::allocator<COW<DB::IColumn>::mutable_ptr<DB::IColumn> > >&, DB::RowReadExtension&) @ 0x1df82fcb in /usr/bin/clickhouse
'extra_settings': ", format_regexp='\(id = (.+?), blockNo = (.+?), val1 = \"(.+?)\", val2 = (.+?), val3 = (.+?)\)', format_regexp_escaping_rule='Escaped'"
'extra_settings': r", format_regexp='\(id = (.+?), blockNo = (.+?), val1 = \"(.+?)\", val2 = (.+?), val3 = (.+?)\)', format_regexp_escaping_rule='Escaped'"
@ -545,7 +545,7 @@ def test_kafka_formats(kafka_cluster):
'''.format(topic_name=topic_name, format_name=format_name,
extra_settings=format_opts.get('extra_settings') or ''))
instance.wait_for_log_line('kafka.*Committed offset [0-9]+.*format_tests_', repetitions=len(all_formats.keys()), look_behind_lines=12000)
for format_name, format_opts in list(all_formats.items()):
print(('Checking {}'.format(format_name)))
@ -1170,7 +1170,7 @@ def test_kafka_materialized_view(kafka_cluster):
kafka_check_result(result, True)
def test_librdkafka_snappy_regression(kafka_cluster):
def test_librdkafka_compression(kafka_cluster):
Regression for UB in snappy-c (that is used in librdkafka),
backport pr is [1].
@ -1180,55 +1180,63 @@ def test_librdkafka_snappy_regression(kafka_cluster):
Example of corruption:
2020.12.10 09:59:56.831507 [ 20 ] {} <Error> void DB::StorageKafka::threadFunc(size_t): Code: 27, e.displayText() = DB::Exception: Cannot parse input: expected '"' before: 'foo"}': (while reading the value of key value): (at row 1)
, Stack trace (when copying this message, always include the lines below):
To trigger this regression there should duplicated messages
Orignal reproducer is:
$ gcc --version |& fgrep gcc
gcc (GCC) 10.2.0
$ yes foobarbaz | fold -w 80 | head -n10 >| in-…
$ make clean && make CFLAGS='-Wall -g -O2 -ftree-loop-vectorize -DNDEBUG=1 -DSG=1 -fPIC'
$ ./verify in
final comparision of in failed at 20 of 100
# create topic with snappy compression
admin_client = admin.AdminClient({'bootstrap.servers': 'localhost:9092'})
topic_snappy = admin.NewTopic(topic='snappy_regression', num_partitions=1, replication_factor=1, config={
'compression.type': 'snappy',
admin_client.create_topics(new_topics=[topic_snappy], validate_only=False)
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'snappy_regression',
kafka_group_name = 'ch_snappy_regression',
kafka_format = 'JSONEachRow';
supported_compression_types = ['gzip', 'snappy', 'lz4', 'zstd', 'uncompressed']
messages = []
expected = []
# To trigger this regression there should duplicated messages
# Orignal reproducer is:
# $ gcc --version |& fgrep gcc
# gcc (GCC) 10.2.0
# $ yes foobarbaz | fold -w 80 | head -n10 >| in-…
# $ make clean && make CFLAGS='-Wall -g -O2 -ftree-loop-vectorize -DNDEBUG=1 -DSG=1 -fPIC'
# $ ./verify in
# final comparision of in failed at 20 of 100
value = 'foobarbaz'*10
number_of_messages = 50
for i in range(number_of_messages):
messages.append(json.dumps({'key': i, 'value': value}))
kafka_produce('snappy_regression', messages)
expected = '\n'.join(expected)
while True:
result = instance.query('SELECT * FROM test.kafka')
rows = len(result.strip('\n').split('\n'))
if rows == number_of_messages:
for compression_type in supported_compression_types:
print(('Check compression {}'.format(compression_type)))
assert TSV(result) == TSV(expected)
topic_name = 'test_librdkafka_compression_{}'.format(compression_type)
admin_client = admin.AdminClient({'bootstrap.servers': 'localhost:9092'})
topic = admin.NewTopic(topic=topic_name, num_partitions=1, replication_factor=1, config={
'compression.type': compression_type,
admin_client.create_topics(new_topics=[topic], validate_only=False)
instance.query('DROP TABLE test.kafka')
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{topic_name}',
kafka_group_name = '{topic_name}_group',
kafka_format = 'JSONEachRow',
kafka_flush_interval_ms = 1000;
CREATE MATERIALIZED VIEW test.consumer Engine=Log AS
SELECT * FROM test.kafka;
'''.format(topic_name=topic_name) )
kafka_produce(topic_name, messages)
instance.wait_for_log_line("Committed offset {}".format(number_of_messages))
result = instance.query('SELECT * FROM test.consumer')
assert TSV(result) == TSV(expected)
instance.query('DROP TABLE test.kafka SYNC')
instance.query('DROP TABLE test.consumer SYNC')
def test_kafka_materialized_view_with_subquery(kafka_cluster):
@ -1577,9 +1585,6 @@ def test_kafka_commit_on_block_write(kafka_cluster):
DROP TABLE test.kafka;
while int(instance.query("SELECT count() FROM system.tables WHERE database='test' AND name='kafka'")) == 1:
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
@ -1874,7 +1879,8 @@ def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
kafka_topic_list = 'topic_with_multiple_partitions2',
kafka_group_name = 'topic_with_multiple_partitions2',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 211;
kafka_max_block_size = 211,
kafka_flush_interval_ms = 500;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
@ -1892,7 +1898,7 @@ def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
kafka_produce('topic_with_multiple_partitions2', messages)
instance.wait_for_log_line('kafka.*Stalled', repetitions=20)
result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
@ -1961,7 +1967,8 @@ def test_kafka_rebalance(kafka_cluster):
kafka_topic_list = 'topic_with_multiple_partitions',
kafka_group_name = 'rebalance_test_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 33;
kafka_max_block_size = 33,
kafka_flush_interval_ms = 500;
CREATE MATERIALIZED VIEW test.{0}_mv TO test.destination AS
@ -1975,21 +1982,15 @@ def test_kafka_rebalance(kafka_cluster):
FROM test.{0};
# kafka_cluster.open_bash_shell('instance')
while int(
instance.query("SELECT count() FROM test.destination WHERE _consumed_by='{}'".format(table_name))) == 0:
print(("Waiting for test.kafka_consumer{} to start consume".format(consumer_index)))
# Waiting for test.kafka_consumerX to start consume ...
instance.wait_for_log_line('kafka_consumer{}.*Polled offset [0-9]+'.format(consumer_index))
# I leave last one working by intent (to finish consuming after all rebalances)
for consumer_index in range(NUMBER_OF_CONSURRENT_CONSUMERS - 1):
print(("Dropping test.kafka_consumer{}".format(consumer_index)))
instance.query('DROP TABLE IF EXISTS test.kafka_consumer{}'.format(consumer_index))
while int(instance.query(
"SELECT count() FROM system.tables WHERE database='test' AND name='kafka_consumer{}'".format(
consumer_index))) == 1:
instance.query('DROP TABLE IF EXISTS test.kafka_consumer{} SYNC'.format(consumer_index))
# print(instance.query('SELECT count(), uniqExact(key), max(key) + 1 FROM test.destination'))
# kafka_cluster.open_bash_shell('instance')
@ -2044,7 +2045,7 @@ def test_kafka_rebalance(kafka_cluster):
def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(1)]
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
kafka_produce('no_holes_when_write_suffix_failed', messages)
@ -2060,31 +2061,28 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
kafka_max_block_size = 20,
kafka_flush_interval_ms = 2000;
SELECT * FROM test.kafka LIMIT 1; /* do subscription & assignment in advance (it can take different time, test rely on timing, so can flap otherwise) */
CREATE TABLE test.view (key UInt64, value String)
ENGINE = ReplicatedMergeTree('/clickhouse/kafkatest/tables/no_holes_when_write_suffix_failed', 'node1')
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
kafka_produce('no_holes_when_write_suffix_failed', messages)
# init PartitionManager (it starts container) earlier
pm = PartitionManager()
CREATE TABLE test.view (key UInt64, value String)
ENGINE = ReplicatedMergeTree('/clickhouse/kafkatest/tables/no_holes_when_write_suffix_failed', 'node1')
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka
WHERE NOT sleepEachRow(0.1);
WHERE NOT sleepEachRow(0.25);
# the tricky part here is that disconnect should happen after write prefix, but before write suffix
instance.wait_for_log_line("Polled batch of 20 messages")
# the tricky part here is that disconnect should happen after write prefix, but before write suffix
# we have 0.25 (sleepEachRow) * 20 ( Rows ) = 5 sec window after "Polled batch of 20 messages"
# while materialized view is working to inject zookeeper failure
instance.wait_for_log_line("Coordination.*while write prefix to view")
instance.wait_for_log_line("Error.*(session has been expired|Connection loss).*while write prefix to view")
instance.wait_for_log_line("Committed offset 23")
instance.wait_for_log_line("Committed offset 22")
result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
@ -2261,7 +2259,7 @@ def test_bad_reschedule(kafka_cluster):
def test_kafka_duplicates_when_commit_failed(kafka_cluster):
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(1)]
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
kafka_produce('duplicates_when_commit_failed', messages)
@ -2277,44 +2275,35 @@ def test_kafka_duplicates_when_commit_failed(kafka_cluster):
kafka_max_block_size = 20,
kafka_flush_interval_ms = 1000;
SELECT * FROM test.kafka LIMIT 1; /* do subscription & assignment in advance (it can take different time, test rely on timing, so can flap otherwise) */
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
kafka_produce('duplicates_when_commit_failed', messages)
CREATE TABLE test.view (key UInt64, value String)
ENGINE = MergeTree()
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka
WHERE NOT sleepEachRow(0.5);
# print time.strftime("%m/%d/%Y %H:%M:%S")
time.sleep(3) # MV will work for 10 sec, after that commit should happen, we want to pause before
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka
WHERE NOT sleepEachRow(0.25);
# print time.strftime("%m/%d/%Y %H:%M:%S")
instance.wait_for_log_line("Polled batch of 20 messages")
# the tricky part here is that disconnect should happen after write prefix, but before we do commit
# we have 0.25 (sleepEachRow) * 20 ( Rows ) = 5 sec window after "Polled batch of 20 messages"
# while materialized view is working to inject zookeeper failure
# that timeout it VERY important, and picked after lot of experiments
# when too low (<30sec) librdkafka will not report any timeout (alternative is to decrease the default session timeouts for librdkafka)
# when too high (>50sec) broker will decide to remove us from the consumer group, and will start answering "Broker: Unknown member"
# print time.strftime("%m/%d/%Y %H:%M:%S")
# if we restore the connection too fast (<30sec) librdkafka will not report any timeout
# (alternative is to decrease the default session timeouts for librdkafka)
# when the delay is too long (>50sec) broker will decide to remove us from the consumer group,
# and will start answering "Broker: Unknown member"
instance.wait_for_log_line("Exception during commit attempt: Local: Waiting for coordinator", timeout=45)
instance.wait_for_log_line("All commit attempts failed", look_behind_lines=500)
# kafka_cluster.open_bash_shell('instance')
# connection restored and it will take a while until next block will be flushed
# it takes years on CI :\
# as it's a bit tricky to hit the proper moment - let's check in logs if we did it correctly
assert instance.contains_in_log("Local: Waiting for coordinator")
assert instance.contains_in_log("All commit attempts failed")
instance.wait_for_log_line("Committed offset 22")
result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
@ -2420,7 +2409,8 @@ def test_kafka_unavailable(kafka_cluster):
kafka_topic_list = 'test_bad_reschedule',
kafka_group_name = 'test_bad_reschedule',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1000;
kafka_max_block_size = 1000,
kafka_flush_interval_ms = 1000;
CREATE MATERIALIZED VIEW test.destination Engine=Log AS
Reference in New Issue
Block a user