From 383fc06761f81bee735ec22692a2d506ca78c01e Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 9 May 2023 16:10:53 +0200 Subject: [PATCH] Fix --- .../MaterializedPostgreSQLConsumer.cpp | 37 +++--- .../MaterializedPostgreSQLSettings.h | 3 + .../PostgreSQLReplicationHandler.cpp | 21 +-- .../PostgreSQL/PostgreSQLReplicationHandler.h | 7 +- tests/integration/helpers/postgres_utility.py | 124 +++++++++++------- .../test.py | 75 ++++++++++- 6 files changed, 183 insertions(+), 84 deletions(-) diff --git a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp index d048c94ac75..ea7009fc082 100644 --- a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp +++ b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp @@ -562,34 +562,27 @@ void MaterializedPostgreSQLConsumer::syncTables() Block result_rows = storage_data.buffer.description.sample_block.cloneWithColumns(std::move(storage_data.buffer.columns)); storage_data.buffer.columns = storage_data.buffer.description.sample_block.cloneEmptyColumns(); - try + if (result_rows.rows()) { - if (result_rows.rows()) - { - auto storage = storage_data.storage; + auto storage = storage_data.storage; - auto insert_context = Context::createCopy(context); - insert_context->setInternalQuery(true); + auto insert_context = Context::createCopy(context); + insert_context->setInternalQuery(true); - auto insert = std::make_shared(); - insert->table_id = storage->getStorageID(); - insert->columns = storage_data.buffer.columns_ast; + auto insert = std::make_shared(); + insert->table_id = storage->getStorageID(); + insert->columns = storage_data.buffer.columns_ast; - InterpreterInsertQuery interpreter(insert, insert_context, true); - auto io = interpreter.execute(); - auto input = std::make_shared( - result_rows.cloneEmpty(), Chunk(result_rows.getColumns(), result_rows.rows())); + InterpreterInsertQuery interpreter(insert, insert_context, true); + auto io = interpreter.execute(); + auto input = std::make_shared( + result_rows.cloneEmpty(), Chunk(result_rows.getColumns(), result_rows.rows())); - assertBlocksHaveEqualStructure(input->getPort().getHeader(), io.pipeline.getHeader(), "postgresql replica table sync"); - io.pipeline.complete(Pipe(std::move(input))); + assertBlocksHaveEqualStructure(input->getPort().getHeader(), io.pipeline.getHeader(), "postgresql replica table sync"); + io.pipeline.complete(Pipe(std::move(input))); - CompletedPipelineExecutor executor(io.pipeline); - executor.execute(); - } - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); + CompletedPipelineExecutor executor(io.pipeline); + executor.execute(); } } diff --git a/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.h b/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.h index e8d42ef3668..d3d2faba497 100644 --- a/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.h +++ b/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.h @@ -21,6 +21,9 @@ namespace DB M(Bool, materialized_postgresql_tables_list_with_schema, false, \ "Consider by default that if there is a dot in tables list 'name.name', " \ "then the first name is postgres schema and second is postgres table. This setting is needed to allow table names with dots", 0) \ + M(UInt64, materialized_postgresql_backoff_min_ms, 200, "Poll backoff start point", 0) \ + M(UInt64, materialized_postgresql_backoff_max_ms, 10000, "Poll backoff max point", 0) \ + M(UInt64, materialized_postgresql_backoff_factor, 2, "Poll backoff factor", 0) \ DECLARE_SETTINGS_TRAITS(MaterializedPostgreSQLSettingsTraits, LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS) diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 998db4ea79e..f57a6a26a62 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -22,8 +22,6 @@ namespace DB { -static const auto RESCHEDULE_MS = 1000; -static const auto BACKOFF_TRESHOLD_MS = 10000; static const auto CLEANUP_RESCHEDULE_MS = 600000 * 3; /// 30 min namespace ErrorCodes @@ -80,7 +78,10 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( , schema_list(replication_settings.materialized_postgresql_schema_list) , schema_as_a_part_of_table_name(!schema_list.empty() || replication_settings.materialized_postgresql_tables_list_with_schema) , user_provided_snapshot(replication_settings.materialized_postgresql_snapshot) - , milliseconds_to_wait(RESCHEDULE_MS) + , reschedule_backoff_min_ms(replication_settings.materialized_postgresql_backoff_min_ms) + , reschedule_backoff_max_ms(replication_settings.materialized_postgresql_backoff_max_ms) + , reschedule_backoff_factor(replication_settings.materialized_postgresql_backoff_factor) + , milliseconds_to_wait(reschedule_backoff_min_ms) { if (!schema_list.empty() && !tables_list.empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot have schema list and tables list at the same time"); @@ -166,7 +167,7 @@ void PostgreSQLReplicationHandler::checkConnectionAndStart() throw; LOG_ERROR(log, "Unable to set up connection. Reconnection attempt will continue. Error message: {}", pqxx_error.what()); - startup_task->scheduleAfter(RESCHEDULE_MS); + startup_task->scheduleAfter(milliseconds_to_wait); } catch (...) { @@ -435,18 +436,18 @@ void PostgreSQLReplicationHandler::consumerFunc() if (schedule_now) { - milliseconds_to_wait = RESCHEDULE_MS; + milliseconds_to_wait = reschedule_backoff_min_ms; consumer_task->schedule(); LOG_DEBUG(log, "Scheduling replication thread: now"); } else { - consumer_task->scheduleAfter(milliseconds_to_wait); - if (milliseconds_to_wait < BACKOFF_TRESHOLD_MS) - milliseconds_to_wait *= 2; + if (milliseconds_to_wait < reschedule_backoff_max_ms) + milliseconds_to_wait = std::min(milliseconds_to_wait * reschedule_backoff_factor, reschedule_backoff_max_ms); LOG_DEBUG(log, "Scheduling replication thread: after {} ms", milliseconds_to_wait); + consumer_task->scheduleAfter(milliseconds_to_wait); } } @@ -892,7 +893,7 @@ void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPost catch (...) { consumer_task->activate(); - consumer_task->scheduleAfter(RESCHEDULE_MS); + consumer_task->scheduleAfter(milliseconds_to_wait); auto error_message = getCurrentExceptionMessage(false); throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR, @@ -922,7 +923,7 @@ void PostgreSQLReplicationHandler::removeTableFromReplication(const String & pos catch (...) { consumer_task->activate(); - consumer_task->scheduleAfter(RESCHEDULE_MS); + consumer_task->scheduleAfter(milliseconds_to_wait); auto error_message = getCurrentExceptionMessage(false); throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR, diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index 10a196cf31b..4c16ff95692 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -140,13 +140,16 @@ private: BackgroundSchedulePool::TaskHolder consumer_task; BackgroundSchedulePool::TaskHolder cleanup_task; + const UInt64 reschedule_backoff_min_ms; + const UInt64 reschedule_backoff_max_ms; + const UInt64 reschedule_backoff_factor; + UInt64 milliseconds_to_wait; + std::atomic stop_synchronization = false; /// MaterializedPostgreSQL tables. Used for managing all operations with its internal nested tables. MaterializedStorages materialized_storages; - UInt64 milliseconds_to_wait; - bool replication_handler_initialized = false; }; diff --git a/tests/integration/helpers/postgres_utility.py b/tests/integration/helpers/postgres_utility.py index dfae37af434..1a00faf0f9d 100644 --- a/tests/integration/helpers/postgres_utility.py +++ b/tests/integration/helpers/postgres_utility.py @@ -76,16 +76,24 @@ def drop_postgres_schema(cursor, schema_name): def create_postgres_table( - cursor, table_name, replica_identity_full=False, template=postgres_table_template + cursor, + table_name, + database_name="", + replica_identity_full=False, + template=postgres_table_template, ): - drop_postgres_table(cursor, table_name) - cursor.execute(template.format(table_name)) + if database_name == "": + name = table_name + else: + name = f"{database_name}.{table_name}" + drop_postgres_table(cursor, name) + cursor.execute(template.format(name)) if replica_identity_full: - cursor.execute(f"ALTER TABLE {table_name} REPLICA IDENTITY FULL;") + cursor.execute(f"ALTER TABLE {name} REPLICA IDENTITY FULL;") -def drop_postgres_table(cursor, table_name): - cursor.execute(f"""DROP TABLE IF EXISTS "{table_name}" """) +def drop_postgres_table(cursor, name): + cursor.execute(f"""DROP TABLE IF EXISTS "{name}" """) def create_postgres_table_with_schema(cursor, schema_name, table_name): @@ -103,13 +111,16 @@ class PostgresManager: self.created_materialized_postgres_db_list = set() self.created_ch_postgres_db_list = set() - def init(self, instance, ip, port): + def init(self, instance, ip, port, default_database="postgres_database"): self.instance = instance self.ip = ip self.port = port - self.conn = get_postgres_conn(ip=self.ip, port=self.port) + self.default_database = default_database self.prepare() + def get_default_database(self): + return self.default_database + def restart(self): try: self.clear() @@ -119,10 +130,17 @@ class PostgresManager: raise ex def prepare(self): - conn = get_postgres_conn(ip=self.ip, port=self.port) - cursor = conn.cursor() - self.create_postgres_db(cursor, "postgres_database") - self.create_clickhouse_postgres_db(ip=self.ip, port=self.port) + self.conn = get_postgres_conn(ip=self.ip, port=self.port) + self.cursor = self.conn.cursor() + if self.default_database != "": + self.create_postgres_db(self.default_database) + self.conn = get_postgres_conn( + ip=self.ip, + port=self.port, + database=True, + database_name=self.default_database, + ) + self.cursor = self.conn.cursor() def clear(self): if self.conn.closed == 0: @@ -132,63 +150,76 @@ class PostgresManager: for db in self.created_ch_postgres_db_list.copy(): self.drop_clickhouse_postgres_db(db) if len(self.created_postgres_db_list) > 0: - conn = get_postgres_conn(ip=self.ip, port=self.port) - cursor = conn.cursor() + self.conn = get_postgres_conn(ip=self.ip, port=self.port) + self.cursor = self.conn.cursor() for db in self.created_postgres_db_list.copy(): - self.drop_postgres_db(cursor, db) + self.drop_postgres_db(db) - def get_db_cursor(self): - self.conn = get_postgres_conn(ip=self.ip, port=self.port, database=True) + def get_db_cursor(self, database_name=""): + if database_name == "": + database_name = self.default_database + self.conn = get_postgres_conn( + ip=self.ip, port=self.port, database=True, database_name=database_name + ) return self.conn.cursor() - def create_postgres_db(self, cursor, name="postgres_database"): - self.drop_postgres_db(cursor, name) - self.created_postgres_db_list.add(name) - cursor.execute(f"CREATE DATABASE {name}") + def database_or_default(self, database_name): + if database_name == "" and self.default_database == "": + raise Exception("Database name is empty") + if database_name == "": + database_name = self.default_database + return database_name - def drop_postgres_db(self, cursor, name="postgres_database"): - cursor.execute(f"DROP DATABASE IF EXISTS {name}") - if name in self.created_postgres_db_list: - self.created_postgres_db_list.remove(name) + def create_postgres_db(self, database_name=""): + database_name = self.database_or_default(database_name) + self.drop_postgres_db(database_name) + self.created_postgres_db_list.add(database_name) + self.cursor.execute(f"CREATE DATABASE {database_name}") + + def drop_postgres_db(self, database_name=""): + database_name = self.database_or_default(database_name) + self.cursor.execute(f"DROP DATABASE IF EXISTS {database_name}") + if database_name in self.created_postgres_db_list: + self.created_postgres_db_list.remove(database_name) def create_clickhouse_postgres_db( self, - ip, - port, - name="postgres_database", - database_name="postgres_database", + database_name="", schema_name="", ): - self.drop_clickhouse_postgres_db(name) - self.created_ch_postgres_db_list.add(name) + database_name = self.database_or_default(database_name) + self.drop_clickhouse_postgres_db(database_name) + self.created_ch_postgres_db_list.add(database_name) if len(schema_name) == 0: self.instance.query( f""" - CREATE DATABASE {name} - ENGINE = PostgreSQL('{ip}:{port}', '{database_name}', 'postgres', 'mysecretpassword')""" + CREATE DATABASE {database_name} + ENGINE = PostgreSQL('{self.ip}:{self.port}', '{database_name}', 'postgres', 'mysecretpassword')""" ) else: self.instance.query( f""" - CREATE DATABASE {name} - ENGINE = PostgreSQL('{ip}:{port}', '{database_name}', 'postgres', 'mysecretpassword', '{schema_name}')""" + CREATE DATABASE {database_name} + ENGINE = PostgreSQL('{self.ip}:{self.port}', '{database_name}', 'postgres', 'mysecretpassword', '{schema_name}')""" ) - def drop_clickhouse_postgres_db(self, name="postgres_database"): - self.instance.query(f"DROP DATABASE IF EXISTS {name}") - if name in self.created_ch_postgres_db_list: - self.created_ch_postgres_db_list.remove(name) + def drop_clickhouse_postgres_db(self, database_name=""): + database_name = self.database_or_default(database_name) + self.instance.query(f"DROP DATABASE IF EXISTS {database_name}") + if database_name in self.created_ch_postgres_db_list: + self.created_ch_postgres_db_list.remove(database_name) def create_materialized_db( self, ip, port, materialized_database="test_database", - postgres_database="postgres_database", + postgres_database="", settings=[], table_overrides="", ): + postgres_database = self.database_or_default(postgres_database) self.created_materialized_postgres_db_list.add(materialized_database) self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database}") @@ -207,17 +238,12 @@ class PostgresManager: self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database} SYNC") if materialized_database in self.created_materialized_postgres_db_list: self.created_materialized_postgres_db_list.remove(materialized_database) - assert materialized_database not in self.instance.query("SHOW DATABASES") - def create_and_fill_postgres_table(self, table_name): - conn = get_postgres_conn(ip=self.ip, port=self.port, database=True) - cursor = conn.cursor() - self.create_and_fill_postgres_table_from_cursor(cursor, table_name) - - def create_and_fill_postgres_table_from_cursor(self, cursor, table_name): - create_postgres_table(cursor, table_name) + def create_and_fill_postgres_table(self, table_name, database_name=""): + create_postgres_table(self.cursor, table_name, database_name) + database_name = self.database_or_default(database_name) self.instance.query( - f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(50)" + f"INSERT INTO {database_name}.{table_name} SELECT number, number from numbers(50)" ) def create_and_fill_postgres_tables(self, tables_num, numbers=50): diff --git a/tests/integration/test_postgresql_replica_database_engine_2/test.py b/tests/integration/test_postgresql_replica_database_engine_2/test.py index 90d19e9532c..3b5194e8806 100644 --- a/tests/integration/test_postgresql_replica_database_engine_2/test.py +++ b/tests/integration/test_postgresql_replica_database_engine_2/test.py @@ -46,14 +46,34 @@ instance = cluster.add_instance( stay_alive=True, ) +instance2 = cluster.add_instance( + "instance2", + main_configs=["configs/log_conf.xml", "configs/merge_tree_too_many_parts.xml"], + user_configs=["configs/users.xml"], + with_postgres=True, + stay_alive=True, +) + + pg_manager = PostgresManager() +pg_manager2 = PostgresManager() @pytest.fixture(scope="module") def started_cluster(): try: cluster.start() - pg_manager.init(instance, cluster.postgres_ip, cluster.postgres_port) + pg_manager.init( + instance, + cluster.postgres_ip, + cluster.postgres_port, + default_database="test_database", + ) + pg_manager.create_clickhouse_postgres_db() + pg_manager2.init( + instance2, cluster.postgres_ip, cluster.postgres_port, "test_database2" + ) + pg_manager2.create_clickhouse_postgres_db() yield cluster finally: @@ -649,6 +669,59 @@ def test_materialized_view(started_cluster): pg_manager.drop_materialized_db() +def test_too_many_parts(started_cluster): + table = "test_table" + pg_manager2.create_and_fill_postgres_table(table) + pg_manager2.create_materialized_db( + ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + settings=[ + f"materialized_postgresql_tables_list = 'test_table', materialized_postgresql_backoff_min_ms = 100, materialized_postgresql_backoff_max_ms = 100" + ], + ) + check_tables_are_synchronized( + instance2, "test_table", postgres_database=pg_manager2.get_default_database() + ) + assert ( + "50" == instance2.query("SELECT count() FROM test_database.test_table").strip() + ) + + instance2.query("SYSTEM STOP MERGES") + num = 50 + for i in range(10): + instance2.query( + f""" + INSERT INTO {pg_manager2.get_default_database()}.test_table SELECT {num}, {num}; + """ + ) + num = num + 1 + for i in range(30): + if num == int( + instance2.query("SELECT count() FROM test_database.test_table") + ) or instance2.contains_in_log("DB::Exception: Too many parts"): + break + time.sleep(1) + print(f"wait sync try {i}") + if instance2.contains_in_log("DB::Exception: Too many parts"): + num = num - 1 + break + assert num == int( + instance2.query("SELECT count() FROM test_database.test_table") + ) + + assert instance2.contains_in_log("DB::Exception: Too many parts") + print(num) + assert num == int(instance2.query("SELECT count() FROM test_database.test_table")) + + instance2.query("SYSTEM START MERGES") + check_tables_are_synchronized( + instance2, "test_table", postgres_database=pg_manager2.get_default_database() + ) + + # assert "200" == instance.query("SELECT count FROM test_database.test_table").strip() + pg_manager2.drop_materialized_db() + + if __name__ == "__main__": cluster.start() input("Cluster created, press any key to destroy...")