#!/usr/bin/env python3
import os
import sys

import requests

CURDIR = os.path.dirname(os.path.realpath(__file__))
# The helper module lives next to this script, so extend the import path
# before importing it.
sys.path.insert(0, os.path.join(CURDIR, "helpers"))

from pure_http_client import ClickHouseClient


class Tester:
    """
    - Creates test table
    - Deletes the specified range of rows
    - Masks another range using row-level policy
    - Runs some read queries and checks that the results match the expected
      row counts computed on the client side
    """

    def __init__(self, session, url, index_granularity, total_rows):
        """Store the connection details and the table parameters.

        session           -- object with a ``post(url, data=...)`` method whose
                             result exposes ``status_code`` and ``text``
                             (``main`` passes a ``requests.Session``)
        url               -- endpoint that every query is POSTed to
        index_granularity -- value substituted into the CREATE TABLE settings
        total_rows        -- number of rows inserted into the test table
        """
        self.session = session
        self.url = url
        self.index_granularity = index_granularity
        self.total_rows = total_rows
        # Prefixes of error messages whose details were already printed.
        self.reported_errors = set()
        # Queries needed to reproduce a failure, in execution order.
        self.repro_queries = []

    def report_error(self):
        """Print the collected repro queries and terminate with exit code 1."""
        print("Repro steps:", "\n\n\t".join(self.repro_queries))
        # sys.exit instead of the interactive-only builtin exit(), which is
        # injected by the `site` module and may be missing.
        sys.exit(1)

    def query(self, query_text, include_in_repro_steps=True, expected_data=None):
        """POST *query_text* and optionally verify the returned number.

        When *expected_data* is given, the response body is parsed as an
        integer and compared with ``len(expected_data)``. Any failure prints
        diagnostics and exits through ``report_error``.

        Queries with ``include_in_repro_steps=False`` are removed from the
        repro list again once they have succeeded, so only the failing one
        (if any) stays visible in the report.
        """
        self.repro_queries.append(query_text)
        resp = self.session.post(self.url, data=query_text)

        if resp.status_code != 200:
            # Group similar errors by their first characters so the details
            # of one kind of failure are printed only once.
            error = resp.text[0:40]
            if error not in self.reported_errors:
                self.reported_errors.add(error)
                print("Code:", resp.status_code)
                print("Result:", resp.text)
            # A failed request never carries a usable result, so always abort.
            self.report_error()

        result = resp.text
        # Check that the result is as expected
        if expected_data is not None and int(result) != len(expected_data):
            print("Expected {} rows, got {}".format(len(expected_data), result))
            print("Expected data:" + str(expected_data))
            self.report_error()

        if not include_in_repro_steps:
            self.repro_queries.pop()

    def check_data(
        self,
        all_data,
        delete_range_start,
        delete_range_end,
        row_level_policy_range_start,
        row_level_policy_range_end,
    ):
        """Compare server-side results with the same filters applied locally.

        all_data is the full table content with columns a, b, c, d
        (presumably a pandas DataFrame returned by ``query_return_df`` --
        it is only used through column attributes and boolean masks).

        The deleted range is ``a == 0 AND start < b <= end``; the row policy
        keeps rows with ``b <= start OR b > end``. Both mirror the statements
        issued by ``run_test``.
        """
        deleted_mask = (
            (all_data.a == 0)
            & (all_data.b > delete_range_start)
            & (all_data.b <= delete_range_end)
        )
        all_data_after_delete = all_data[~deleted_mask]

        all_data_after_row_policy = all_data_after_delete[
            (all_data_after_delete.b <= row_level_policy_range_start)
            | (all_data_after_delete.b > row_level_policy_range_end)
        ]
        visible = all_data_after_row_policy

        # Test reading with and without column with default value.
        # d is 1 for every row, so sum(d) must equal the row count too.
        for to_select in ["count()", "sum(d)"]:
            self.query(
                "SELECT {} FROM tab_02473;".format(to_select),
                False,
                visible,
            )

            delta = 10
            for query_range_start in [0, delta]:
                for query_range_end in [self.total_rows - delta]:  # , self.total_rows]:
                    # Range condition on the sorting-key column b.
                    expected = visible[
                        (visible.a == 0)
                        & (visible.b > query_range_start)
                        & (visible.b <= query_range_end)
                    ]
                    self.query(
                        "SELECT {} from tab_02473 PREWHERE b > {} AND b <= {} WHERE a == 0;".format(
                            to_select, query_range_start, query_range_end
                        ),
                        False,
                        expected,
                    )

                    # Same range, but on column c, which is not in ORDER BY.
                    expected = visible[
                        (visible.a == 0)
                        & (visible.c > query_range_start)
                        & (visible.c <= query_range_end)
                    ]
                    self.query(
                        "SELECT {} from tab_02473 PREWHERE c > {} AND c <= {} WHERE a == 0;".format(
                            to_select, query_range_start, query_range_end
                        ),
                        False,
                        expected,
                    )

                    # Complement of the range on column c.
                    expected = visible[
                        (visible.a == 0)
                        & (
                            (visible.c <= query_range_start)
                            | (visible.c > query_range_end)
                        )
                    ]
                    self.query(
                        "SELECT {} from tab_02473 PREWHERE c <= {} OR c > {} WHERE a == 0;".format(
                            to_select, query_range_start, query_range_end
                        ),
                        False,
                        expected,
                    )

    def run_test(
        self,
        delete_range_start,
        delete_range_end,
        row_level_policy_range_start,
        row_level_policy_range_end,
    ):
        """Run one scenario: create, fill, delete a range, mask a range, drop.

        The data is checked three times: before any masking, after the
        DELETE, and after the row policy has been created on top of it.
        """
        self.repro_queries = []

        create_table = (
            " CREATE TABLE tab_02473 (a Int8, b Int32, c Int32, PRIMARY KEY (a))"
            " ENGINE = MergeTree() ORDER BY (a, b)"
            " SETTINGS min_bytes_for_wide_part = 0, index_granularity = {};"
        ).format(self.index_granularity)
        self.query(create_table)

        self.query(
            "INSERT INTO tab_02473 select 0, number+1, number+1 FROM numbers({});".format(
                self.total_rows
            )
        )

        # Fetch the reference copy of the data; d is a constant here because
        # the real column d is only added below.
        client = ClickHouseClient()
        all_data = client.query_return_df(
            "SELECT a, b, c, 1 as d FROM tab_02473 FORMAT TabSeparatedWithNames;"
        )

        self.query("OPTIMIZE TABLE tab_02473 FINAL SETTINGS mutations_sync=2;")

        # After all data has been written add a column with default value
        self.query("ALTER TABLE tab_02473 ADD COLUMN d Int64 DEFAULT 1;")

        # -100 bounds describe empty ranges (b starts at 1), i.e. nothing is
        # deleted and nothing is hidden by a policy yet.
        self.check_data(all_data, -100, -100, -100, -100)

        self.query(
            "DELETE FROM tab_02473 WHERE a = 0 AND b > {} AND b <= {};".format(
                delete_range_start, delete_range_end
            )
        )

        self.check_data(all_data, delete_range_start, delete_range_end, -100, -100)

        self.query(
            "CREATE ROW POLICY policy_tab_02473 ON tab_02473 FOR SELECT USING b <= {} OR b > {} TO default;".format(
                row_level_policy_range_start, row_level_policy_range_end
            )
        )

        self.check_data(
            all_data,
            delete_range_start,
            delete_range_end,
            row_level_policy_range_start,
            row_level_policy_range_end,
        )

        self.query("DROP POLICY policy_tab_02473 ON tab_02473;")

        self.query("DROP TABLE tab_02473;")


