mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
751 lines
24 KiB
Python
751 lines
24 KiB
Python
import os
|
|
import pytest
|
|
import sys
|
|
import time
|
|
import pytz
|
|
import uuid
|
|
import grpc
|
|
from helpers.cluster import ClickHouseCluster, run_and_check
|
|
from threading import Thread
|
|
import gzip
|
|
import lz4.frame
|
|
|
|
# Directory containing this test file; the generated protobuf modules live in its "pb2" subdirectory.
script_dir = os.path.dirname(os.path.realpath(__file__))
pb2_dir = os.path.join(script_dir, "pb2")

# Make the generated modules importable, without adding the path twice.
if pb2_dir not in sys.path:
    sys.path.append(pb2_dir)

# Execute pb2/generate.py to generate these modules.
import clickhouse_grpc_pb2
import clickhouse_grpc_pb2_grpc
|
|
|
|
|
|
# Port used by create_channel() to reach the gRPC interface of the node.
GRPC_PORT = 9100
# Encoding used to convert between str and the bytes fields of the protocol messages.
DEFAULT_ENCODING = "utf-8"


# Utilities

config_dir = os.path.join(script_dir, "./configs")
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
    "node",
    main_configs=["configs/grpc_config.xml"],
    # Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
    env_variables={
        "TSAN_OPTIONS": f"report_atomic_races=0 {os.getenv('TSAN_OPTIONS', default='')}"
    },
)
# Set by the first successful create_channel() call; used as the default channel by the helpers below.
main_channel = None
|
|
|
|
|
|
def create_channel():
    """Open an insecure gRPC channel to the node and wait until it is ready.

    The first channel created this way is also remembered in the module-level
    `main_channel`.
    """
    global main_channel
    address = f"{cluster.get_instance_ip('node')}:{GRPC_PORT}"
    new_channel = grpc.insecure_channel(address)
    # Block (up to 10 seconds) until the channel is usable.
    grpc.channel_ready_future(new_channel).result(timeout=10)
    if not main_channel:
        main_channel = new_channel
    return new_channel
|
|
|
|
|
|
def query_common(
    query_text,
    settings=None,
    input_data=None,
    input_data_delimiter="",
    output_format="TabSeparated",
    send_output_columns=False,
    external_tables=None,
    user_name="",
    password="",
    query_id="123",
    session_id="",
    stream_output=False,
    channel=None,
):
    """Execute a query through the gRPC stub and return the list of `Result` messages.

    `input_data` may be a single str/bytes value or a list of them. A list with more
    than one element is sent as a stream of `QueryInfo` messages (one per element);
    str values are encoded with DEFAULT_ENCODING. The caller's list is never modified.

    The RPC method is chosen from whether the input is streamed (more than one part)
    and whether `stream_output` is set. Non-streaming-output calls return a
    single-element list so that all callers can treat the result uniformly.

    If `channel` is not given, the module-level `main_channel` is used.
    """
    # None defaults instead of shared mutable default objects.
    if settings is None:
        settings = {}
    if external_tables is None:
        external_tables = []

    # Work on a private copy so the caller's list is not consumed.
    if input_data is None:
        parts = []
    elif isinstance(input_data, list):
        parts = list(input_data)
    else:
        parts = [input_data]
    parts = [
        part.encode(DEFAULT_ENCODING) if isinstance(part, str) else part
        for part in parts
    ]

    if isinstance(input_data_delimiter, str):
        input_data_delimiter = input_data_delimiter.encode(DEFAULT_ENCODING)
    if channel is None:
        channel = main_channel
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)

    def query_info():
        # First (or only) message: carries the query itself plus the first input part.
        return clickhouse_grpc_pb2.QueryInfo(
            query=query_text,
            settings=settings,
            input_data=parts[0] if parts else b"",
            input_data_delimiter=input_data_delimiter,
            output_format=output_format,
            send_output_columns=send_output_columns,
            external_tables=external_tables,
            user_name=user_name,
            password=password,
            query_id=query_id,
            session_id=session_id,
            next_query_info=len(parts) > 1,
        )

    def send_query_info():
        # Follow-up messages carry only input data; the last one clears next_query_info.
        yield query_info()
        last_index = len(parts) - 1
        for index, part in enumerate(parts[1:], start=1):
            yield clickhouse_grpc_pb2.QueryInfo(
                input_data=part, next_query_info=index < last_index
            )

    stream_input = len(parts) > 1
    if stream_input and stream_output:
        return list(stub.ExecuteQueryWithStreamIO(send_query_info()))
    elif stream_input:
        return [stub.ExecuteQueryWithStreamInput(send_query_info())]
    elif stream_output:
        return list(stub.ExecuteQueryWithStreamOutput(query_info()))
    else:
        return [stub.ExecuteQuery(query_info())]
