#!/usr/bin/env python3
"""Generate SQL scripts that exercise insert deduplication scenarios.

Each sub-command prints one SQL script to stdout.  A script creates a source
table, a destination table and one or more materialized views between them,
runs the same INSERT twice and, after each INSERT, checks the row counts with
``throwIf`` so that a mismatch aborts the script.

The command-line flags select the table engine, the insert form
(``INSERT ... SELECT`` or ``INSERT ... VALUES``), whether rows are unique,
whether an explicit deduplication token is sent and whether the source and
destination tables have a non-zero deduplication window.
"""

import os
import sys
import argparse
import string

# The ``test_*`` sub-command registrars are deliberately left out: they are
# wired up by ``parse_args()`` and need a parser argument, so exposing them to
# a star-import would let test collectors that pick up any ``test_*`` callable
# try (and fail) to run them.
__all__ = [
    "ArgsFactory",
    "get_drop_tables_statements",
    "get_logs_statement",
    "instance_create_statement",
    "instance_insert_statement",
    "main",
    "parse_args",
    "str2bool",
]

CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))


def __format(template, **params):
    """Render ``template`` with ``params``, blanking any missing field.

    Every replacement field that appears in ``template`` but has no entry in
    ``params`` is substituted with an empty string instead of raising
    ``KeyError``.  Entries of ``params`` that the template does not mention
    are ignored.
    """
    field_names = [
        name for _, name, _, _ in string.Formatter().parse(template) if name is not None
    ]
    kv_args = {name: params.get(name, "") for name in field_names}
    return template.format(**kv_args)


def instance_create_statement(
    table_name,
    table_columns,
    table_keys,
    table_engine,
    with_deduplication,
    no_merges=True,
):
    """Return the SQL that creates one table for a scenario.

    Args:
        table_name: name of the table to create.
        table_columns: column list, including the surrounding parentheses.
        table_keys: ``ORDER BY`` expression, including the parentheses.
        table_engine: ``"MergeTree"`` selects the plain engine; any other
            value produces a ``ReplicatedMergeTree`` definition.
        with_deduplication: when true the deduplication window is set to
            1000, otherwise to 0.
        no_merges: when true a ``SYSTEM STOP MERGES`` statement for the table
            is appended after the ``CREATE TABLE``.

    Returns:
        The ``CREATE TABLE`` statement, optionally followed by
        ``SYSTEM STOP MERGES``.
    """
    template = """
        CREATE TABLE {table_name}
        {table_columns}
        ENGINE = {table_engine}
        ORDER BY {table_keys}
        {table_settings};
        {table_no_merges}
        """

    is_plain_merge_tree = table_engine == "MergeTree"

    if is_plain_merge_tree:
        engine_clause = "MergeTree()"
        window_setting_name = "non_replicated_deduplication_window"
    else:
        # ``{{database}}`` leaves a literal ``{database}`` in the path; it is
        # not a Python placeholder and is emitted verbatim into the SQL.
        engine_clause = (
            f"ReplicatedMergeTree('/clickhouse/tables/{{database}}/{table_name}', '1')"
        )
        window_setting_name = "replicated_deduplication_window"

    window_setting_value = 1000 if with_deduplication else 0
    settings = [f"{window_setting_name}={window_setting_value}"]

    params = {
        "table_name": table_name,
        "table_columns": table_columns,
        "table_keys": table_keys,
        "table_engine": engine_clause,
        "table_settings": "SETTINGS " + ",".join(settings),
        "table_no_merges": f"SYSTEM STOP MERGES {table_name};" if no_merges else "",
    }
    return __format(template, **params)


def instance_insert_statement(
    table_name, count, insert_method, insert_unique_blocks, use_insert_token
):
    """Return the SQL that inserts ``count`` rows into ``table_name``.

    Args:
        table_name: target table.
        count: number of rows to insert.
        insert_method: ``"InsertSelect"`` produces ``INSERT ... SELECT`` from
            ``numbers(count)``; any other value produces ``INSERT ... VALUES``.
        insert_unique_blocks: when true row ``i`` is ``('src_<i>', i)``;
            otherwise every row is ``('src_4', 4)``.
        use_insert_token: when true the statement carries
            ``SETTINGS insert_deduplication_token='UDT'``.

    Returns:
        A single INSERT statement.
    """
    insert_settings = (
        "SETTINGS insert_deduplication_token='UDT'" if use_insert_token else ""
    )

    if insert_method == "InsertSelect":
        template = """
            INSERT INTO {table_name}
            SELECT {insert_columns}
            FROM numbers({count})
            {insert_settings};
            """
        insert_columns = (
            "'src_' || toString(number), number"
            if insert_unique_blocks
            else "'src_4', 4"
        )
        return __format(
            template,
            table_name=table_name,
            count=count,
            insert_columns=insert_columns,
            insert_settings=insert_settings,
        )

    template = """
        INSERT INTO {table_name}
        {insert_settings} VALUES {insert_values};
        """
    values = [
        f"('src_{i}', {i})" if insert_unique_blocks else "('src_4', 4)"
        for i in range(count)
    ]
    return __format(
        template,
        table_name=table_name,
        insert_settings=insert_settings,
        insert_values=", ".join(values),
    )


