ClickHouse/tests/queries/0_stateless/02473_multistep_prewhere.python
2023-03-23 15:33:23 +00:00

224 lines
8.0 KiB
Python

#!/usr/bin/env python3
import requests
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))
from pure_http_client import ClickHouseClient
class Tester:
    """
    Runs one multi-step PREWHERE scenario against the table tab_02473.

    - Creates test table
    - Deletes the specified range of rows
    - Masks another range using row-level policy
    - Runs some read queries and checks that the results match the row
      counts computed locally from a snapshot of the inserted data
    """

    def __init__(self, session, url, index_granularity, total_rows):
        """
        Store the connection details and the table parameters.

        session -- object whose post(url, data=...) sends a query and returns
                   a response exposing status_code and text
        url -- endpoint every query is posted to
        index_granularity -- value substituted into the table's
                             index_granularity setting
        total_rows -- number of rows inserted into the table
        """
        self.session = session
        self.url = url
        self.index_granularity = index_granularity
        self.total_rows = total_rows
        # Prefixes of the error texts that have already been printed.
        self.reported_errors = set()
        # Queries needed to reproduce the current state of the table.
        self.repro_queries = []

    def report_error(self):
        """Print the recorded queries as repro steps and exit with status 1."""
        print("Repro steps:", "\n\n\t".join(self.repro_queries))
        # sys.exit() instead of the site-provided exit(): the latter is an
        # interactive helper and is not defined when `site` is not loaded.
        sys.exit(1)

    def query(self, query_text, include_in_repro_steps=True, expected_data=None):
        """
        Post query_text to the server and optionally verify its result.

        query_text -- SQL text sent as the request body
        include_in_repro_steps -- when False, the query is dropped from the
                                  repro steps again once it has succeeded
        expected_data -- when not None, the response text is parsed as an
                         integer and must equal len(expected_data)

        Exits the process (through report_error) on a non-200 response whose
        error prefix has not been reported yet, or on a result mismatch.
        """
        # Record the query first so that it is part of the repro steps if it
        # turns out to be the failing one.
        self.repro_queries.append(query_text)
        resp = self.session.post(self.url, data=query_text)
        if resp.status_code != 200:
            # Group similar errors
            error = resp.text[0:40]
            if error not in self.reported_errors:
                self.reported_errors.add(error)
                print("Code:", resp.status_code)
                print("Result:", resp.text)
                self.report_error()

        result = resp.text
        # Check that the result is as expected
        if expected_data is not None and int(result) != len(expected_data):
            print("Expected {} rows, got {}".format(len(expected_data), result))
            print("Expected data:" + str(expected_data))
            self.report_error()

        if not include_in_repro_steps:
            self.repro_queries.pop()

    def check_data(
        self,
        all_data,
        delete_range_start,
        delete_range_end,
        row_level_policy_range_start,
        row_level_policy_range_end,
    ):
        """
        Compare server-side results with counts derived from all_data.

        all_data -- snapshot of the table with columns a, b, c and d that
                    supports boolean-mask indexing (presumably a pandas
                    DataFrame; it comes from ClickHouseClient.query_return_df)
        delete_range_start, delete_range_end -- rows with a == 0 and
            delete_range_start < b <= delete_range_end are treated as deleted
        row_level_policy_range_start, row_level_policy_range_end -- rows with
            row_level_policy_range_start < b <= row_level_policy_range_end
            are treated as hidden by the row policy

        Each query goes through self.query, which exits the process on a
        mismatch. The checking queries are not kept in the repro steps.
        """
        all_data_after_delete = all_data[
            ~(
                (all_data.a == 0)
                & (all_data.b > delete_range_start)
                & (all_data.b <= delete_range_end)
            )
        ]
        visible = all_data_after_delete[
            (all_data_after_delete.b <= row_level_policy_range_start)
            | (all_data_after_delete.b > row_level_policy_range_end)
        ]
        a_is_zero = visible.a == 0

        delta = 10
        # Test reading with and without column with default value. The
        # snapshot built in run_test has d == 1 in every row, so sum(d) is
        # expected to equal the number of visible rows, just like count().
        for to_select in ["count()", "sum(d)"]:
            self.query(
                "SELECT {} FROM tab_02473;".format(to_select),
                False,
                visible,
            )

            for query_range_start in [0, delta]:
                for query_range_end in [self.total_rows - delta]:
                    # Each entry pairs a PREWHERE template with the local mask
                    # that selects the same rows from the snapshot.
                    range_checks = [
                        (
                            "SELECT {} from tab_02473 PREWHERE b > {} AND b <= {} WHERE a == 0;",
                            (visible.b > query_range_start)
                            & (visible.b <= query_range_end),
                        ),
                        (
                            "SELECT {} from tab_02473 PREWHERE c > {} AND c <= {} WHERE a == 0;",
                            (visible.c > query_range_start)
                            & (visible.c <= query_range_end),
                        ),
                        (
                            "SELECT {} from tab_02473 PREWHERE c <= {} OR c > {} WHERE a == 0;",
                            (visible.c <= query_range_start)
                            | (visible.c > query_range_end),
                        ),
                    ]
                    for template, range_mask in range_checks:
                        self.query(
                            template.format(
                                to_select, query_range_start, query_range_end
                            ),
                            False,
                            visible[a_is_zero & range_mask],
                        )

    def run_test(
        self,
        delete_range_start,
        delete_range_end,
        row_level_policy_range_start,
        row_level_policy_range_end,
    ):
        """
        Execute the full scenario for one combination of ranges.

        Creates and fills the table, then verifies the read results three
        times: on the untouched data, after DELETE of the given range, and
        after additionally creating a row policy hiding the second range.
        The policy and the table are dropped at the end.
        """
        self.repro_queries = []

        self.query(
            (
                "\n"
                "CREATE TABLE tab_02473 (a Int8, b Int32, c Int32, PRIMARY KEY (a))\n"
                "ENGINE = MergeTree() ORDER BY (a, b)\n"
                "SETTINGS min_bytes_for_wide_part = 0, index_granularity = {};"
            ).format(self.index_granularity)
        )

        self.query(
            "INSERT INTO tab_02473 select 0, number+1, number+1 FROM numbers({});".format(
                self.total_rows
            )
        )

        # Snapshot of the inserted rows; d is a constant 1 here and only
        # becomes a real column (with the same default) further below.
        client = ClickHouseClient()
        all_data = client.query_return_df(
            "SELECT a, b, c, 1 as d FROM tab_02473 FORMAT TabSeparatedWithNames;"
        )

        self.query("OPTIMIZE TABLE tab_02473 FINAL SETTINGS mutations_sync=2;")

        # After all data has been written add a column with default value
        self.query("ALTER TABLE tab_02473 ADD COLUMN d Int64 DEFAULT 1;")

        # Bounds of -100 describe empty ranges: nothing is deleted or hidden.
        self.check_data(all_data, -100, -100, -100, -100)

        self.query(
            "DELETE FROM tab_02473 WHERE a = 0 AND b > {} AND b <= {};".format(
                delete_range_start, delete_range_end
            )
        )

        self.check_data(all_data, delete_range_start, delete_range_end, -100, -100)

        self.query(
            "CREATE ROW POLICY policy_tab_02473 ON tab_02473 FOR SELECT USING b <= {} OR b > {} TO default;".format(
                row_level_policy_range_start, row_level_policy_range_end
            )
        )

        self.check_data(
            all_data,
            delete_range_start,
            delete_range_end,
            row_level_policy_range_start,
            row_level_policy_range_end,
        )

        self.query("DROP POLICY policy_tab_02473 ON tab_02473;")

        self.query("DROP TABLE tab_02473;")
