#!/usr/bin/env python3

import requests
import os
import sys

CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))

from pure_http_client import ClickHouseClient


class Tester:
    '''
    - Creates test table
    - Deletes the specified range of rows
    - Masks another range using row-level policy
    - Runs some read queries and checks that the results are correct
    '''

    def __init__(self, session, url, index_granularity, total_rows):
        self.session = session
        self.url = url
        self.index_granularity = index_granularity
        self.total_rows = total_rows
        # First 40 characters of every server error already printed.
        self.reported_errors = set()
        # Queries of the current run_test() call, printed when a check fails.
        self.repro_queries = []

    def report_error(self):
        '''Print the collected repro steps and terminate the process with exit code 1.'''
        print('Repro steps:', '\n\n\t'.join(self.repro_queries))
        # sys.exit rather than the site-provided builtin exit(), which is meant
        # for the interactive shell and is missing when Python runs with -S.
        sys.exit(1)

    def query(self, query_text, include_in_repro_steps=True, expected_data=None):
        '''
        POST query_text to self.url using self.session.

        A non-200 response prints the error (once per distinct 40-character
        prefix) and aborts via report_error().
        If expected_data is not None, the response must parse as an integer equal
        to len(expected_data); otherwise the mismatch is printed and the run aborts.
        If include_in_repro_steps is False, the query is removed from the repro
        steps once it has passed all checks.
        '''
        self.repro_queries.append(query_text)
        resp = self.session.post(self.url, data=query_text)
        if resp.status_code != 200:
            # Group similar errors so each one is printed only once
            error = resp.text[0:40]
            if error not in self.reported_errors:
                self.reported_errors.add(error)
                print('Code:', resp.status_code)
                print('Result:', resp.text)
            # Always fail here: falling through would try int() on an error body.
            self.report_error()

        result = resp.text
        # Check that the result is as expected
        if expected_data is not None and int(result) != len(expected_data):
            print('Expected {} rows, got {}'.format(len(expected_data), result))
            print('Expected data:' + str(expected_data))
            self.report_error()

        if not include_in_repro_steps:
            self.repro_queries.pop()

    def check_data(self, all_data, delete_range_start, delete_range_end, row_level_policy_range_start, row_level_policy_range_end):
        '''
        Compare count()/sum(d) results of several filtered reads from tab_02473
        with the row counts computed locally from all_data.

        all_data holds the columns a, b, c, d read right after the INSERT
        (presumably a pandas DataFrame, given query_return_df and the boolean
        indexing below). Rows with a == 0 and
        delete_range_start < b <= delete_range_end are treated as deleted. Rows with
        row_level_policy_range_start < b <= row_level_policy_range_end are treated
        as hidden by the row policy. An empty range (run_test passes -100, -100)
        disables the corresponding filter.
        '''
        all_data_after_delete = all_data[
            ~((all_data.a == 0) &
              (all_data.b > delete_range_start) &
              (all_data.b <= delete_range_end))]
        all_data_after_row_policy = all_data_after_delete[
            (all_data_after_delete.b <= row_level_policy_range_start) |
            (all_data_after_delete.b > row_level_policy_range_end)]

        delta = 10
        # d is 1 for every row (DEFAULT 1 in the table, literal 1 in all_data), so
        # sum(d) is expected to equal the row count just like count().
        for to_select in ['count()', 'sum(d)']:
            # Test reading with and without column with default value
            self.query('SELECT {} FROM tab_02473;'.format(to_select), False, all_data_after_row_policy)

            for query_range_start in [0, delta]:
                for query_range_end in [self.total_rows - delta]:
                    expected = all_data_after_row_policy[
                        (all_data_after_row_policy.a == 0) &
                        (all_data_after_row_policy.b > query_range_start) &
                        (all_data_after_row_policy.b <= query_range_end)]
                    self.query('SELECT {} from tab_02473 PREWHERE b > {} AND b <= {} WHERE a == 0;'.format(
                        to_select, query_range_start, query_range_end), False, expected)

                    expected = all_data_after_row_policy[
                        (all_data_after_row_policy.a == 0) &
                        (all_data_after_row_policy.c > query_range_start) &
                        (all_data_after_row_policy.c <= query_range_end)]
                    self.query('SELECT {} from tab_02473 PREWHERE c > {} AND c <= {} WHERE a == 0;'.format(
                        to_select, query_range_start, query_range_end), False, expected)

                    expected = all_data_after_row_policy[
                        (all_data_after_row_policy.a == 0) &
                        ((all_data_after_row_policy.c <= query_range_start) |
                         (all_data_after_row_policy.c > query_range_end))]
                    self.query('SELECT {} from tab_02473 PREWHERE c <= {} OR c > {} WHERE a == 0;'.format(
                        to_select, query_range_start, query_range_end), False, expected)

    def run_test(self, delete_range_start, delete_range_end, row_level_policy_range_start, row_level_policy_range_end):
        '''
        Create and fill tab_02473, then verify reads at three stages:
        before the DELETE, after the DELETE, and after also creating the row
        policy. Drops the policy and the table at the end.
        '''
        self.repro_queries = []
        self.query('''
            CREATE TABLE tab_02473 (a Int8, b Int32, c Int32, PRIMARY KEY (a))
            ENGINE = MergeTree() ORDER BY (a, b)
            SETTINGS min_bytes_for_wide_part = 0, index_granularity = {};'''.format(self.index_granularity))

        self.query('INSERT INTO tab_02473 select 0, number+1, number+1 FROM numbers({});'.format(self.total_rows))

        client = ClickHouseClient()
        all_data = client.query_return_df("SELECT a, b, c, 1 as d FROM tab_02473 FORMAT TabSeparatedWithNames;")

        self.query('OPTIMIZE TABLE tab_02473 FINAL SETTINGS mutations_sync=2;')

        # After all data has been written add a column with default value
        self.query('ALTER TABLE tab_02473 ADD COLUMN d Int64 DEFAULT 1;')

        # Empty ranges: nothing deleted, nothing masked
        self.check_data(all_data, -100, -100, -100, -100)

        self.query('DELETE FROM tab_02473 WHERE a = 0 AND b > {} AND b <= {};'.format(
            delete_range_start, delete_range_end))

        self.check_data(all_data, delete_range_start, delete_range_end, -100, -100)

        self.query('CREATE ROW POLICY policy_tab_02473 ON tab_02473 FOR SELECT USING b <= {} OR b > {} TO default;'.format(
            row_level_policy_range_start, row_level_policy_range_end))

        self.check_data(all_data, delete_range_start, delete_range_end, row_level_policy_range_start, row_level_policy_range_end)

        self.query('DROP POLICY policy_tab_02473 ON tab_02473;')

        self.query('DROP TABLE tab_02473;')