def get_drop_tables_statements(tables):
    """Return ``DROP TABLE IF EXISTS`` statements for ``tables``.

    The statements are emitted in reverse order of ``tables``, one per line,
    so objects listed last (the callers list materialized views last) are
    dropped first.
    """
    return "".join(
        f"DROP TABLE IF EXISTS {table_name};\n" for table_name in tables[::-1]
    )


def get_logs_statement(args):
    """Return the statement enabling test-level logs, or "" if not requested."""
    if args.get_logs:
        return "SET send_logs_level='test';"
    return ""


def str2bool(v):
    """Convert a command-line value to ``bool``.

    Real booleans are returned unchanged.  Strings are matched
    case-insensitively against yes/true/t/y/1 and no/false/f/n/0.

    Raises:
        argparse.ArgumentTypeError: if the string is not a recognised value.
    """
    if isinstance(v, bool):
        return v
    lowered = v.lower()
    if lowered in ("yes", "true", "t", "y", "1"):
        return True
    if lowered in ("no", "false", "f", "n", "0"):
        return False
    raise argparse.ArgumentTypeError("Boolean value expected.")


class ArgsFactory:
    """Register the scenario options on an ``argparse`` parser."""

    def __init__(self, parser):
        self.__parser = parser

    def _add_bool_opt(self, flag, default):
        """Register a boolean option.

        A bare flag (no value) means true; an explicit value is parsed by
        ``str2bool``; an absent flag yields ``default``.
        """
        self.__parser.add_argument(
            flag, type=str2bool, nargs="?", const=True, default=default
        )

    def add_opt_engine(self):
        self.__parser.add_argument(
            "--table-engine",
            choices=["ReplicatedMergeTree", "MergeTree"],
            default="MergeTree",
        )

    def add_opt_user_token(self):
        self._add_bool_opt("--use-insert-token", default=False)

    def add_opt_single_thread(self):
        self._add_bool_opt("--single-thread", default=True)

    def add_opt_dedup_src(self):
        self._add_bool_opt("--deduplicate-src-table", default=True)

    def add_opt_dedup_dst(self):
        self._add_bool_opt("--deduplicate-dst-table", default=True)

    def add_opt_get_logs(self):
        self._add_bool_opt("--get-logs", default=False)

    def add_opt_uniq_blocks(self):
        self._add_bool_opt("--insert-unique-blocks", default=True)

    def add_opt_insert_method(self):
        self.__parser.add_argument(
            "--insert-method",
            choices=["InsertSelect", "InsertValues"],
            default="InsertSelect",
        )

    def add_all(self):
        """Register every option, in the order they appear in ``--help``."""
        self.add_opt_engine()
        self.add_opt_user_token()
        self.add_opt_single_thread()
        self.add_opt_dedup_src()
        self.add_opt_dedup_dst()
        self.add_opt_get_logs()
        self.add_opt_insert_method()
        self.add_opt_uniq_blocks()


def _session_settings(args):
    """Return the session preamble shared by every generated script.

    It optionally enables test-level logs, sets ``max_insert_threads`` to 1
    or 10 depending on ``args.single_thread``, turns on the two
    materialized-view deduplication settings and sets the block-size settings
    (``max_block_size=1`` and both ``min_insert_block_size_*`` to 0).
    """
    return f"""
        {get_logs_statement(args)}

        SET max_insert_threads={1 if args.single_thread else 10};
        SET update_insert_deduplication_token_in_dependent_materialized_views=1;
        SET deduplicate_blocks_in_dependent_materialized_views=1;

        SET max_block_size=1;
        SET min_insert_block_size_rows=0;
        SET min_insert_block_size_bytes=0;
        """


