ClickHouse/tests/integration/test_format_schema_on_server/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

178 lines
5.4 KiB
Python
Raw Normal View History

import pytest
import os
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance("instance", clickhouse_path_dir="clickhouse_path")
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
instance.query("CREATE DATABASE test")
yield cluster
finally:
cluster.shutdown()
def create_simple_table():
instance.query("DROP TABLE IF EXISTS test.simple")
instance.query(
"""
CREATE TABLE test.simple (key UInt64, value String)
ENGINE = MergeTree ORDER BY tuple();
"""
)
def test_protobuf_format_input(started_cluster):
create_simple_table()
instance.http_query(
"INSERT INTO test.simple SETTINGS format_schema='simple:KeyValuePair' FORMAT Protobuf",
"\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def",
)
assert instance.query("SELECT * from test.simple") == "1\tabc\n2\tdef\n"
def test_protobuf_format_output(started_cluster):
create_simple_table()
instance.query("INSERT INTO test.simple VALUES (1, 'abc'), (2, 'def')")
assert (
instance.http_query(
"SELECT * FROM test.simple FORMAT Protobuf SETTINGS format_schema='simple:KeyValuePair'"
)
== "\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def"
)
def test_drop_cache_protobuf_format(started_cluster):
create_simple_table()
instance.query("INSERT INTO test.simple VALUES (1, 'abc'), (2, 'def')")
schema = """
syntax = "proto3";
message MessageTmp {
uint64 key = 1;
string value = 2;
}
"""
protobuf_schema_path_name = "message_tmp.proto"
database_path = os.path.abspath(os.path.join(instance.path, "database"))
with open(
os.path.join(database_path, "format_schemas", protobuf_schema_path_name), "w"
) as file:
file.write(schema)
assert (
instance.http_query(
"SELECT * FROM test.simple FORMAT Protobuf SETTINGS format_schema='message_tmp:MessageTmp'"
)
== "\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def"
)
# Replace simple.proto with a new Protobuf schema
new_schema = """
syntax = "proto3";
message MessageTmp {
uint64 key2 = 1;
string value2 = 2;
}
"""
with open(
os.path.join(database_path, "format_schemas", protobuf_schema_path_name), "w"
) as file:
file.write(new_schema)
instance.query("DROP TABLE IF EXISTS test.new_simple")
instance.query(
"""
CREATE TABLE test.new_simple (key2 UInt64, value2 String)
ENGINE = MergeTree ORDER BY tuple();
"""
)
instance.query("INSERT INTO test.new_simple VALUES (1, 'abc'), (2, 'def')")
instance.query("SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf")
# Tets works with new scheme
assert (
instance.http_query(
"SELECT * FROM test.new_simple FORMAT Protobuf SETTINGS format_schema='message_tmp:MessageTmp'"
)
== "\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def"
)
# Tests that stop working with old scheme
with pytest.raises(Exception) as exc:
instance.http_query(
"SELECT * FROM test.simple FORMAT Protobuf SETTINGS format_schema='message_tmp:MessageTmp'"
)
assert "NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS)" in str(exc.value)
def test_drop_capn_proto_format(started_cluster):
create_simple_table()
instance.query("INSERT INTO test.simple VALUES (1, 'abc'), (2, 'def')")
capn_proto_schema = """
@0x801f030c2b67bf19;
struct MessageTmp {
key @0 :UInt64;
value @1 :Text;
}
"""
capn_schema_path_name = "message_tmp.capnp"
database_path = os.path.abspath(os.path.join(instance.path, "database"))
format_schemas_path = os.path.join(database_path, "format_schemas")
with open(os.path.join(format_schemas_path, capn_schema_path_name), "w") as file:
file.write(capn_proto_schema)
assert instance.http_query(
"SELECT * FROM test.simple FORMAT CapnProto SETTINGS format_schema='message_tmp:MessageTmp'"
) == instance.query(
f"SELECT * FROM test.simple Format CapnProto SETTINGS format_schema='{format_schemas_path}/message_tmp:MessageTmp'"
)
new_schema = """
@0x801f030c2b67bf19;
struct MessageTmp {
key2 @0 :UInt64;
value2 @1 :Text;
}
"""
with open(os.path.join(format_schemas_path, capn_schema_path_name), "w") as file:
file.write(new_schema)
instance.query("DROP TABLE IF EXISTS test.new_simple")
instance.query(
"""
CREATE TABLE test.new_simple (key2 UInt64, value2 String)
ENGINE = MergeTree ORDER BY tuple();
"""
)
instance.query("INSERT INTO test.new_simple VALUES (1, 'abc'), (2, 'def')")
# instance.query("SYSTEM DROP FORMAT SCHEMA CACHE FOR CapnProto")
# Tets works with new scheme
assert instance.http_query(
"SELECT * FROM test.new_simple FORMAT CapnProto SETTINGS format_schema='message_tmp:MessageTmp'"
) == instance.query(
f"SELECT * FROM test.new_simple Format CapnProto SETTINGS format_schema='{format_schemas_path}/message_tmp:MessageTmp'"
)
# Tests that stop working with old scheme
with pytest.raises(Exception) as exc:
instance.http_query(
"SELECT * FROM test.simple FORMAT CapnProto SETTINGS format_schema='message_tmp:MessageTmp'"
)
assert (
"Capnproto schema doesn't contain field with name key. (THERE_IS_NO_COLUMN)"
in str(exc.value)
)