def main():
    """
    Run Tester.run_test for every combination of delete range and row policy
    range, once per index granularity under test.

    The server address is read from the CLICKHOUSE_URL environment variable.
    """
    # Every query is sent with max_threads=1 appended to the server URL.
    url = os.environ["CLICKHOUSE_URL"] + "&max_threads=1"

    default_index_granularity = 10
    total_rows = 8 * default_index_granularity
    step = default_index_granularity
    session = requests.Session()

    def range_bounds():
        # Yields (start, end) pairs: starts advance by three steps, and each
        # end begins three steps past its start and advances by two steps.
        for range_start in range(0, total_rows, 3 * step):
            for range_end in range(range_start + 3 * step, total_rows, 2 * step):
                yield range_start, range_end

    granularities = (default_index_granularity - 1, default_index_granularity)
    for index_granularity in granularities:
        tester = Tester(session, url, index_granularity, total_rows)
        # Test combinations of ranges of various size masked by DELETE
        # along with ranges of various size masked by row-level policies
        for delete_start, delete_end in range_bounds():
            for policy_start, policy_end in range_bounds():
                tester.run_test(
                    delete_start,
                    delete_end,
                    policy_start,
                    policy_end,
                )
# Run the test matrix only when the file is executed as a script.
if __name__ == "__main__":
    main()