Obey Python's quirky formatter

This commit is contained in:
Alexey Milovidov 2024-01-15 06:12:58 +01:00
parent e49cfbef08
commit 678a32cede
15 changed files with 433 additions and 318 deletions

View File

@ -305,14 +305,11 @@ class ClickhouseIntegrationTestsRunner:
def _pre_pull_images(self, repo_path):
image_cmd = self._get_runner_image_cmd(repo_path)
cmd = (
"cd {repo_path}/tests/integration && "
"timeout --signal=KILL 1h ./runner {runner_opts} {image_cmd} --pre-pull --command '{command}' ".format(
repo_path=repo_path,
runner_opts=self._get_runner_opts(),
image_cmd=image_cmd,
command=r""" echo Pre Pull finished """,
)
cmd = "cd {repo_path}/tests/integration && " "timeout --signal=KILL 1h ./runner {runner_opts} {image_cmd} --pre-pull --command '{command}' ".format(
repo_path=repo_path,
runner_opts=self._get_runner_opts(),
image_cmd=image_cmd,
command=r""" echo Pre Pull finished """,
)
for i in range(5):

View File

@ -43,7 +43,7 @@ def test_memory_usage():
response = node.get_query_request(
"SELECT groupArray(number) FROM numbers(1000000) SETTINGS max_memory_usage_for_user={}".format(
30 * (2**23)
30 * (2 ** 23)
),
user="A",
)

View File

@ -95,15 +95,25 @@ def test_check_normal_table_corruption(started_cluster, merge_tree_settings):
node1, "non_replicated_mt", "201902_1_1_0", database="default"
)
assert node1.query(
"CHECK TABLE non_replicated_mt",
settings={"check_query_single_value_result": 0, "max_threads": 1},
).strip().split("\t")[0:2] == ["201902_1_1_0", "0"]
assert (
node1.query(
"CHECK TABLE non_replicated_mt",
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
.strip()
.split("\t")[0:2]
== ["201902_1_1_0", "0"]
)
assert node1.query(
"CHECK TABLE non_replicated_mt",
settings={"check_query_single_value_result": 0, "max_threads": 1},
).strip().split("\t")[0:2] == ["201902_1_1_0", "0"]
assert (
node1.query(
"CHECK TABLE non_replicated_mt",
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
.strip()
.split("\t")[0:2]
== ["201902_1_1_0", "0"]
)
node1.query(
"INSERT INTO non_replicated_mt VALUES (toDate('2019-01-01'), 1, 10), (toDate('2019-01-01'), 2, 12)"
@ -123,10 +133,15 @@ def test_check_normal_table_corruption(started_cluster, merge_tree_settings):
remove_checksums_on_disk(node1, "default", "non_replicated_mt", "201901_2_2_0")
assert node1.query(
"CHECK TABLE non_replicated_mt PARTITION 201901",
settings={"check_query_single_value_result": 0, "max_threads": 1},
).strip().split("\t")[0:2] == ["201901_2_2_0", "0"]
assert (
node1.query(
"CHECK TABLE non_replicated_mt PARTITION 201901",
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
.strip()
.split("\t")[0:2]
== ["201901_2_2_0", "0"]
)
@pytest.mark.parametrize("merge_tree_settings, zk_path_suffix", [("", "_0")])
@ -194,12 +209,15 @@ def test_check_replicated_table_simple(
== "201901_0_0_0\t1\t\n"
)
assert sorted(
node2.query(
"CHECK TABLE replicated_mt",
settings={"check_query_single_value_result": 0},
).split("\n")
) == ["", "201901_0_0_0\t1\t", "201902_0_0_0\t1\t"]
assert (
sorted(
node2.query(
"CHECK TABLE replicated_mt",
settings={"check_query_single_value_result": 0},
).split("\n")
)
== ["", "201901_0_0_0\t1\t", "201902_0_0_0\t1\t"]
)
with pytest.raises(QueryRuntimeException) as exc:
node2.query(
@ -273,10 +291,13 @@ def test_check_replicated_table_corruption(
)
node1.query_with_retry("SYSTEM SYNC REPLICA replicated_mt_1")
assert node1.query(
"CHECK TABLE replicated_mt_1 PARTITION 201901",
settings={"check_query_single_value_result": 0, "max_threads": 1},
) == "{}\t1\t\n".format(part_name)
assert (
node1.query(
"CHECK TABLE replicated_mt_1 PARTITION 201901",
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
== "{}\t1\t\n".format(part_name)
)
assert node1.query("SELECT count() from replicated_mt_1") == "4\n"
remove_part_from_disk(node2, "replicated_mt_1", part_name)
@ -288,10 +309,13 @@ def test_check_replicated_table_corruption(
)
node1.query("SYSTEM SYNC REPLICA replicated_mt_1")
assert node1.query(
"CHECK TABLE replicated_mt_1 PARTITION 201901",
settings={"check_query_single_value_result": 0, "max_threads": 1},
) == "{}\t1\t\n".format(part_name)
assert (
node1.query(
"CHECK TABLE replicated_mt_1 PARTITION 201901",
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
== "{}\t1\t\n".format(part_name)
)
assert node1.query("SELECT count() from replicated_mt_1") == "4\n"

View File

@ -61,7 +61,7 @@ def check_on_cluster(
print(f"Retry {retry}/{retries} unsuccessful, result: {node_results}")
if retry != retries:
time.sleep(2**retry)
time.sleep(2 ** retry)
else:
msg = msg or f"Wrong '{what}' result"
raise Exception(

View File

@ -76,11 +76,14 @@ def test_role_mapping(ldap_cluster):
"select currentUser()", user="johndoe", password="qwertz"
) == TSV([["johndoe"]])
assert instance.query(
"select role_name from system.current_roles ORDER BY role_name",
user="johndoe",
password="qwertz",
) == TSV([["role_1"], ["role_2"]])
assert (
instance.query(
"select role_name from system.current_roles ORDER BY role_name",
user="johndoe",
password="qwertz",
)
== TSV([["role_1"], ["role_2"]])
)
instance.query("CREATE ROLE role_3")
add_ldap_group(ldap_cluster, group_cn="clickhouse-role_3", member_cn="johndoe")
@ -88,8 +91,11 @@ def test_role_mapping(ldap_cluster):
# See https://github.com/ClickHouse/ClickHouse/issues/54318
add_ldap_group(ldap_cluster, group_cn="clickhouse-role_4", member_cn="johndoe")
assert instance.query(
"select role_name from system.current_roles ORDER BY role_name",
user="johndoe",
password="qwertz",
) == TSV([["role_1"], ["role_2"], ["role_3"]])
assert (
instance.query(
"select role_name from system.current_roles ORDER BY role_name",
user="johndoe",
password="qwertz",
)
== TSV([["role_1"], ["role_2"], ["role_3"]])
)

View File

@ -854,14 +854,14 @@ def test_types(started_cluster):
result = cursor.fetchall()[0]
expected = [
("Int8_column", -(2**7)),
("UInt8_column", 2**8 - 1),
("Int16_column", -(2**15)),
("UInt16_column", 2**16 - 1),
("Int32_column", -(2**31)),
("UInt32_column", 2**32 - 1),
("Int64_column", -(2**63)),
("UInt64_column", 2**64 - 1),
("Int8_column", -(2 ** 7)),
("UInt8_column", 2 ** 8 - 1),
("Int16_column", -(2 ** 15)),
("UInt16_column", 2 ** 16 - 1),
("Int32_column", -(2 ** 31)),
("UInt32_column", 2 ** 32 - 1),
("Int64_column", -(2 ** 63)),
("UInt64_column", 2 ** 64 - 1),
("String_column", "тест"),
("FixedString_column", "тест"),
("Float32_column", 1.5),

View File

@ -561,9 +561,7 @@ def test_make_clone_in_detached(started_cluster):
["cp", "-r", path + "all_0_0_0", path + "detached/broken_all_0_0_0"]
)
assert_eq_with_retry(instance, "select * from clone_in_detached", "\n")
assert [
"broken_all_0_0_0",
] == sorted(
assert ["broken_all_0_0_0",] == sorted(
instance.exec_in_container(["ls", path + "detached/"]).strip().split("\n")
)

View File

@ -506,12 +506,9 @@ def test_alters_from_different_replicas(started_cluster):
dummy_node.stop_clickhouse(kill=True)
settings = {"distributed_ddl_task_timeout": 5}
assert (
"There are 1 unfinished hosts (0 of them are currently executing the task"
in competing_node.query_and_get_error(
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN Added0 UInt32;",
settings=settings,
)
assert "There are 1 unfinished hosts (0 of them are currently executing the task" in competing_node.query_and_get_error(
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN Added0 UInt32;",
settings=settings,
)
settings = {
"distributed_ddl_task_timeout": 5,

View File

@ -95,12 +95,9 @@ def test_cluster_groups(started_cluster):
# Exception
main_node_2.stop_clickhouse()
settings = {"distributed_ddl_task_timeout": 5}
assert (
"There are 1 unfinished hosts (0 of them are currently executing the task)"
in main_node_1.query_and_get_error(
"CREATE TABLE cluster_groups.table_2 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);",
settings=settings,
)
assert "There are 1 unfinished hosts (0 of them are currently executing the task)" in main_node_1.query_and_get_error(
"CREATE TABLE cluster_groups.table_2 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);",
settings=settings,
)
# 3. After start both groups are synced

View File

@ -9,7 +9,7 @@ import time
def gen_n_digit_number(n):
assert 0 < n < 19
return random.randint(10 ** (n - 1), 10**n - 1)
return random.randint(10 ** (n - 1), 10 ** n - 1)
sum_in_4_column = 0

View File

@ -553,16 +553,13 @@ def test_multipart(started_cluster, maybe_auth, positive):
assert csv_data == get_s3_file_content(started_cluster, bucket, filename)
# select uploaded data from many threads
select_query = (
"select sum(column1), sum(column2), sum(column3) "
"from s3('http://{host}:{port}/{bucket}/{filename}', {auth}'CSV', '{table_format}')".format(
host=started_cluster.minio_redirect_host,
port=started_cluster.minio_redirect_port,
bucket=bucket,
filename=filename,
auth=maybe_auth,
table_format=table_format,
)
select_query = "select sum(column1), sum(column2), sum(column3) " "from s3('http://{host}:{port}/{bucket}/{filename}', {auth}'CSV', '{table_format}')".format(
host=started_cluster.minio_redirect_host,
port=started_cluster.minio_redirect_port,
bucket=bucket,
filename=filename,
auth=maybe_auth,
table_format=table_format,
)
try:
select_result = run_query(

View File

@ -79,15 +79,21 @@ def test_table_function_url_access_rights():
f"SELECT * FROM url('http://nginx:80/test_1', 'TSV')", user="u1"
)
assert node1.query(
f"DESCRIBE TABLE url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
user="u1",
) == TSV([["column1", "UInt32"], ["column2", "UInt32"], ["column3", "UInt32"]])
assert (
node1.query(
f"DESCRIBE TABLE url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
user="u1",
)
== TSV([["column1", "UInt32"], ["column2", "UInt32"], ["column3", "UInt32"]])
)
assert node1.query(
f"DESCRIBE TABLE url('http://nginx:80/not-exist', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
user="u1",
) == TSV([["column1", "UInt32"], ["column2", "UInt32"], ["column3", "UInt32"]])
assert (
node1.query(
f"DESCRIBE TABLE url('http://nginx:80/not-exist', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
user="u1",
)
== TSV([["column1", "UInt32"], ["column2", "UInt32"], ["column3", "UInt32"]])
)
expected_error = "necessary to have the grant URL ON *.*"
assert expected_error in node1.query_and_get_error(

View File

@ -204,36 +204,33 @@ def test_mutation_simple(started_cluster, replicated):
sleep_time=0.1,
)
assert (
split_tsv(
node_check.query(
"""
assert split_tsv(
node_check.query(
"""
SELECT database, table, num_parts, source_part_names, source_part_paths, result_part_name, result_part_path, partition_id, is_mutation
FROM system.merges
WHERE table = '{name}'
""".format(
name=table_name
)
name=table_name
)
)
== [
[
db_name,
table_name,
"1",
"['{}']".format(part),
"['{clickhouse}/{table_path}/{}/']".format(
part, clickhouse=clickhouse_path, table_path=table_path
),
result_part,
"{clickhouse}/{table_path}/{}/".format(
result_part, clickhouse=clickhouse_path, table_path=table_path
),
"all",
"1",
],
]
)
) == [
[
db_name,
table_name,
"1",
"['{}']".format(part),
"['{clickhouse}/{table_path}/{}/']".format(
part, clickhouse=clickhouse_path, table_path=table_path
),
result_part,
"{clickhouse}/{table_path}/{}/".format(
result_part, clickhouse=clickhouse_path, table_path=table_path
),
"all",
"1",
],
]
t.join()
assert (

File diff suppressed because one or more lines are too long

View File

@ -15,25 +15,25 @@ class ClickHouseStub(object):
channel: A grpc.Channel.
"""
self.ExecuteQuery = channel.unary_unary(
'/clickhouse.grpc.ClickHouse/ExecuteQuery',
request_serializer=clickhouse__grpc__pb2.QueryInfo.SerializeToString,
response_deserializer=clickhouse__grpc__pb2.Result.FromString,
)
"/clickhouse.grpc.ClickHouse/ExecuteQuery",
request_serializer=clickhouse__grpc__pb2.QueryInfo.SerializeToString,
response_deserializer=clickhouse__grpc__pb2.Result.FromString,
)
self.ExecuteQueryWithStreamInput = channel.stream_unary(
'/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamInput',
request_serializer=clickhouse__grpc__pb2.QueryInfo.SerializeToString,
response_deserializer=clickhouse__grpc__pb2.Result.FromString,
)
"/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamInput",
request_serializer=clickhouse__grpc__pb2.QueryInfo.SerializeToString,
response_deserializer=clickhouse__grpc__pb2.Result.FromString,
)
self.ExecuteQueryWithStreamOutput = channel.unary_stream(
'/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamOutput',
request_serializer=clickhouse__grpc__pb2.QueryInfo.SerializeToString,
response_deserializer=clickhouse__grpc__pb2.Result.FromString,
)
"/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamOutput",
request_serializer=clickhouse__grpc__pb2.QueryInfo.SerializeToString,
response_deserializer=clickhouse__grpc__pb2.Result.FromString,
)
self.ExecuteQueryWithStreamIO = channel.stream_stream(
'/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamIO',
request_serializer=clickhouse__grpc__pb2.QueryInfo.SerializeToString,
response_deserializer=clickhouse__grpc__pb2.Result.FromString,
)
"/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamIO",
request_serializer=clickhouse__grpc__pb2.QueryInfo.SerializeToString,
response_deserializer=clickhouse__grpc__pb2.Result.FromString,
)
class ClickHouseServicer(object):
@ -42,124 +42,173 @@ class ClickHouseServicer(object):
def ExecuteQuery(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def ExecuteQueryWithStreamInput(self, request_iterator, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def ExecuteQueryWithStreamOutput(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def ExecuteQueryWithStreamIO(self, request_iterator, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def add_ClickHouseServicer_to_server(servicer, server):
rpc_method_handlers = {
'ExecuteQuery': grpc.unary_unary_rpc_method_handler(
servicer.ExecuteQuery,
request_deserializer=clickhouse__grpc__pb2.QueryInfo.FromString,
response_serializer=clickhouse__grpc__pb2.Result.SerializeToString,
),
'ExecuteQueryWithStreamInput': grpc.stream_unary_rpc_method_handler(
servicer.ExecuteQueryWithStreamInput,
request_deserializer=clickhouse__grpc__pb2.QueryInfo.FromString,
response_serializer=clickhouse__grpc__pb2.Result.SerializeToString,
),
'ExecuteQueryWithStreamOutput': grpc.unary_stream_rpc_method_handler(
servicer.ExecuteQueryWithStreamOutput,
request_deserializer=clickhouse__grpc__pb2.QueryInfo.FromString,
response_serializer=clickhouse__grpc__pb2.Result.SerializeToString,
),
'ExecuteQueryWithStreamIO': grpc.stream_stream_rpc_method_handler(
servicer.ExecuteQueryWithStreamIO,
request_deserializer=clickhouse__grpc__pb2.QueryInfo.FromString,
response_serializer=clickhouse__grpc__pb2.Result.SerializeToString,
),
"ExecuteQuery": grpc.unary_unary_rpc_method_handler(
servicer.ExecuteQuery,
request_deserializer=clickhouse__grpc__pb2.QueryInfo.FromString,
response_serializer=clickhouse__grpc__pb2.Result.SerializeToString,
),
"ExecuteQueryWithStreamInput": grpc.stream_unary_rpc_method_handler(
servicer.ExecuteQueryWithStreamInput,
request_deserializer=clickhouse__grpc__pb2.QueryInfo.FromString,
response_serializer=clickhouse__grpc__pb2.Result.SerializeToString,
),
"ExecuteQueryWithStreamOutput": grpc.unary_stream_rpc_method_handler(
servicer.ExecuteQueryWithStreamOutput,
request_deserializer=clickhouse__grpc__pb2.QueryInfo.FromString,
response_serializer=clickhouse__grpc__pb2.Result.SerializeToString,
),
"ExecuteQueryWithStreamIO": grpc.stream_stream_rpc_method_handler(
servicer.ExecuteQueryWithStreamIO,
request_deserializer=clickhouse__grpc__pb2.QueryInfo.FromString,
response_serializer=clickhouse__grpc__pb2.Result.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'clickhouse.grpc.ClickHouse', rpc_method_handlers)
"clickhouse.grpc.ClickHouse", rpc_method_handlers
)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
# This class is part of an EXPERIMENTAL API.
class ClickHouse(object):
"""Missing associated documentation comment in .proto file."""
@staticmethod
def ExecuteQuery(request,
def ExecuteQuery(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_unary(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/clickhouse.grpc.ClickHouse/ExecuteQuery',
"/clickhouse.grpc.ClickHouse/ExecuteQuery",
clickhouse__grpc__pb2.QueryInfo.SerializeToString,
clickhouse__grpc__pb2.Result.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
@staticmethod
def ExecuteQueryWithStreamInput(request_iterator,
def ExecuteQueryWithStreamInput(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.stream_unary(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_unary(request_iterator, target, '/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamInput',
"/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamInput",
clickhouse__grpc__pb2.QueryInfo.SerializeToString,
clickhouse__grpc__pb2.Result.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
@staticmethod
def ExecuteQueryWithStreamOutput(request,
def ExecuteQueryWithStreamOutput(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_stream(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(request, target, '/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamOutput',
"/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamOutput",
clickhouse__grpc__pb2.QueryInfo.SerializeToString,
clickhouse__grpc__pb2.Result.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
@staticmethod
def ExecuteQueryWithStreamIO(request_iterator,
def ExecuteQueryWithStreamIO(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.stream_stream(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(request_iterator, target, '/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamIO',
"/clickhouse.grpc.ClickHouse/ExecuteQueryWithStreamIO",
clickhouse__grpc__pb2.QueryInfo.SerializeToString,
clickhouse__grpc__pb2.Result.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)