add test cases

This commit is contained in:
Peng Jian 2021-03-31 20:22:36 +08:00
parent 36e4a8e2dd
commit e30c07db20
3 changed files with 83 additions and 9 deletions

View File

@ -2593,6 +2593,78 @@ def test_kafka_engine_put_errors_to_stream(kafka_cluster):
DROP TABLE test.kafka_errors;
''')
def gen_normal_json():
return '{"i":1000, "s":"ABC123abc"}'
def gen_malformed_json():
return '{"i":"n1000", "s":"1000"}'
def gen_message_with_jsons(jsons = 10, malformed = 0):
s = io.StringIO()
for i in range (jsons):
if malformed and random.randint(0,1) == 1:
s.write(gen_malformed_json())
else:
s.write(gen_normal_json())
s.write(' ')
return s.getvalue()
def test_kafka_engine_put_errors_to_stream_with_random_malformed_json(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.kafka;
DROP TABLE IF EXISTS test.kafka_data;
DROP TABLE IF EXISTS test.kafka_errors;
CREATE TABLE test.kafka (i Int64, s String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 100,
kafka_poll_max_batch_size = 1,
kafka_handle_error_mode = 'stream';
CREATE MATERIALIZED VIEW test.kafka_data (i Int64, s String)
ENGINE = MergeTree
ORDER BY i
AS SELECT i, s FROM test.kafka WHERE length(_error) == 0;
CREATE MATERIALIZED VIEW test.kafka_errors (topic String, partition Int64, offset Int64, raw String, error String)
ENGINE = MergeTree
ORDER BY (topic, offset)
AS SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM test.kafka WHERE length(_error) > 0;
''')
messages = []
for i in range(128):
if i % 2 == 0:
messages.append(gen_message_with_jsons(10, 1))
else:
messages.append(gen_message_with_jsons(10, 0))
kafka_produce('json', messages)
while True:
total_rows = instance.query('SELECT count() FROM test.kafka_data', ignore_error=True)
if total_rows == '640\n':
break
while True:
total_error_rows = instance.query('SELECT count() FROM test.kafka_errors', ignore_error=True)
if total_error_rows == '64\n':
break
instance.query('''
DROP TABLE test.kafka;
DROP TABLE test.kafka_data;
DROP TABLE test.kafka_errors;
''')
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -1,4 +0,0 @@
040501610555496e7438010101010101620655496e743136020002000200
02000200016306537472696e6704deadbeef04deadbeef04deadbeef04de
adbeef04deadbeef01640e4669786564537472696e67283429cafebabeca
febabecafebabecafebabecafebabe

View File

@ -4,10 +4,16 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$RESULT="result.dat"
$REFERENCE="01781_general_format_for_parser.reference"
$CLICKHOUSE_LOCAL --query "select toUInt8(1) as a, toUInt16(2) as b, '\xDE\xAD\xBE\xEF' as c, toFixedString('\xCA\xFE\xBA\xBE',4) as d from numbers(5) format Native" | xxd > $RESULT
TEST_OUTPUT=`$CLICKHOUSE_LOCAL --query "select toUInt8(1) as a, toUInt16(2) as b, '\xDE\xAD\xBE\xEF' as c, toFixedString('\xCA\xFE\xBA\xBE',4) as d from numbers(5) format Native" | xxd -ps`
EXPECTED_OUTPUT=`
echo '040501610555496e7438010101010101620655496e743136020002000200
02000200016306537472696e6704deadbeef04deadbeef04deadbeef04de
adbeef04deadbeef01640e4669786564537472696e67283429cafebabeca
febabecafebabecafebabecafebabe'`
cmp --silent $RESULT $REFERENCE && echo 'OK' || echo 'FAIL'
rm -rf $RESULT
if [ "$TEST_OUTPUT" = "$EXPECTED_OUTPUT" ]; then
echo 'OK'
else
echo 'FAIL'
fi