def test_insert_several_blocks(parser):
    """Register the scenario: 10 rows into a table with one filtering MV.

    ``table_a_b`` receives the rows; ``mv_b_even`` forwards the rows with an
    even ``b`` to ``table_when_b_even``.
    """
    ArgsFactory(parser).add_all()

    def calle(args):
        create_table_a_b_statement = instance_create_statement(
            table_name="table_a_b",
            table_columns="(a String, b UInt64)",
            table_keys="(a, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_src_table,
        )

        create_table_when_b_even_statement = instance_create_statement(
            table_name="table_when_b_even",
            table_columns="(a String, b UInt64)",
            table_keys="(a, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_dst_table,
        )

        create_mv_statement = """
            CREATE MATERIALIZED VIEW mv_b_even
            TO table_when_b_even
            AS
                SELECT a, b
                FROM table_a_b
                WHERE b % 2 = 0;
            """

        drop_tables_statements = get_drop_tables_statements(
            ["table_a_b", "table_when_b_even", "mv_b_even"]
        )

        insert_statement = instance_insert_statement(
            "table_a_b",
            10,
            args.insert_method,
            args.insert_unique_blocks,
            args.use_insert_token,
        )

        # Per-part breakdowns are only printed together with the logs.
        parts_of_table_a_b = (
            "SELECT _part, count() FROM table_a_b GROUP BY _part ORDER BY _part;"
            if args.get_logs
            else ""
        )
        parts_of_table_when_b_even = (
            "SELECT _part, count() FROM table_when_b_even GROUP BY _part ORDER BY _part;"
            if args.get_logs
            else ""
        )

        print_details_statements = f"""
            SELECT 'table_a_b';
            SELECT 'count', count() FROM table_a_b;
            {parts_of_table_a_b}

            SELECT 'table_when_b_even';
            SELECT 'count', count() FROM table_when_b_even;
            {parts_of_table_when_b_even}
            """

        if args.insert_unique_blocks:
            assert_first_insert_statements = """
                SELECT throwIf( count() != 10 )
                FROM table_a_b;
                SELECT throwIf( count() != 5 )
                FROM table_when_b_even;
                """
            assert_second_insert_statements = f"""
                SELECT throwIf( count() != {10 if args.deduplicate_src_table else 20} )
                FROM table_a_b;
                SELECT throwIf( count() != {5 if args.deduplicate_dst_table else 10} )
                FROM table_when_b_even;
                """
        elif args.use_insert_token:
            assert_first_insert_statements = """
                SELECT throwIf( count() != 10 )
                FROM table_a_b;
                SELECT throwIf( count() != 10 )
                FROM table_when_b_even;
                """
            assert_second_insert_statements = f"""
                SELECT throwIf( count() != {10 if args.deduplicate_src_table else 20} )
                FROM table_a_b;
                SELECT throwIf( count() != {10 if args.deduplicate_dst_table else 20} )
                FROM table_when_b_even;
                """
        else:
            assert_first_insert_statements = f"""
                SELECT throwIf( count() != {1 if args.deduplicate_src_table else 10} )
                FROM table_a_b;
                SELECT throwIf( count() != {1 if args.deduplicate_dst_table else 10} )
                FROM table_when_b_even;
                """
            assert_second_insert_statements = f"""
                SELECT throwIf( count() != {1 if args.deduplicate_src_table else 20} )
                FROM table_a_b;
                SELECT throwIf( count() != {1 if args.deduplicate_dst_table else 20} )
                FROM table_when_b_even;
                """

        # Every "--" SQL comment must stay on its own line, otherwise it
        # would comment out the statement that follows it.
        script = f"""
            {_session_settings(args)}

            {drop_tables_statements}

            {create_table_a_b_statement}

            {create_table_when_b_even_statement}

            {create_mv_statement}

            -- first insert
            {insert_statement}

            {print_details_statements}

            {assert_first_insert_statements}

            -- second insert, it is retry
            {insert_statement}

            {print_details_statements}

            {assert_second_insert_statements}

            {drop_tables_statements}
            """

        print(script)

    parser.set_defaults(func=calle)