def main():
    """Run the tester over combinations of delete and row-policy ranges."""
    # Pass max_threads=1 to the server as an extra URL parameter.
    url = os.environ["CLICKHOUSE_URL"] + "&max_threads=1"

    default_index_granularity = 10
    total_rows = 8 * default_index_granularity
    step = default_index_granularity

    # The context manager closes the HTTP session even if a test aborts.
    with requests.Session() as session:
        for index_granularity in [
            default_index_granularity - 1,
            default_index_granularity,
        ]:  # [default_index_granularity-1, default_index_granularity+1, default_index_granularity]:
            tester = Tester(session, url, index_granularity, total_rows)
            # Test combinations of ranges of various size masked by lightweight DELETES
            # along with ranges of various size masked by row-level policies
            for delete_range_start in range(0, total_rows, 3 * step):
                for delete_range_end in range(
                    delete_range_start + 3 * step, total_rows, 2 * step
                ):
                    for row_level_policy_range_start in range(0, total_rows, 3 * step):
                        for row_level_policy_range_end in range(
                            row_level_policy_range_start + 3 * step,
                            total_rows,
                            2 * step,
                        ):
                            tester.run_test(
                                delete_range_start,
                                delete_range_end,
                                row_level_policy_range_start,
                                row_level_policy_range_end,
                            )


if __name__ == "__main__":
    main()