Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-26 17:41:59 +00:00.
Remove Log engine from Kafka integration tests
The Log engine doesn't work well when `thread_per_consumer` is used, as the writer can make readers starve when `shared_timed_mutex` prefers writers over readers.
This commit: 8f124710ef (parent: 0aa30b10d5)
@@ -1019,7 +1019,7 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
         DROP TABLE IF EXISTS test.kafka_{format_name}_mv;
-        CREATE MATERIALIZED VIEW test.kafka_{format_name}_mv Engine=Log AS
+        CREATE MATERIALIZED VIEW test.kafka_{format_name}_mv ENGINE=MergeTree ORDER BY tuple() AS
             SELECT *, _topic, _partition, _offset FROM test.kafka_{format_name};
         """.format(
             topic_name=topic_name,
@@ -2460,7 +2460,7 @@ def test_kafka_commit_on_block_write(kafka_cluster, create_query_generator):
         (generate_old_create_table_query, "kafka.*Committed offset 2.*virt2_[01]"),
         (
             generate_new_create_table_query,
-            r"kafka.*Saved offset 2[0-9]* for topic-partition \[virt2_[01]:[0-9]+",
+            r"kafka.*Saved offset 2 for topic-partition \[virt2_[01]:[0-9]+",
         ),
     ],
 )
@@ -2494,7 +2494,7 @@ def test_kafka_virtual_columns2(kafka_cluster, create_query_generator, log_line)
         f"""
         {create_query};

-        CREATE MATERIALIZED VIEW test.view Engine=Log AS
+        CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY tuple() AS
             SELECT value, _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp), toUnixTimestamp64Milli(_timestamp_ms), _headers.name, _headers.value FROM test.kafka;
         """
     )
@@ -2729,7 +2729,7 @@ def test_kafka_produce_key_timestamp(kafka_cluster, create_query_generator, log_line)
         DROP TABLE IF EXISTS test.consumer;
         {writer_create_query};
         {reader_create_query};
-        CREATE MATERIALIZED VIEW test.view Engine=Log AS
+        CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY tuple() AS
             SELECT key, value, inserted_key, toUnixTimestamp(inserted_timestamp), _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp) FROM test.kafka;
         """
     )
@@ -2865,7 +2865,7 @@ def test_kafka_produce_consume_avro(kafka_cluster, create_query_generator):
         {writer_create_query};
         {reader_create_query};

-        CREATE MATERIALIZED VIEW test.view Engine=Log AS
+        CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY tuple() AS
             SELECT key, value FROM test.kafka;
         """
     )
@@ -3537,7 +3537,7 @@ def test_bad_reschedule(kafka_cluster, create_query_generator):
         f"""
         {create_query};

-        CREATE MATERIALIZED VIEW test.destination Engine=Log AS
+        CREATE MATERIALIZED VIEW test.destination ENGINE=MergeTree ORDER BY tuple() AS
         SELECT
             key,
             now() as consume_ts,
@@ -3745,7 +3745,7 @@ def test_kafka_unavailable(kafka_cluster, create_query_generator, do_direct_read)
         f"""
         {create_query};

-        CREATE MATERIALIZED VIEW test.destination_unavailable Engine=Log AS
+        CREATE MATERIALIZED VIEW test.destination_unavailable ENGINE=MergeTree ORDER BY tuple() AS
         SELECT
             key,
             now() as consume_ts,
@@ -4267,12 +4267,12 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator)
         {create_query};

         DROP TABLE IF EXISTS test.kafka_data_{format_name}_mv;
-        CREATE MATERIALIZED VIEW test.kafka_data_{format_name}_mv Engine=Log AS
+        CREATE MATERIALIZED VIEW test.kafka_data_{format_name}_mv ENGINE=MergeTree ORDER BY tuple() AS
             SELECT *, _topic, _partition, _offset FROM test.kafka_{format_name}
             WHERE length(_error) = 0;

         DROP TABLE IF EXISTS test.kafka_errors_{format_name}_mv;
-        CREATE MATERIALIZED VIEW test.kafka_errors_{format_name}_mv Engine=Log AS
+        CREATE MATERIALIZED VIEW test.kafka_errors_{format_name}_mv ENGINE=MergeTree ORDER BY tuple() AS
             SELECT {raw_message} as raw_message, _error as error, _topic as topic, _partition as partition, _offset as offset FROM test.kafka_{format_name}
             WHERE length(_error) > 0;
         """
@@ -4796,7 +4796,7 @@ def test_max_rows_per_message(kafka_cluster, create_query_generator):
         DROP TABLE IF EXISTS test.kafka;
         {create_query};

-        CREATE MATERIALIZED VIEW test.view Engine=Log AS
+        CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY key, value AS
             SELECT key, value FROM test.kafka;
         """
     )
@@ -4875,7 +4875,7 @@ def test_row_based_formats(kafka_cluster, create_query_generator):

         {create_query};

-        CREATE MATERIALIZED VIEW test.view Engine=Log AS
+        CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY key, value AS
             SELECT key, value FROM test.{table_name};

         INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows});
@@ -4982,7 +4982,7 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator):

         {create_query};

-        CREATE MATERIALIZED VIEW test.view Engine=Log AS
+        CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY key, value AS
             SELECT key, value FROM test.{table_name};

         INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows}) settings max_block_size=12, optimize_trivial_insert_select=0;
@@ -5362,7 +5362,7 @@ def test_formats_errors(kafka_cluster):
             input_format_with_names_use_header=0,
             format_schema='key_value_message:Message';

-        CREATE MATERIALIZED VIEW test.view Engine=Log AS
+        CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY key, value AS
             SELECT key, value FROM test.{table_name};
         """
     )
|
Loading…
Reference in New Issue
Block a user