def test_mv_generates_several_blocks(parser):
    """Register the scenario: 5 rows through an MV that joins another table.

    ``mv_b_even`` full-outer-joins ``table_a_b`` with the one-row
    ``table_for_join_with`` and writes into ``table_when_b_even_and_joined``.
    """
    ArgsFactory(parser).add_all()

    def calle(args):
        tables = [
            "table_for_join_with",
            "table_a_b",
            "table_when_b_even_and_joined",
            "mv_b_even",
        ]
        drop_tables_statements = get_drop_tables_statements(tables)

        details_print_for_table_for_join_with = ""
        if args.get_logs:
            details_print_for_table_for_join_with = """
                SELECT 'table_for_join_with';
                SELECT a_join, b, _part FROM table_for_join_with ORDER BY _part, a_join, b;
                """

        create_table_a_b_statement = instance_create_statement(
            table_name="table_a_b",
            table_columns="(a_src String, b UInt64)",
            table_keys="(a_src, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_src_table,
        )

        create_table_when_b_even_and_joined_statement = instance_create_statement(
            table_name="table_when_b_even_and_joined",
            table_columns="(a_src String, a_join String, b UInt64)",
            table_keys="(a_src, a_join, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_dst_table,
        )

        insert_statement = instance_insert_statement(
            "table_a_b",
            5,
            args.insert_method,
            args.insert_unique_blocks,
            args.use_insert_token,
        )

        # Per-part breakdown is only printed together with the logs.
        parts_of_joined_table = (
            "SELECT _part, a_src, a_join, b FROM table_when_b_even_and_joined ORDER BY _part;"
            if args.get_logs
            else ""
        )

        details_print_statements = f"""
            SELECT 'table_a_b';
            SELECT 'count', count() FROM table_a_b;

            SELECT 'table_when_b_even_and_joined';
            SELECT 'count', count() FROM table_when_b_even_and_joined;
            {parts_of_joined_table}
            """

        if args.insert_unique_blocks:
            assert_first_insert_statements = """
                SELECT throwIf( count() != 5 )
                FROM table_a_b;
                SELECT throwIf( count() != 9 )
                FROM table_when_b_even_and_joined;
                """
            assert_second_insert_statements = f"""
                SELECT throwIf( count() != {5 if args.deduplicate_src_table else 10} )
                FROM table_a_b;
                SELECT throwIf( count() != {9 if args.deduplicate_dst_table else 18} )
                FROM table_when_b_even_and_joined;
                """
        elif args.use_insert_token:
            # The first-insert expectations do not depend on the
            # deduplication flags in this branch.
            assert_first_insert_statements = """
                SELECT throwIf( count() != 5 )
                FROM table_a_b;
                SELECT throwIf( count() != 10 )
                FROM table_when_b_even_and_joined;
                """
            assert_second_insert_statements = f"""
                SELECT throwIf( count() != {5 if args.deduplicate_src_table else 10} )
                FROM table_a_b;
                SELECT throwIf( count() != {10 if args.deduplicate_dst_table else 20} )
                FROM table_when_b_even_and_joined;
                """
        else:
            assert_first_insert_statements = f"""
                SELECT throwIf( count() != {1 if args.deduplicate_src_table else 5} )
                FROM table_a_b;
                SELECT throwIf( count() != {2 if args.deduplicate_dst_table else 10} )
                FROM table_when_b_even_and_joined;
                """
            assert_second_insert_statements = f"""
                SELECT throwIf( count() != {1 if args.deduplicate_src_table else 10} )
                FROM table_a_b;
                SELECT throwIf( count() != {2 if args.deduplicate_dst_table else 20} )
                FROM table_when_b_even_and_joined;
                """

        # NOTE: instance_create_statement() already appends SYSTEM STOP MERGES
        # for each table; the explicit statements below repeat it and are kept
        # so the emitted script is unchanged.
        # Every "--" SQL comment must stay on its own line, otherwise it
        # would comment out the statement that follows it.
        script = f"""
            {_session_settings(args)}

            {drop_tables_statements}

            CREATE TABLE table_for_join_with
                (a_join String, b UInt64)
                ENGINE = MergeTree()
                ORDER BY (a_join, b);
            INSERT INTO table_for_join_with
                SELECT 'joined_' || toString(number), number
                FROM numbers(1);
            {details_print_for_table_for_join_with}

            {create_table_a_b_statement}
            SYSTEM STOP MERGES table_a_b;

            {create_table_when_b_even_and_joined_statement}
            SYSTEM STOP MERGES table_when_b_even_and_joined;

            CREATE MATERIALIZED VIEW mv_b_even
            TO table_when_b_even_and_joined
            AS
                SELECT a_src, a_join, table_for_join_with.b as b
                FROM table_a_b
                FULL OUTER JOIN table_for_join_with
                ON table_a_b.b = table_for_join_with.b AND table_a_b.b % 2 = 0
                ORDER BY a_src, a_join, b;

            -- first insert
            {insert_statement}

            {details_print_statements}

            -- first assertion
            {assert_first_insert_statements}

            -- second insert
            {insert_statement}

            {details_print_statements}

            -- second assertion
            {assert_second_insert_statements}

            {drop_tables_statements}
            """

        print(script)

    parser.set_defaults(func=calle)