def main():
    '''Run Tester.run_test over combinations of DELETE ranges and row-policy ranges.'''
    # Every query sent through the session runs with max_threads=1.
    # NOTE(review): no mutations_sync / lightweight-DELETE settings are appended
    # here, so the DELETE relies on server defaults to be visible to the reads
    # that follow it — confirm.
    url = os.environ['CLICKHOUSE_URL'] + '&max_threads=1'

    default_index_granularity = 10
    total_rows = 8 * default_index_granularity
    step = default_index_granularity

    with requests.Session() as session:
        for index_granularity in [default_index_granularity - 1, default_index_granularity]:
            tester = Tester(session, url, index_granularity, total_rows)
            # Test combinations of ranges of various size masked by lightweight DELETES
            # along with ranges of various size masked by row-level policies
            for delete_range_start in range(0, total_rows, 3 * step):
                for delete_range_end in range(delete_range_start + 3 * step, total_rows, 2 * step):
                    for row_level_policy_range_start in range(0, total_rows, 3 * step):
                        for row_level_policy_range_end in range(row_level_policy_range_start + 3 * step, total_rows, 2 * step):
                            tester.run_test(delete_range_start, delete_range_end, row_level_policy_range_start, row_level_policy_range_end)


if __name__ == "__main__":
    main()