|
|
|
|
|
|
def query_no_errors(*args, **kwargs):
    """Run `query_common` and raise if the last result carries an exception.

    Returns the list of results otherwise.
    """
    results = query_common(*args, **kwargs)
    last_result = results[-1] if results else None
    if last_result is not None and last_result.HasField("exception"):
        raise Exception(last_result.exception.display_text)
    return results
|
|
|
|
|
|
def query(*args, **kwargs):
    """Run a query that must succeed and return its concatenated output as text."""
    chunks = (result.output for result in query_no_errors(*args, **kwargs))
    return b"".join(chunks).decode(DEFAULT_ENCODING)
|
|
|
|
|
|
def query_and_get_error(*args, **kwargs):
    """Run a query that is expected to fail and return the exception of the last result.

    Raises if there are no results or the last result has no exception.
    """
    results = query_common(*args, **kwargs)
    if results and results[-1].HasField("exception"):
        return results[-1].exception
    raise Exception("Expected to be failed but succeeded!")
|
|
|
|
|
|
def query_and_get_totals(*args, **kwargs):
    """Run a query that must succeed and return the concatenated `totals` of all results as text."""
    chunks = (result.totals for result in query_no_errors(*args, **kwargs))
    return b"".join(chunks).decode(DEFAULT_ENCODING)
|
|
|
|
|
|
def query_and_get_extremes(*args, **kwargs):
    """Run a query that must succeed and return the concatenated `extremes` of all results as text."""
    chunks = (result.extremes for result in query_no_errors(*args, **kwargs))
    return b"".join(chunks).decode(DEFAULT_ENCODING)
|
|
|
|
|
|
def query_and_get_logs(*args, **kwargs):
    """Run a query that must succeed and return the text of all log entries, one per line."""
    return "".join(
        entry.text + "\n"
        for result in query_no_errors(*args, **kwargs)
        for entry in result.logs
    )