def test_several_mv_into_one_table(parser):
    """Register the scenario: 8 rows through two MVs into one destination.

    ``mv_b_even`` (``b % 2 = 0``) and ``mv_b_even_even`` (``b % 4 = 0``) both
    read ``table_src`` and both write into ``table_dst``.
    """
    ArgsFactory(parser).add_all()

    def calle(args):
        tables = ["table_src", "table_dst", "mv_b_even", "mv_b_even_even"]
        drop_tables_statements = get_drop_tables_statements(tables)

        create_table_src_statement = instance_create_statement(
            table_name="table_src",
            table_columns="(a String, b UInt64)",
            table_keys="(a, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_src_table,
        )

        create_table_dst_statement = instance_create_statement(
            table_name="table_dst",
            table_columns="(a String, b UInt64)",
            table_keys="(a, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_dst_table,
        )

        insert_statement = instance_insert_statement(
            "table_src",
            8,
            args.insert_method,
            args.insert_unique_blocks,
            args.use_insert_token,
        )

        # Per-part breakdown is only printed together with the logs.
        parts_of_table_dst = (
            "SELECT _part, count() FROM table_dst GROUP BY _part ORDER BY _part;"
            if args.get_logs
            else ""
        )

        details_print_statements = f"""
            SELECT 'table_src count', count() FROM table_src;

            SELECT 'table_dst count', count() FROM table_dst;
            {parts_of_table_dst}
            """

        if args.insert_unique_blocks:
            assert_first_insert_statements = """
                SELECT throwIf( count() != 8 )
                FROM table_src;
                SELECT throwIf( count() != 6 )
                FROM table_dst;
                """
            assert_second_insert_statements = f"""
                SELECT throwIf( count() != {8 if args.deduplicate_src_table else 16} )
                FROM table_src;
                SELECT throwIf( count() != {6 if args.deduplicate_dst_table else 12} )
                FROM table_dst;
                """
        elif args.use_insert_token:
            # The first-insert expectations do not depend on the
            # deduplication flags in this branch.
            assert_first_insert_statements = """
                SELECT throwIf( count() != 8 )
                FROM table_src;
                SELECT throwIf( count() != 16 )
                FROM table_dst;
                """
            assert_second_insert_statements = f"""
                SELECT throwIf( count() != {8 if args.deduplicate_src_table else 16} )
                FROM table_src;
                SELECT throwIf( count() != {16 if args.deduplicate_dst_table else 32} )
                FROM table_dst;
                """
        else:
            assert_first_insert_statements = f"""
                SELECT throwIf( count() != {1 if args.deduplicate_src_table else 8} )
                FROM table_src;
                SELECT throwIf( count() != {2 if args.deduplicate_dst_table else 16} )
                FROM table_dst;
                """
            assert_second_insert_statements = f"""
                SELECT throwIf( count() != {1 if args.deduplicate_src_table else 16} )
                FROM table_src;
                SELECT throwIf( count() != {2 if args.deduplicate_dst_table else 32} )
                FROM table_dst;
                """

        # Every "--" SQL comment must stay on its own line, otherwise it
        # would comment out the statement that follows it.
        script = f"""
            {_session_settings(args)}

            {drop_tables_statements}

            {create_table_src_statement}

            {create_table_dst_statement}

            CREATE MATERIALIZED VIEW mv_b_even
            TO table_dst
            AS
                SELECT a, b
                FROM table_src
                WHERE b % 2 = 0;

            CREATE MATERIALIZED VIEW mv_b_even_even
            TO table_dst
            AS
                SELECT a, b
                FROM table_src
                WHERE b % 4 = 0;

            -- first insert
            {insert_statement}

            {details_print_statements}

            {assert_first_insert_statements}

            -- second insert, retry
            {insert_statement}

            {details_print_statements}

            {assert_second_insert_statements}

            {drop_tables_statements}
            """

        print(script)

    parser.set_defaults(func=calle)


def parse_args():
    """Parse ``sys.argv`` and return the namespace.

    One sub-parser is registered per scenario; the chosen one stores its
    script generator in ``args.func``.  When no sub-command is given the help
    text is printed and the namespace is returned with ``args.test`` set to
    ``None``.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="test")
    test_insert_several_blocks(
        subparsers.add_parser("insert_several_blocks_into_table")
    )
    test_mv_generates_several_blocks(
        subparsers.add_parser("mv_generates_several_blocks")
    )
    test_several_mv_into_one_table(subparsers.add_parser("several_mv_into_one_table"))
    args = parser.parse_args()
    if args.test is None:
        parser.print_help()
    return args


def main():
    """Entry point: print the script for the selected scenario, if any."""
    args = parse_args()
    if args.test is not None:
        args.func(args)


if __name__ == "__main__":
    main()