From e17d4e1eeb14fb4ba7e9ea02604b7438c0551e6e Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Fri, 25 Sep 2020 18:17:57 +0200 Subject: [PATCH] Tests and some docs --- docs/en/interfaces/formats.md | 5 ++++ docs/ru/interfaces/formats.md | 9 ++++-- .../Formats/Impl/ProtobufRowOutputFormat.h | 4 ++- tests/integration/test_storage_kafka/test.py | 27 +++++++++++++++++- .../00825_protobuf_format_input.reference | 7 +++++ .../00825_protobuf_format_input.sh | 15 ++++++++++ .../00825_protobuf_format_input_single.insh | 12 ++++++++ .../00825_protobuf_format_output.reference | Bin 2433 -> 3726 bytes .../00825_protobuf_format_output.sh | 17 ++++++++++- 9 files changed, 91 insertions(+), 5 deletions(-) create mode 100644 tests/queries/0_stateless/00825_protobuf_format_input_single.insh diff --git a/docs/en/interfaces/formats.md b/docs/en/interfaces/formats.md index bfe5b6218e4..9fe57b5d57b 100644 --- a/docs/en/interfaces/formats.md +++ b/docs/en/interfaces/formats.md @@ -43,6 +43,7 @@ The supported formats are: | [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ | | [PrettySpace](#prettyspace) | ✗ | ✔ | | [Protobuf](#protobuf) | ✔ | ✔ | +| [ProtobufSingle](#protobufsingle) | ✔ | ✔ | | [Avro](#data-format-avro) | ✔ | ✔ | | [AvroConfluent](#data-format-avro-confluent) | ✔ | ✗ | | [Parquet](#data-format-parquet) | ✔ | ✔ | @@ -1075,6 +1076,10 @@ ClickHouse inputs and outputs protobuf messages in the `length-delimited` format It means before every message should be written its length as a [varint](https://developers.google.com/protocol-buffers/docs/encoding#varints). See also [how to read/write length-delimited protobuf messages in popular languages](https://cwiki.apache.org/confluence/display/GEODE/Delimiting+Protobuf+Messages). +## ProtobufSingle {#protobufsingle} + +Same as [Protobuf](#protobuf) but for storing/parsing single Protobuf message without length delimiters. + ## Avro {#data-format-avro} [Apache Avro](https://avro.apache.org/) is a row-oriented data serialization framework developed within Apache’s Hadoop project. diff --git a/docs/ru/interfaces/formats.md b/docs/ru/interfaces/formats.md index dd68f7eb646..02fcd8a20d4 100644 --- a/docs/ru/interfaces/formats.md +++ b/docs/ru/interfaces/formats.md @@ -27,6 +27,7 @@ ClickHouse может принимать (`INSERT`) и отдавать (`SELECT | [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ | | [PrettySpace](#prettyspace) | ✗ | ✔ | | [Protobuf](#protobuf) | ✔ | ✔ | +| [ProtobufSingle](#protobufsingle) | ✔ | ✔ | | [Parquet](#data-format-parquet) | ✔ | ✔ | | [Arrow](#data-format-arrow) | ✔ | ✔ | | [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ | @@ -947,6 +948,10 @@ message MessageType { ClickHouse пишет и читает сообщения `Protocol Buffers` в формате `length-delimited`. Это означает, что перед каждым сообщением пишется его длина в формате [varint](https://developers.google.com/protocol-buffers/docs/encoding#varints). См. также [как читать и записывать сообщения Protocol Buffers в формате length-delimited в различных языках программирования](https://cwiki.apache.org/confluence/display/GEODE/Delimiting+Protobuf+Messages). +## ProtobufSingle {#protobufsingle} + +То же, что [Protobuf](#protobuf), но без разделителей. Позволяет записать / прочитать не более одного сообщения за раз. + ## Avro {#data-format-avro} [Apache Avro](https://avro.apache.org/) — это ориентированный на строки фреймворк для сериализации данных. Разработан в рамках проекта Apache Hadoop. @@ -957,7 +962,7 @@ ClickHouse пишет и читает сообщения `Protocol Buffers` в ## AvroConfluent {#data-format-avro-confluent} -Для формата `AvroConfluent` ClickHouse поддерживает декодирование сообщений `Avro` с одним объектом. Такие сообщения используются с [Kafka] (http://kafka.apache.org/) и реестром схем [Confluent](https://docs.confluent.io/current/schema-registry/index.html). +Для формата `AvroConfluent` ClickHouse поддерживает декодирование сообщений `Avro` с одним объектом. Такие сообщения используются с [Kafka] (http://kafka.apache.org/) и реестром схем [Confluent](https://docs.confluent.io/current/schema-registry/index.html). Каждое сообщение `Avro` содержит идентификатор схемы, который может быть разрешен для фактической схемы с помощью реестра схем. @@ -971,7 +976,7 @@ URL-адрес реестра схем настраивается с помощ ### Использование {#ispolzovanie} -Чтобы быстро проверить разрешение схемы, используйте [kafkacat](https://github.com/edenhill/kafkacat) с языком запросов [clickhouse-local](../operations/utilities/clickhouse-local.md): +Чтобы быстро проверить разрешение схемы, используйте [kafkacat](https://github.com/edenhill/kafkacat) с языком запросов [clickhouse-local](../operations/utilities/clickhouse-local.md): ``` bash $ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String" -q 'select * from table' diff --git a/src/Processors/Formats/Impl/ProtobufRowOutputFormat.h b/src/Processors/Formats/Impl/ProtobufRowOutputFormat.h index b5bfb06db5a..5a30f22a59f 100644 --- a/src/Processors/Formats/Impl/ProtobufRowOutputFormat.h +++ b/src/Processors/Formats/Impl/ProtobufRowOutputFormat.h @@ -25,7 +25,9 @@ namespace DB { /** Stream designed to serialize data in the google protobuf format. * Each row is written as a separated message. - * These messages are delimited according to documentation + * + * With single_message_mode=1 it can write only single row as plain protobuf message, + * otherwise Protobuf messages are delimited according to documentation * https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h * Serializing in the protobuf format requires the 'format_schema' setting to be set, e.g. * SELECT * from table FORMAT Protobuf SETTINGS format_schema = 'schema:Message' diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index 32e00276bb6..6ef37c1e231 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -982,7 +982,7 @@ def test_kafka_protobuf(kafka_cluster): kafka_check_result(result, True) -@pytest.mark.timeout(180) +@pytest.mark.timeout(30) def test_kafka_protobuf_no_delimiter(kafka_cluster): instance.query(''' CREATE TABLE test.kafka (key UInt64, value String) @@ -1006,6 +1006,31 @@ def test_kafka_protobuf_no_delimiter(kafka_cluster): kafka_check_result(result, True) + instance.query(''' + CREATE TABLE test.kafka_writer (key UInt64, value String) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'pb_no_delimiter', + kafka_group_name = 'pb_no_delimiter', + kafka_format = 'ProtobufSingle', + kafka_schema = 'kafka.proto:KeyValuePair'; + ''') + + instance.query("INSERT INTO test.kafka_writer VALUES (13,'Friday'),(42,'Answer to the Ultimate Question of Life, the Universe, and Everything'), (110, 'just a number')") + + time.sleep(1) + + result = instance.query("SELECT * FROM test.kafka ORDER BY key", ignore_error=True) + + expected = '''\ +13 Friday +42 Answer to the Ultimate Question of Life, the Universe, and Everything +110 just a number +''' + assert TSV(result) == TSV(expected) + + + @pytest.mark.timeout(180) def test_kafka_materialized_view(kafka_cluster): instance.query(''' diff --git a/tests/queries/0_stateless/00825_protobuf_format_input.reference b/tests/queries/0_stateless/00825_protobuf_format_input.reference index 0c56bc4ebf0..75fef3f8ac3 100644 --- a/tests/queries/0_stateless/00825_protobuf_format_input.reference +++ b/tests/queries/0_stateless/00825_protobuf_format_input.reference @@ -8,4 +8,11 @@ a7522158-3d41-4b77-ad69-6c598ee55c49 Ivan Petrov male 1980-12-29 png +7495123456 0 0 2 4 3 9 +a7da1aa6-f425-4789-8947-b034786ed374 Vasily Sidorov male 1995-07-28 bmp +442012345678 1 2018-12-30 00:00:00 23 leo ['Sunny'] [250,244,10] Murmansk [68.970680,33.074982] 3.14159265358979 100000000000.00 800 -3.2 154400000 ['pound'] [16] 503 [] +c694ad8a-f714-4ea3-907d-fd54fb25d9b5 Natalia Sokolova female 1992-03-08 jpg \N 0 \N 26 pisces [] [100,200,50] Plymouth [50.403724,-4.142123] 3.14159 \N 0.007 5.4 -20000000000000 [] [] \N [] +a7522158-3d41-4b77-ad69-6c598ee55c49 Ivan Petrov male 1980-12-29 png +74951234567\0 1 2019-01-05 18:45:00 38 capricorn ['Yesterday','Flowers'] [255,0,0] Moscow [55.753216,37.622504] 3.14 214.10 0.1 5.8 17060000000 ['meter','centimeter','kilometer'] [1,0.01,1000] 500 [501,502] +3faee064-c4f7-4d34-b6f3-8d81c2b6a15d Nick Kolesnikov male 1998-12-26 bmp 412-687-5007\0 1 2018-11-19 05:59:59 20 capricorn ['Havana'] [128,0,128] Pittsburgh [40.517192,-79.949456] 3.1415926535898 50000000000.00 780 18.3 195500007 ['ounce','carat','gram'] [28.35,0.2,1] 9494 [] +2 4 +3 9 +ok ok diff --git a/tests/queries/0_stateless/00825_protobuf_format_input.sh b/tests/queries/0_stateless/00825_protobuf_format_input.sh index 66c92e6fb35..b9912b2b849 100755 --- a/tests/queries/0_stateless/00825_protobuf_format_input.sh +++ b/tests/queries/0_stateless/00825_protobuf_format_input.sh @@ -48,6 +48,14 @@ source "$CURDIR"/00825_protobuf_format_input.insh $CLICKHOUSE_CLIENT --query "SELECT * FROM in_persons_00825 ORDER BY uuid;" $CLICKHOUSE_CLIENT --query "SELECT * FROM in_squares_00825 ORDER BY number;" +$CLICKHOUSE_CLIENT --query "TRUNCATE TABLE in_persons_00825;" +$CLICKHOUSE_CLIENT --query "TRUNCATE TABLE in_squares_00825;" + +source "$CURDIR"/00825_protobuf_format_input_single.insh + +$CLICKHOUSE_CLIENT --query "SELECT * FROM in_persons_00825 ORDER BY uuid;" +$CLICKHOUSE_CLIENT --query "SELECT * FROM in_squares_00825 ORDER BY number;" + # Try to input malformed data. set +eo pipefail echo -ne '\xe0\x80\x3f\x0b' \ @@ -55,5 +63,12 @@ echo -ne '\xe0\x80\x3f\x0b' \ | grep -qF "Protobuf messages are corrupted" && echo "ok" || echo "fail" set -eo pipefail +# Try to input malformed data for ProtobufSingle +set +eo pipefail +echo -ne '\xff\xff\x3f\x0b' \ + | $CLICKHOUSE_CLIENT --query="INSERT INTO in_persons_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'" 2>&1 \ + | grep -qF "Protobuf messages are corrupted" && echo "ok" || echo "fail" +set -eo pipefail + $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS in_persons_00825;" $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS in_squares_00825;" diff --git a/tests/queries/0_stateless/00825_protobuf_format_input_single.insh b/tests/queries/0_stateless/00825_protobuf_format_input_single.insh new file mode 100644 index 00000000000..6c4dfec05aa --- /dev/null +++ b/tests/queries/0_stateless/00825_protobuf_format_input_single.insh @@ -0,0 +1,12 @@ +echo -ne '\x0a\x24\x61\x37\x35\x32\x32\x31\x35\x38\x2d\x33\x64\x34\x31\x2d\x34\x62\x37\x37\x2d\x61\x64\x36\x39\x2d\x36\x63\x35\x39\x38\x65\x65\x35\x35\x63\x34\x39\x12\x04\x49\x76\x61\x6e\x1a\x06\x50\x65\x74\x72\x6f\x76\x20\x01\x28\xaf\x1f\x32\x03\x70\x6e\x67\x3a\x0c\x2b\x37\x34\x39\x35\x31\x32\x33\x34\x35\x36\x37\x40\x01\x4d\xfc\xd0\x30\x5c\x50\x26\x58\x09\x62\x09\x59\x65\x73\x74\x65\x72\x64\x61\x79\x62\x07\x46\x6c\x6f\x77\x65\x72\x73\x6a\x04\xff\x01\x00\x00\x72\x06\x4d\x6f\x73\x63\x6f\x77\x7a\x08\x4b\x03\x5f\x42\x72\x7d\x16\x42\x81\x01\x1f\x85\xeb\x51\xb8\x1e\x09\x40\x89\x01\x33\x33\x33\x33\x33\xc3\x6a\x40\x95\x01\xcd\xcc\xcc\x3d\x9d\x01\x9a\x99\xb9\x40\xa0\x01\x80\xc4\xd7\x8d\x7f\xaa\x01\x0c\x0a\x05\x6d\x65\x74\x65\x72\x15\x00\x00\x80\x3f\xaa\x01\x11\x0a\x0a\x63\x65\x6e\x74\x69\x6d\x65\x74\x65\x72\x15\x0a\xd7\x23\x3c\xaa\x01\x10\x0a\x09\x6b\x69\x6c\x6f\x6d\x65\x74\x65\x72\x15\x00\x00\x7a\x44\xb2\x01\x10\x0a\x0e\xa2\x06\x0b\x0a\x09\x08\xf4\x03\x12\x04\xf5\x03\xf6\x03' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_persons_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'" +echo -ne '\x0a\x24\x63\x36\x39\x34\x61\x64\x38\x61\x2d\x66\x37\x31\x34\x2d\x34\x65\x61\x33\x2d\x39\x30\x37\x64\x2d\x66\x64\x35\x34\x66\x62\x32\x35\x64\x39\x62\x35\x12\x07\x4e\x61\x74\x61\x6c\x69\x61\x1a\x08\x53\x6f\x6b\x6f\x6c\x6f\x76\x61\x28\xa6\x3f\x32\x03\x6a\x70\x67\x50\x1a\x58\x0b\x6a\x04\x64\xc8\x01\x32\x72\x08\x50\x6c\x79\x6d\x6f\x75\x74\x68\x7a\x08\x6a\x9d\x49\x42\x46\x8c\x84\xc0\x81\x01\x6e\x86\x1b\xf0\xf9\x21\x09\x40\x95\x01\x42\x60\xe5\x3b\x9d\x01\xcd\xcc\xac\x40\xa0\x01\xff\xff\xa9\xce\x93\x8c\x09' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_persons_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'" +echo -ne '\x0a\x24\x61\x37\x64\x61\x31\x61\x61\x36\x2d\x66\x34\x32\x35\x2d\x34\x37\x38\x39\x2d\x38\x39\x34\x37\x2d\x62\x30\x33\x34\x37\x38\x36\x65\x64\x33\x37\x34\x12\x06\x56\x61\x73\x69\x6c\x79\x1a\x07\x53\x69\x64\x6f\x72\x6f\x76\x20\x01\x28\xfb\x48\x32\x03\x62\x6d\x70\x3a\x0d\x2b\x34\x34\x32\x30\x31\x32\x33\x34\x35\x36\x37\x38\x40\x01\x4d\x50\xe0\x27\x5c\x50\x17\x58\x04\x62\x05\x53\x75\x6e\x6e\x79\x6a\x05\xfa\x01\xf4\x01\x0a\x72\x08\x4d\x75\x72\x6d\x61\x6e\x73\x6b\x7a\x08\xfd\xf0\x89\x42\xc8\x4c\x04\x42\x81\x01\x11\x2d\x44\x54\xfb\x21\x09\x40\x89\x01\x00\x00\x00\xe8\x76\x48\x37\x42\x95\x01\x00\x00\x48\x44\x9d\x01\xcd\xcc\x4c\xc0\xa0\x01\x80\xd4\x9f\x93\x01\xaa\x01\x0c\x0a\x05\x70\x6f\x75\x6e\x64\x15\x00\x00\x80\x41\xb2\x01\x0a\x0a\x08\xa2\x06\x05\x0a\x03\x08\xf7\x03' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_persons_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'" +echo -ne '\x0a\x24\x33\x66\x61\x65\x65\x30\x36\x34\x2d\x63\x34\x66\x37\x2d\x34\x64\x33\x34\x2d\x62\x36\x66\x33\x2d\x38\x64\x38\x31\x63\x32\x62\x36\x61\x31\x35\x64\x12\x04\x4e\x69\x63\x6b\x1a\x0a\x4b\x6f\x6c\x65\x73\x6e\x69\x6b\x6f\x76\x20\x01\x28\xda\x52\x32\x03\x62\x6d\x70\x3a\x0c\x34\x31\x32\x2d\x36\x38\x37\x2d\x35\x30\x30\x37\x40\x01\x4d\x2f\x27\xf2\x5b\x50\x14\x58\x09\x62\x06\x48\x61\x76\x61\x6e\x61\x68\x80\x01\x68\x00\x68\x80\x01\x72\x0a\x50\x69\x74\x74\x73\x62\x75\x72\x67\x68\x7a\x08\x9b\x11\x22\x42\x1f\xe6\x9f\xc2\x81\x01\x28\x2d\x44\x54\xfb\x21\x09\x40\x89\x01\x00\x00\x00\xe8\x76\x48\x27\x42\x95\x01\x00\x00\x43\x44\x9d\x01\x66\x66\x92\x41\xa0\x01\xce\xdf\xb8\xba\x01\xab\x01\x0d\xcd\xcc\xe2\x41\x0d\xcd\xcc\x4c\x3e\x0d\x00\x00\x80\x3f\x12\x05\x6f\x75\x6e\x63\x65\x12\x05\x63\x61\x72\x61\x74\x12\x04\x67\x72\x61\x6d\xac\x01\xb3\x01\x0b\xa2\x06\x05\x0b\x08\x96\x4a\x0c\x0c\xb4\x01' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_persons_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format_syntax2:Syntax2Person'" + +echo -ne '\x08\x02\x10\x04' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_squares_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:NumberAndSquare'" +echo -ne '\x08\x03\x10\x09' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_squares_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:NumberAndSquare'" + +### Actually empty Protobuf message is a valid message (with all values default). +### It will work in Kafka but clickhouse-client forbids that: +### Code: 108. DB::Exception: No data to insert +## echo -ne '' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_squares_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:NumberAndSquare'" \ No newline at end of file diff --git a/tests/queries/0_stateless/00825_protobuf_format_output.reference b/tests/queries/0_stateless/00825_protobuf_format_output.reference index 9d20d778ff6ac82cc23b20eed6ed833bb628fcb8..f0e0ac58a5e323517118e4e2137cd5bcdb5a1feb 100644 GIT binary patch delta 115 zcmZn^?vvfn$SE6>7n7%@r4Ue*Uy`3xniiaym!6ZVprr*8=bCKDo-;A8gU`_?#5Ksz zF~l>>Ro9MdvOCieuHcX$Pe1oy5Z931WO5nHXRhE#zYxa=BZ$yPuHe8>#~@dz3Xm}X DQwt;E delta 7 OcmeB^Z4}'; SELECT * FROM out_squares_00825 ORDER BY number FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format:NumberAndSquare'; +SELECT '\n\n** ProtobufSingle **\n\n'; + +SELECT * FROM out_persons_00825 ORDER BY name LIMIT 1 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'; +SELECT 'ALTERNATIVE->'; +SELECT * FROM out_persons_00825 ORDER BY name LIMIT 1 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:AltPerson'; +SELECT 'STRINGS->'; +SELECT * FROM out_persons_00825 ORDER BY name LIMIT 1 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:StrPerson'; +SELECT 'SYNTAX2->'; +SELECT * FROM out_persons_00825 ORDER BY name LIMIT 1 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format_syntax2:Syntax2Person'; +SELECT 'SQUARES->'; +SELECT * FROM out_squares_00825 ORDER BY number LIMIT 1 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:NumberAndSquare'; + +-- Code: 158, e.displayText() = DB::Exception: ProtobufSingle can output only single row at a time. +SELECT * FROM out_persons_00825 ORDER BY name FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'; -- { clientError 158 } + DROP TABLE IF EXISTS out_persons_00825; DROP TABLE IF EXISTS out_squares_00825; EOF