|
|
|
|
|
|
class QueryThread(Thread):
    """Thread that runs one query and checks that its output matches the expected text.

    An exception raised inside a thread (including a failed assertion) is not
    propagated to the thread that joins it, so a test relying on the assertion in
    `run` alone could never fail. The exception is therefore stored in
    `self.exception` and re-raised from `join()`.
    """

    def __init__(
        self, query_text, expected_output, query_id, use_separate_channel=False
    ):
        Thread.__init__(self)
        self.query_text = query_text
        self.expected_output = expected_output
        self.use_separate_channel = use_separate_channel
        self.query_id = query_id
        # Exception raised by run(), if any; consumed by join().
        self.exception = None

    def run(self):
        """Execute the query, optionally on a dedicated channel, and check its output."""
        try:
            if self.use_separate_channel:
                with create_channel() as channel:
                    assert (
                        query(self.query_text, query_id=self.query_id, channel=channel)
                        == self.expected_output
                    )
            else:
                assert (
                    query(self.query_text, query_id=self.query_id)
                    == self.expected_output
                )
        except Exception as e:
            # Keep the failure so that the joining thread can see it.
            self.exception = e

    def join(self, timeout=None):
        """Wait for the thread, then re-raise any exception captured in `run`."""
        Thread.join(self, timeout)
        if self.exception is not None and not self.is_alive():
            exception, self.exception = self.exception, None
            raise exception
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Module-wide fixture: start the cluster, keep a channel open during the tests, shut down afterwards."""
    cluster.start()
    try:
        # create_channel() also records the channel as `main_channel` on first use.
        with create_channel():
            yield cluster
    finally:
        cluster.shutdown()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_after_test():
    """Per-test fixture: drop table `t` (if it exists) once the test has finished."""
    yield
    cleanup_statement = "DROP TABLE IF EXISTS t"
    node.query_with_retry(cleanup_statement)
|
|
|
|
|
|
# Actual tests
|
|
|
|
|
|
def test_select_one():
    """The simplest possible query returns its single value."""
    output = query("SELECT 1")
    assert output == "1\n"
|
|
|
|
|
|
def test_ordinary_query():
    """An aggregate over a table function returns the expected count."""
    output = query("SELECT count() FROM numbers(100)")
    assert output == "100\n"
|
|
|
|
|
|
def test_insert_query():
    """Insert rows with data inlined in the query text and with data passed as input_data."""
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")

    # Data inlined in the query text.
    query("INSERT INTO t VALUES (1),(2),(3)")
    query("INSERT INTO t FORMAT TabSeparated 4\n5\n6\n")

    # Data passed separately from the query text.
    query("INSERT INTO t VALUES", input_data="(7),(8)")
    query("INSERT INTO t FORMAT TabSeparated", input_data="9\n10\n")

    expected = "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n"
    assert query("SELECT a FROM t ORDER BY a") == expected
|
|
|
|
|
|
def test_insert_query_streaming():
    """Insert data that is passed as several input parts."""
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    parts = ["(1),(2),(3),", "(5),(4),(6),", "(7),(8),(9)"]
    query("INSERT INTO t VALUES", input_data=parts)
    expected = "1\n2\n3\n4\n5\n6\n7\n8\n9\n"
    assert query("SELECT a FROM t ORDER BY a") == expected
|
|
|
|
|
|
def test_insert_query_delimiter():
    """Compare inserting multi-part input with and without an input data delimiter."""
    create_statement = "CREATE TABLE t (a UInt8) ENGINE = Memory"
    insert_statement = "INSERT INTO t FORMAT CSV 1\n2"
    select_statement = "SELECT a FROM t ORDER BY a"

    # With a delimiter every part yields separate rows.
    query(create_statement)
    query(insert_statement, input_data=["3", "4\n5"], input_data_delimiter="\n")
    assert query(select_statement) == "1\n2\n3\n4\n5\n"

    query("DROP TABLE t")

    # Without a delimiter the expected result contains the single value 234
    # where the inline data and the parts meet.
    query(create_statement)
    query(insert_statement, input_data=["3", "4\n5"])
    assert query(select_statement) == "1\n5\n234\n"
|
|
|
|
|
|
def test_insert_default_column():
    """Columns omitted from an INSERT are filled from their DEFAULT expressions."""
    query(
        "CREATE TABLE t (a UInt8, b Int32 DEFAULT 100 - a, c String DEFAULT 'c') ENGINE = Memory"
    )
    query("INSERT INTO t (c, a) VALUES ('x',1),('y',2)")
    query("INSERT INTO t (a) FORMAT TabSeparated", input_data="3\n4\n")

    expected = "1\t99\tx\n" "2\t98\ty\n" "3\t97\tc\n" "4\t96\tc\n"
    assert query("SELECT * FROM t ORDER BY a") == expected
|
|
|
|
|
|
def test_insert_splitted_row():
    """A row split across two input parts is still inserted as one row."""
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    parts = ["(1),(2),(", "3),(5),(4),(6)"]
    query("INSERT INTO t VALUES", input_data=parts)
    assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n"
|
|
|
|
|
|
def test_output_format():
    """The output format can be given in the query text or via the output_format argument."""
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    query("INSERT INTO t VALUES (1),(2),(3)")

    expected = '{"a":1}\n{"a":2}\n{"a":3}\n'

    from_query_text = query("SELECT a FROM t ORDER BY a FORMAT JSONEachRow")
    assert from_query_text == expected

    from_argument = query("SELECT a FROM t ORDER BY a", output_format="JSONEachRow")
    assert from_argument == expected
|
|
|
|
|
|
def test_totals_and_extremes():
    """Totals and extremes are returned separately from the main output."""
    query("CREATE TABLE t (x UInt8, y UInt8) ENGINE = Memory")
    query("INSERT INTO t VALUES (1, 2), (2, 4), (3, 2), (3, 3), (3, 4)")

    totals_query = "SELECT sum(x), y FROM t GROUP BY y WITH TOTALS"
    assert query(totals_query) == "4\t2\n3\t3\n5\t4\n"
    assert query_and_get_totals(totals_query) == "12\t0\n"

    plain_query = "SELECT x, y FROM t"
    assert query(plain_query) == "1\t2\n2\t4\n3\t2\n3\t3\n3\t4\n"
    extremes = query_and_get_extremes(plain_query, settings={"extremes": "1"})
    assert extremes == "1\t2\n3\t4\n"
|
|
|
|
|
|
def test_get_query_details():
    """Check query id, time zone, output format and output columns reported in the result."""
    # A query without output, with an explicit query id.
    first = query_no_errors(
        "CREATE TABLE t (a UInt8) ENGINE = Memory", query_id="123"
    )[0]
    assert first.query_id == "123"
    pytz.timezone(first.time_zone)  # Raises if the reported time zone is unknown.
    assert first.output_format == ""
    assert len(first.output_columns) == 0
    assert first.output == b""

    # An empty query id is passed; the reported one must parse as a UUID.
    second = query_no_errors(
        "SELECT 'a', 1", query_id="", output_format="TabSeparated"
    )[0]
    uuid.UUID(second.query_id)
    pytz.timezone(second.time_zone)
    assert second.output_format == "TabSeparated"
    assert len(second.output_columns) == 0
    assert second.output == b"a\t1\n"

    # Output columns are reported only when requested.
    third = query_no_errors(
        "SELECT 'a' AS x, 1 FORMAT JSONEachRow",
        query_id="",
        send_output_columns=True,
    )[0]
    uuid.UUID(third.query_id)
    pytz.timezone(third.time_zone)
    assert third.output_format == "JSONEachRow"
    reported_columns = [(col.name, col.type) for col in third.output_columns]
    assert reported_columns == [
        ("x", "String"),
        ("1", "UInt8"),
    ]
    assert third.output == b'{"x":"a","1":1}\n'
|
|
|
|
|
|
def test_errors_handling():
    """Failing queries report a descriptive exception text."""
    error = query_and_get_error("")
    assert "Empty query" in error.display_text

    create_statement = "CREATE TABLE t (a UInt8) ENGINE = Memory"
    query(create_statement)
    # Creating the same table again must fail.
    error = query_and_get_error(create_statement)
    assert "Table default.t already exists" in error.display_text
|
|
|
|
|
|
def test_authentication():
    """A query executed with user name and password runs as that user."""
    query("CREATE USER OR REPLACE john IDENTIFIED BY 'qwe123'")
    try:
        assert (
            query("SELECT currentUser()", user_name="john", password="qwe123")
            == "john\n"
        )
    finally:
        # Drop the user even if the check above fails, so it is not left behind for other tests.
        query("DROP USER john")
|
|
|
|
|
|
def test_logs():
    """Server logs requested via send_logs_level are returned along with the result."""
    # Named query_text so that the module-level query() helper is not shadowed.
    query_text = "SELECT has(groupArray(number), 42) FROM numbers(1000000) SETTINGS max_block_size=100000"
    logs = query_and_get_logs(query_text, settings={"send_logs_level": "debug"})
    assert query_text in logs
    assert "Read 1000000 rows" in logs
    assert "Peak memory usage" in logs
|
|
|
|
|
|
def test_progress():
    """With streamed output, progress and data messages arrive in the expected sequence."""

    def progress_message(**fields):
        return clickhouse_grpc_pb2.Result(
            progress=clickhouse_grpc_pb2.Progress(**fields)
        )

    def output_message(data):
        return clickhouse_grpc_pb2.Result(output=data)

    results = query_no_errors(
        "SELECT number, sleep(0.31) FROM numbers(8) SETTINGS max_block_size=2, interactive_delay=100000",
        stream_output=True,
    )
    # Time zone and query id are cleared so that they do not take part in the comparison.
    for result in results:
        result.time_zone = ""
        result.query_id = ""

    # Note: We can't convert those messages to string like `results = str(results)` and then compare it as a string
    # because str() can serialize a protobuf message with any order of fields.
    expected_results = [
        clickhouse_grpc_pb2.Result(
            output_format="TabSeparated",
            progress=clickhouse_grpc_pb2.Progress(
                read_rows=2, read_bytes=16, total_rows_to_read=8
            ),
        ),
        output_message(b"0\t0\n1\t0\n"),
        progress_message(read_rows=2, read_bytes=16),
        output_message(b"2\t0\n3\t0\n"),
        progress_message(read_rows=2, read_bytes=16),
        output_message(b"4\t0\n5\t0\n"),
        progress_message(read_rows=2, read_bytes=16),
        output_message(b"6\t0\n7\t0\n"),
        clickhouse_grpc_pb2.Result(
            stats=clickhouse_grpc_pb2.Stats(
                rows=8,
                blocks=4,
                allocated_bytes=1092,
                applied_limit=True,
                rows_before_limit=8,
            )
        ),
    ]

    assert results == expected_results
|
|
|
|
|
|
def test_session_settings():
    """Settings changed inside one session are not visible in another one."""
    session_a = "session A"
    session_b = "session B"

    statements = [
        ("SET custom_x=1", session_a),
        ("SET custom_y=2", session_a),
        ("SET custom_x=3", session_b),
        ("SET custom_y=4", session_b),
    ]
    for statement, session in statements:
        query(statement, session_id=session)

    select = "SELECT getSetting('custom_x'), getSetting('custom_y')"
    assert query(select, session_id=session_a) == "1\t2\n"
    assert query(select, session_id=session_b) == "3\t4\n"
|
|
|
|
|
|
def test_session_temp_tables():
    """Temporary tables with the same name are independent in different sessions."""
    session_a = "session A"
    session_b = "session B"
    create = "CREATE TEMPORARY TABLE my_temp_table(a Int8)"
    select = "SELECT * FROM my_temp_table"

    query(create, session_id=session_a)
    query("INSERT INTO my_temp_table VALUES (10)", session_id=session_a)
    assert query(select, session_id=session_a) == "10\n"

    query(create, session_id=session_b)
    query("INSERT INTO my_temp_table VALUES (20)", session_id=session_b)
    assert query(select, session_id=session_b) == "20\n"

    # The first session still sees only its own data.
    assert query(select, session_id=session_a) == "10\n"
|
|
|
|
|
|
def test_no_session():
    """Statements that need a session fail when no session id is given."""
    session_only_statements = (
        "SET custom_x=1",
        "CREATE TEMPORARY TABLE my_temp_table(a Int8)",
    )
    for statement in session_only_statements:
        error = query_and_get_error(statement)
        assert "There is no session" in error.display_text
|
|
|
|
|
|
def test_input_function():
    """The input() table function reads streamed parts, inline data, and both combined."""
    select = "SELECT a FROM t ORDER BY a"
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")

    # Data passed as several input parts.
    query(
        "INSERT INTO t SELECT col1 * col2 FROM input('col1 UInt8, col2 UInt8') FORMAT CSV",
        input_data=["5,4\n", "8,11\n", "10,12\n"],
    )
    assert query(select) == "20\n88\n120\n"

    # Data inlined in the query text.
    query(
        "INSERT INTO t SELECT col1 * col2 FROM input('col1 UInt8, col2 UInt8') FORMAT CSV 11,13"
    )
    assert query(select) == "20\n88\n120\n143\n"

    # Inline data followed by input data.
    query(
        "INSERT INTO t SELECT col1 * col2 FROM input('col1 UInt8, col2 UInt8') FORMAT CSV 20,10\n",
        input_data="15,15\n",
    )
    assert query(select) == "20\n88\n120\n143\n200\n225\n"
|
|
|
|
|
|
def test_external_table():
    """Query data sent as named and unnamed external tables."""
    columns = [
        clickhouse_grpc_pb2.NameAndType(name="UserID", type="UInt64"),
        clickhouse_grpc_pb2.NameAndType(name="UserName", type="String"),
    ]

    # One named table in TabSeparated format.
    ext1 = clickhouse_grpc_pb2.ExternalTable(
        name="ext1",
        columns=columns,
        data=b"1\tAlex\n2\tBen\n3\tCarl\n",
        format="TabSeparated",
    )
    single_table_output = query(
        "SELECT * FROM ext1 ORDER BY UserID", external_tables=[ext1]
    )
    assert single_table_output == "1\tAlex\n" "2\tBen\n" "3\tCarl\n"

    # Two named tables in different formats, combined in one query.
    ext2 = clickhouse_grpc_pb2.ExternalTable(
        name="ext2", columns=columns, data=b"4,Daniel\n5,Ethan\n", format="CSV"
    )
    two_tables_output = query(
        "SELECT * FROM (SELECT * FROM ext1 UNION ALL SELECT * FROM ext2) ORDER BY UserID",
        external_tables=[ext1, ext2],
    )
    assert two_tables_output == (
        "1\tAlex\n" "2\tBen\n" "3\tCarl\n" "4\tDaniel\n" "5\tEthan\n"
    )

    # A table without a name and without column names; the query refers to it
    # as `_data` and to its second column as `_2`.
    unnamed_columns = [
        clickhouse_grpc_pb2.NameAndType(type="UInt64"),
        clickhouse_grpc_pb2.NameAndType(type="String"),
    ]
    unnamed_table = clickhouse_grpc_pb2.ExternalTable(
        columns=unnamed_columns, data=b"6\tGeorge\n7\tFred\n"
    )
    unnamed_output = query(
        "SELECT * FROM _data ORDER BY _2", external_tables=[unnamed_table]
    )
    assert unnamed_output == "7\tFred\n" "6\tGeorge\n"
|
|
|
|
|
|
def test_external_table_streaming():
    """Data of one external table can be sent in several QueryInfo messages."""
    columns = [
        clickhouse_grpc_pb2.NameAndType(name="UserID", type="UInt64"),
        clickhouse_grpc_pb2.NameAndType(name="UserName", type="String"),
    ]

    def send_query_info():
        # The first message carries the query, the table structure and the first rows.
        first_chunk = clickhouse_grpc_pb2.ExternalTable(
            name="exts", columns=columns, data=b"1\tAlex\n2\tBen\n3\tCarl\n"
        )
        yield clickhouse_grpc_pb2.QueryInfo(
            query="SELECT * FROM exts ORDER BY UserID",
            external_tables=[first_chunk],
            next_query_info=True,
        )
        # The second message only adds rows to the same table.
        second_chunk = clickhouse_grpc_pb2.ExternalTable(
            name="exts", data=b"4\tDaniel\n5\tEthan\n"
        )
        yield clickhouse_grpc_pb2.QueryInfo(external_tables=[second_chunk])

    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    result = stub.ExecuteQueryWithStreamInput(send_query_info())
    expected = b"1\tAlex\n" b"2\tBen\n" b"3\tCarl\n" b"4\tDaniel\n" b"5\tEthan\n"
    assert result.output == expected
|
|
|
|
|
|
def test_simultaneous_queries_same_channel():
    """Run 100 queries in parallel threads that all use the default channel."""
    workers = []
    try:
        for index in range(100):
            worker = QueryThread(
                "SELECT sum(number) FROM numbers(10)",
                expected_output="45\n",
                query_id="sqA" + str(index),
            )
            # Register before starting so that the thread is always joined below.
            workers.append(worker)
            worker.start()
    finally:
        for worker in workers:
            worker.join()
|
|
|
|
|
|
def test_simultaneous_queries_multiple_channels():
    """Run 100 queries in parallel threads, each one on its own channel."""
    workers = []
    try:
        for index in range(100):
            worker = QueryThread(
                "SELECT sum(number) FROM numbers(10)",
                expected_output="45\n",
                query_id="sqB" + str(index),
                use_separate_channel=True,
            )
            # Register before starting so that the thread is always joined below.
            workers.append(worker)
            worker.start()
    finally:
        for worker in workers:
            worker.join()
|
|
|
|
|
|
def test_cancel_while_processing_input():
    """A cancel message sent after some input parts marks the result as cancelled."""
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")

    def send_query_info():
        # Two parts of input, each announcing a following message, then the cancellation.
        yield clickhouse_grpc_pb2.QueryInfo(
            query="INSERT INTO t FORMAT TabSeparated",
            input_data=b"1\n2\n3\n",
            next_query_info=True,
        )
        yield clickhouse_grpc_pb2.QueryInfo(
            input_data=b"4\n5\n6\n", next_query_info=True
        )
        yield clickhouse_grpc_pb2.QueryInfo(cancel=True)

    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    result = stub.ExecuteQueryWithStreamInput(send_query_info())
    assert result.cancelled
|
|
|
|
|
|
def test_cancel_while_generating_output():
    """A cancel message sent while output is being produced stops the query early."""

    def send_query_info():
        yield clickhouse_grpc_pb2.QueryInfo(
            query="SELECT number, sleep(0.2) FROM numbers(10) SETTINGS max_block_size=2"
        )
        # Wait before cancelling so that part of the output can be produced first.
        time.sleep(0.5)
        yield clickhouse_grpc_pb2.QueryInfo(cancel=True)

    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    results = list(stub.ExecuteQueryWithStreamIO(send_query_info()))
    assert results
    assert results[-1].cancelled
    received = b"".join(result.output for result in results)
    assert received == b"0\t0\n1\t0\n2\t0\n3\t0\n"
|
|
|
|
|
|
def test_compressed_output():
    """Output requested with lz4 compression decompresses to the plain output."""
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    request = clickhouse_grpc_pb2.QueryInfo(
        query="SELECT 0 FROM numbers(1000)", output_compression_type="lz4"
    )
    response = stub.ExecuteQuery(request)
    assert lz4.frame.decompress(response.output) == b"0\n" * 1000
|
|
|
|
|
|
def test_compressed_output_streaming():
    """lz4-compressed output received in several messages is decompressed chunk by chunk."""
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    request = clickhouse_grpc_pb2.QueryInfo(
        query="SELECT 0 FROM numbers(100000)", output_compression_type="lz4"
    )
    # One decompression context is shared by all chunks.
    context = lz4.frame.create_decompression_context()
    pieces = []
    for response in stub.ExecuteQueryWithStreamOutput(request):
        # Only the first element of the returned triple (the decompressed data) is used.
        decompressed = lz4.frame.decompress_chunk(context, response.output)[0]
        pieces.append(decompressed)
    assert b"".join(pieces) == b"0\n" * 100000
|
|
|
|
|
|
def test_compressed_output_gzip():
    """Output requested with gzip compression at level 6 decompresses to the plain output."""
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    request = clickhouse_grpc_pb2.QueryInfo(
        query="SELECT 0 FROM numbers(1000)",
        output_compression_type="gzip",
        output_compression_level=6,
    )
    response = stub.ExecuteQuery(request)
    assert gzip.decompress(response.output) == b"0\n" * 1000
|
|
|
|
|
|
def test_compressed_totals_and_extremes():
    """Totals and extremes are compressed too when output compression is requested."""
    query("CREATE TABLE t (x UInt8, y UInt8) ENGINE = Memory")
    query("INSERT INTO t VALUES (1, 2), (2, 4), (3, 2), (3, 3), (3, 4)")
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)

    totals_request = clickhouse_grpc_pb2.QueryInfo(
        query="SELECT sum(x), y FROM t GROUP BY y WITH TOTALS",
        output_compression_type="lz4",
    )
    totals_response = stub.ExecuteQuery(totals_request)
    assert lz4.frame.decompress(totals_response.totals) == b"12\t0\n"

    extremes_request = clickhouse_grpc_pb2.QueryInfo(
        query="SELECT x, y FROM t",
        settings={"extremes": "1"},
        output_compression_type="lz4",
    )
    extremes_response = stub.ExecuteQuery(extremes_request)
    assert lz4.frame.decompress(extremes_response.extremes) == b"1\t2\n3\t4\n"
|
|
|
|
|
|
def test_compressed_insert_query_streaming():
    """One lz4 frame cut into three pieces is sent as streamed input of an INSERT."""
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    compressed = lz4.frame.compress(b"(1),(2),(3),(5),(4),(6),(7),(8),(9)")

    # Cut the frame at one third and two thirds of its length; the last piece takes the remainder.
    third = len(compressed) // 3
    head = compressed[:third]
    middle = compressed[third : third + third]
    tail = compressed[third + third :]

    def send_query_info():
        # The compression type is given only in the first message.
        yield clickhouse_grpc_pb2.QueryInfo(
            query="INSERT INTO t VALUES",
            input_data=head,
            input_compression_type="lz4",
            next_query_info=True,
        )
        yield clickhouse_grpc_pb2.QueryInfo(input_data=middle, next_query_info=True)
        yield clickhouse_grpc_pb2.QueryInfo(input_data=tail)

    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    stub.ExecuteQueryWithStreamInput(send_query_info())
    assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n7\n8\n9\n"
|
|
|
|
|
|
def test_compressed_external_table():
    """External tables can be sent compressed, each with its own compression type."""
    columns = [
        clickhouse_grpc_pb2.NameAndType(name="UserID", type="UInt64"),
        clickhouse_grpc_pb2.NameAndType(name="UserName", type="String"),
    ]
    lz4_data = lz4.frame.compress(b"1\tAlex\n2\tBen\n3\tCarl\n")
    gzip_data = gzip.compress(b"4,Daniel\n5,Ethan\n")

    ext1 = clickhouse_grpc_pb2.ExternalTable(
        name="ext1",
        columns=columns,
        data=lz4_data,
        format="TabSeparated",
        compression_type="lz4",
    )
    ext2 = clickhouse_grpc_pb2.ExternalTable(
        name="ext2",
        columns=columns,
        data=gzip_data,
        format="CSV",
        compression_type="gzip",
    )

    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    request = clickhouse_grpc_pb2.QueryInfo(
        query="SELECT * FROM (SELECT * FROM ext1 UNION ALL SELECT * FROM ext2) ORDER BY UserID",
        external_tables=[ext1, ext2],
    )
    response = stub.ExecuteQuery(request)
    expected = b"1\tAlex\n" b"2\tBen\n" b"3\tCarl\n" b"4\tDaniel\n" b"5\tEthan\n"
    assert response.output == expected
|
|
|
|
|
|
def test_transport_compression():
    """With transport compression requested, the client still receives the plain output."""
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    request = clickhouse_grpc_pb2.QueryInfo(
        query="SELECT 0 FROM numbers(1000000)",
        transport_compression_type="gzip",
        transport_compression_level=3,
    )
    response = stub.ExecuteQuery(request)
    assert response.output == b"0\n" * 1000000
|
|
|
|
|
|
def test_opentelemetry_context_propagation():
    """Trace context passed as call metadata shows up in system.opentelemetry_span_log."""
    trace_id = "80c190b5-9dc1-4eae-82b9-6c261438c817"
    parent_span_id = 123
    trace_state = "some custom state"

    # The traceparent value is built from the trace id without dashes and the
    # span id as 16 zero-padded upper-case hex digits.
    trace_id_hex = trace_id.replace("-", "")
    parent_span_id_hex = f"{parent_span_id:0>16X}"
    traceparent = f"00-{trace_id_hex}-{parent_span_id_hex}-01"
    metadata = [
        ("traceparent", traceparent),
        ("tracestate", trace_state),
    ]

    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    request = clickhouse_grpc_pb2.QueryInfo(query="SELECT 1")
    response = stub.ExecuteQuery(request, metadata=metadata)
    assert response.output == b"1\n"

    node.query("SYSTEM FLUSH LOGS")
    span_query = (
        "SELECT attribute['db.statement'], attribute['clickhouse.tracestate'] FROM system.opentelemetry_span_log "
        f"WHERE trace_id='{trace_id}' AND operation_name='query'"
    )
    assert node.query(span_query) == "SELECT 1\tsome custom state\n"
|