This commit is contained in:
kssenii 2023-05-09 16:10:53 +02:00
parent ca05a737c1
commit 383fc06761
6 changed files with 183 additions and 84 deletions

View File

@ -562,34 +562,27 @@ void MaterializedPostgreSQLConsumer::syncTables()
Block result_rows = storage_data.buffer.description.sample_block.cloneWithColumns(std::move(storage_data.buffer.columns)); Block result_rows = storage_data.buffer.description.sample_block.cloneWithColumns(std::move(storage_data.buffer.columns));
storage_data.buffer.columns = storage_data.buffer.description.sample_block.cloneEmptyColumns(); storage_data.buffer.columns = storage_data.buffer.description.sample_block.cloneEmptyColumns();
try if (result_rows.rows())
{ {
if (result_rows.rows()) auto storage = storage_data.storage;
{
auto storage = storage_data.storage;
auto insert_context = Context::createCopy(context); auto insert_context = Context::createCopy(context);
insert_context->setInternalQuery(true); insert_context->setInternalQuery(true);
auto insert = std::make_shared<ASTInsertQuery>(); auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID(); insert->table_id = storage->getStorageID();
insert->columns = storage_data.buffer.columns_ast; insert->columns = storage_data.buffer.columns_ast;
InterpreterInsertQuery interpreter(insert, insert_context, true); InterpreterInsertQuery interpreter(insert, insert_context, true);
auto io = interpreter.execute(); auto io = interpreter.execute();
auto input = std::make_shared<SourceFromSingleChunk>( auto input = std::make_shared<SourceFromSingleChunk>(
result_rows.cloneEmpty(), Chunk(result_rows.getColumns(), result_rows.rows())); result_rows.cloneEmpty(), Chunk(result_rows.getColumns(), result_rows.rows()));
assertBlocksHaveEqualStructure(input->getPort().getHeader(), io.pipeline.getHeader(), "postgresql replica table sync"); assertBlocksHaveEqualStructure(input->getPort().getHeader(), io.pipeline.getHeader(), "postgresql replica table sync");
io.pipeline.complete(Pipe(std::move(input))); io.pipeline.complete(Pipe(std::move(input)));
CompletedPipelineExecutor executor(io.pipeline); CompletedPipelineExecutor executor(io.pipeline);
executor.execute(); executor.execute();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
} }
} }

View File

@ -21,6 +21,9 @@ namespace DB
M(Bool, materialized_postgresql_tables_list_with_schema, false, \ M(Bool, materialized_postgresql_tables_list_with_schema, false, \
"Consider by default that if there is a dot in tables list 'name.name', " \ "Consider by default that if there is a dot in tables list 'name.name', " \
"then the first name is postgres schema and second is postgres table. This setting is needed to allow table names with dots", 0) \ "then the first name is postgres schema and second is postgres table. This setting is needed to allow table names with dots", 0) \
M(UInt64, materialized_postgresql_backoff_min_ms, 200, "Poll backoff start point", 0) \
M(UInt64, materialized_postgresql_backoff_max_ms, 10000, "Poll backoff max point", 0) \
M(UInt64, materialized_postgresql_backoff_factor, 2, "Poll backoff factor", 0) \
DECLARE_SETTINGS_TRAITS(MaterializedPostgreSQLSettingsTraits, LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS) DECLARE_SETTINGS_TRAITS(MaterializedPostgreSQLSettingsTraits, LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS)

View File

@ -22,8 +22,6 @@
namespace DB namespace DB
{ {
static const auto RESCHEDULE_MS = 1000;
static const auto BACKOFF_TRESHOLD_MS = 10000;
static const auto CLEANUP_RESCHEDULE_MS = 600000 * 3; /// 30 min static const auto CLEANUP_RESCHEDULE_MS = 600000 * 3; /// 30 min
namespace ErrorCodes namespace ErrorCodes
@ -80,7 +78,10 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
, schema_list(replication_settings.materialized_postgresql_schema_list) , schema_list(replication_settings.materialized_postgresql_schema_list)
, schema_as_a_part_of_table_name(!schema_list.empty() || replication_settings.materialized_postgresql_tables_list_with_schema) , schema_as_a_part_of_table_name(!schema_list.empty() || replication_settings.materialized_postgresql_tables_list_with_schema)
, user_provided_snapshot(replication_settings.materialized_postgresql_snapshot) , user_provided_snapshot(replication_settings.materialized_postgresql_snapshot)
, milliseconds_to_wait(RESCHEDULE_MS) , reschedule_backoff_min_ms(replication_settings.materialized_postgresql_backoff_min_ms)
, reschedule_backoff_max_ms(replication_settings.materialized_postgresql_backoff_max_ms)
, reschedule_backoff_factor(replication_settings.materialized_postgresql_backoff_factor)
, milliseconds_to_wait(reschedule_backoff_min_ms)
{ {
if (!schema_list.empty() && !tables_list.empty()) if (!schema_list.empty() && !tables_list.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot have schema list and tables list at the same time"); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot have schema list and tables list at the same time");
@ -166,7 +167,7 @@ void PostgreSQLReplicationHandler::checkConnectionAndStart()
throw; throw;
LOG_ERROR(log, "Unable to set up connection. Reconnection attempt will continue. Error message: {}", pqxx_error.what()); LOG_ERROR(log, "Unable to set up connection. Reconnection attempt will continue. Error message: {}", pqxx_error.what());
startup_task->scheduleAfter(RESCHEDULE_MS); startup_task->scheduleAfter(milliseconds_to_wait);
} }
catch (...) catch (...)
{ {
@ -435,18 +436,18 @@ void PostgreSQLReplicationHandler::consumerFunc()
if (schedule_now) if (schedule_now)
{ {
milliseconds_to_wait = RESCHEDULE_MS; milliseconds_to_wait = reschedule_backoff_min_ms;
consumer_task->schedule(); consumer_task->schedule();
LOG_DEBUG(log, "Scheduling replication thread: now"); LOG_DEBUG(log, "Scheduling replication thread: now");
} }
else else
{ {
consumer_task->scheduleAfter(milliseconds_to_wait); if (milliseconds_to_wait < reschedule_backoff_max_ms)
if (milliseconds_to_wait < BACKOFF_TRESHOLD_MS) milliseconds_to_wait = std::min(milliseconds_to_wait * reschedule_backoff_factor, reschedule_backoff_max_ms);
milliseconds_to_wait *= 2;
LOG_DEBUG(log, "Scheduling replication thread: after {} ms", milliseconds_to_wait); LOG_DEBUG(log, "Scheduling replication thread: after {} ms", milliseconds_to_wait);
consumer_task->scheduleAfter(milliseconds_to_wait);
} }
} }
@ -892,7 +893,7 @@ void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPost
catch (...) catch (...)
{ {
consumer_task->activate(); consumer_task->activate();
consumer_task->scheduleAfter(RESCHEDULE_MS); consumer_task->scheduleAfter(milliseconds_to_wait);
auto error_message = getCurrentExceptionMessage(false); auto error_message = getCurrentExceptionMessage(false);
throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR, throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR,
@ -922,7 +923,7 @@ void PostgreSQLReplicationHandler::removeTableFromReplication(const String & pos
catch (...) catch (...)
{ {
consumer_task->activate(); consumer_task->activate();
consumer_task->scheduleAfter(RESCHEDULE_MS); consumer_task->scheduleAfter(milliseconds_to_wait);
auto error_message = getCurrentExceptionMessage(false); auto error_message = getCurrentExceptionMessage(false);
throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR, throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR,

View File

@ -140,13 +140,16 @@ private:
BackgroundSchedulePool::TaskHolder consumer_task; BackgroundSchedulePool::TaskHolder consumer_task;
BackgroundSchedulePool::TaskHolder cleanup_task; BackgroundSchedulePool::TaskHolder cleanup_task;
const UInt64 reschedule_backoff_min_ms;
const UInt64 reschedule_backoff_max_ms;
const UInt64 reschedule_backoff_factor;
UInt64 milliseconds_to_wait;
std::atomic<bool> stop_synchronization = false; std::atomic<bool> stop_synchronization = false;
/// MaterializedPostgreSQL tables. Used for managing all operations with its internal nested tables. /// MaterializedPostgreSQL tables. Used for managing all operations with its internal nested tables.
MaterializedStorages materialized_storages; MaterializedStorages materialized_storages;
UInt64 milliseconds_to_wait;
bool replication_handler_initialized = false; bool replication_handler_initialized = false;
}; };

View File

@ -76,16 +76,24 @@ def drop_postgres_schema(cursor, schema_name):
def create_postgres_table( def create_postgres_table(
cursor, table_name, replica_identity_full=False, template=postgres_table_template cursor,
table_name,
database_name="",
replica_identity_full=False,
template=postgres_table_template,
): ):
drop_postgres_table(cursor, table_name) if database_name == "":
cursor.execute(template.format(table_name)) name = table_name
else:
name = f"{database_name}.{table_name}"
drop_postgres_table(cursor, name)
cursor.execute(template.format(name))
if replica_identity_full: if replica_identity_full:
cursor.execute(f"ALTER TABLE {table_name} REPLICA IDENTITY FULL;") cursor.execute(f"ALTER TABLE {name} REPLICA IDENTITY FULL;")
def drop_postgres_table(cursor, table_name): def drop_postgres_table(cursor, name):
cursor.execute(f"""DROP TABLE IF EXISTS "{table_name}" """) cursor.execute(f"""DROP TABLE IF EXISTS "{name}" """)
def create_postgres_table_with_schema(cursor, schema_name, table_name): def create_postgres_table_with_schema(cursor, schema_name, table_name):
@ -103,13 +111,16 @@ class PostgresManager:
self.created_materialized_postgres_db_list = set() self.created_materialized_postgres_db_list = set()
self.created_ch_postgres_db_list = set() self.created_ch_postgres_db_list = set()
def init(self, instance, ip, port): def init(self, instance, ip, port, default_database="postgres_database"):
self.instance = instance self.instance = instance
self.ip = ip self.ip = ip
self.port = port self.port = port
self.conn = get_postgres_conn(ip=self.ip, port=self.port) self.default_database = default_database
self.prepare() self.prepare()
def get_default_database(self):
return self.default_database
def restart(self): def restart(self):
try: try:
self.clear() self.clear()
@ -119,10 +130,17 @@ class PostgresManager:
raise ex raise ex
def prepare(self): def prepare(self):
conn = get_postgres_conn(ip=self.ip, port=self.port) self.conn = get_postgres_conn(ip=self.ip, port=self.port)
cursor = conn.cursor() self.cursor = self.conn.cursor()
self.create_postgres_db(cursor, "postgres_database") if self.default_database != "":
self.create_clickhouse_postgres_db(ip=self.ip, port=self.port) self.create_postgres_db(self.default_database)
self.conn = get_postgres_conn(
ip=self.ip,
port=self.port,
database=True,
database_name=self.default_database,
)
self.cursor = self.conn.cursor()
def clear(self): def clear(self):
if self.conn.closed == 0: if self.conn.closed == 0:
@ -132,63 +150,76 @@ class PostgresManager:
for db in self.created_ch_postgres_db_list.copy(): for db in self.created_ch_postgres_db_list.copy():
self.drop_clickhouse_postgres_db(db) self.drop_clickhouse_postgres_db(db)
if len(self.created_postgres_db_list) > 0: if len(self.created_postgres_db_list) > 0:
conn = get_postgres_conn(ip=self.ip, port=self.port) self.conn = get_postgres_conn(ip=self.ip, port=self.port)
cursor = conn.cursor() self.cursor = self.conn.cursor()
for db in self.created_postgres_db_list.copy(): for db in self.created_postgres_db_list.copy():
self.drop_postgres_db(cursor, db) self.drop_postgres_db(db)
def get_db_cursor(self): def get_db_cursor(self, database_name=""):
self.conn = get_postgres_conn(ip=self.ip, port=self.port, database=True) if database_name == "":
database_name = self.default_database
self.conn = get_postgres_conn(
ip=self.ip, port=self.port, database=True, database_name=database_name
)
return self.conn.cursor() return self.conn.cursor()
def create_postgres_db(self, cursor, name="postgres_database"): def database_or_default(self, database_name):
self.drop_postgres_db(cursor, name) if database_name == "" and self.default_database == "":
self.created_postgres_db_list.add(name) raise Exception("Database name is empty")
cursor.execute(f"CREATE DATABASE {name}") if database_name == "":
database_name = self.default_database
return database_name
def drop_postgres_db(self, cursor, name="postgres_database"): def create_postgres_db(self, database_name=""):
cursor.execute(f"DROP DATABASE IF EXISTS {name}") database_name = self.database_or_default(database_name)
if name in self.created_postgres_db_list: self.drop_postgres_db(database_name)
self.created_postgres_db_list.remove(name) self.created_postgres_db_list.add(database_name)
self.cursor.execute(f"CREATE DATABASE {database_name}")
def drop_postgres_db(self, database_name=""):
database_name = self.database_or_default(database_name)
self.cursor.execute(f"DROP DATABASE IF EXISTS {database_name}")
if database_name in self.created_postgres_db_list:
self.created_postgres_db_list.remove(database_name)
def create_clickhouse_postgres_db( def create_clickhouse_postgres_db(
self, self,
ip, database_name="",
port,
name="postgres_database",
database_name="postgres_database",
schema_name="", schema_name="",
): ):
self.drop_clickhouse_postgres_db(name) database_name = self.database_or_default(database_name)
self.created_ch_postgres_db_list.add(name) self.drop_clickhouse_postgres_db(database_name)
self.created_ch_postgres_db_list.add(database_name)
if len(schema_name) == 0: if len(schema_name) == 0:
self.instance.query( self.instance.query(
f""" f"""
CREATE DATABASE {name} CREATE DATABASE {database_name}
ENGINE = PostgreSQL('{ip}:{port}', '{database_name}', 'postgres', 'mysecretpassword')""" ENGINE = PostgreSQL('{self.ip}:{self.port}', '{database_name}', 'postgres', 'mysecretpassword')"""
) )
else: else:
self.instance.query( self.instance.query(
f""" f"""
CREATE DATABASE {name} CREATE DATABASE {database_name}
ENGINE = PostgreSQL('{ip}:{port}', '{database_name}', 'postgres', 'mysecretpassword', '{schema_name}')""" ENGINE = PostgreSQL('{self.ip}:{self.port}', '{database_name}', 'postgres', 'mysecretpassword', '{schema_name}')"""
) )
def drop_clickhouse_postgres_db(self, name="postgres_database"): def drop_clickhouse_postgres_db(self, database_name=""):
self.instance.query(f"DROP DATABASE IF EXISTS {name}") database_name = self.database_or_default(database_name)
if name in self.created_ch_postgres_db_list: self.instance.query(f"DROP DATABASE IF EXISTS {database_name}")
self.created_ch_postgres_db_list.remove(name) if database_name in self.created_ch_postgres_db_list:
self.created_ch_postgres_db_list.remove(database_name)
def create_materialized_db( def create_materialized_db(
self, self,
ip, ip,
port, port,
materialized_database="test_database", materialized_database="test_database",
postgres_database="postgres_database", postgres_database="",
settings=[], settings=[],
table_overrides="", table_overrides="",
): ):
postgres_database = self.database_or_default(postgres_database)
self.created_materialized_postgres_db_list.add(materialized_database) self.created_materialized_postgres_db_list.add(materialized_database)
self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database}") self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
@ -207,17 +238,12 @@ class PostgresManager:
self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database} SYNC") self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database} SYNC")
if materialized_database in self.created_materialized_postgres_db_list: if materialized_database in self.created_materialized_postgres_db_list:
self.created_materialized_postgres_db_list.remove(materialized_database) self.created_materialized_postgres_db_list.remove(materialized_database)
assert materialized_database not in self.instance.query("SHOW DATABASES")
def create_and_fill_postgres_table(self, table_name): def create_and_fill_postgres_table(self, table_name, database_name=""):
conn = get_postgres_conn(ip=self.ip, port=self.port, database=True) create_postgres_table(self.cursor, table_name, database_name)
cursor = conn.cursor() database_name = self.database_or_default(database_name)
self.create_and_fill_postgres_table_from_cursor(cursor, table_name)
def create_and_fill_postgres_table_from_cursor(self, cursor, table_name):
create_postgres_table(cursor, table_name)
self.instance.query( self.instance.query(
f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(50)" f"INSERT INTO {database_name}.{table_name} SELECT number, number from numbers(50)"
) )
def create_and_fill_postgres_tables(self, tables_num, numbers=50): def create_and_fill_postgres_tables(self, tables_num, numbers=50):

View File

@ -46,14 +46,34 @@ instance = cluster.add_instance(
stay_alive=True, stay_alive=True,
) )
instance2 = cluster.add_instance(
"instance2",
main_configs=["configs/log_conf.xml", "configs/merge_tree_too_many_parts.xml"],
user_configs=["configs/users.xml"],
with_postgres=True,
stay_alive=True,
)
pg_manager = PostgresManager() pg_manager = PostgresManager()
pg_manager2 = PostgresManager()
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def started_cluster(): def started_cluster():
try: try:
cluster.start() cluster.start()
pg_manager.init(instance, cluster.postgres_ip, cluster.postgres_port) pg_manager.init(
instance,
cluster.postgres_ip,
cluster.postgres_port,
default_database="test_database",
)
pg_manager.create_clickhouse_postgres_db()
pg_manager2.init(
instance2, cluster.postgres_ip, cluster.postgres_port, "test_database2"
)
pg_manager2.create_clickhouse_postgres_db()
yield cluster yield cluster
finally: finally:
@ -649,6 +669,59 @@ def test_materialized_view(started_cluster):
pg_manager.drop_materialized_db() pg_manager.drop_materialized_db()
def test_too_many_parts(started_cluster):
table = "test_table"
pg_manager2.create_and_fill_postgres_table(table)
pg_manager2.create_materialized_db(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=[
f"materialized_postgresql_tables_list = 'test_table', materialized_postgresql_backoff_min_ms = 100, materialized_postgresql_backoff_max_ms = 100"
],
)
check_tables_are_synchronized(
instance2, "test_table", postgres_database=pg_manager2.get_default_database()
)
assert (
"50" == instance2.query("SELECT count() FROM test_database.test_table").strip()
)
instance2.query("SYSTEM STOP MERGES")
num = 50
for i in range(10):
instance2.query(
f"""
INSERT INTO {pg_manager2.get_default_database()}.test_table SELECT {num}, {num};
"""
)
num = num + 1
for i in range(30):
if num == int(
instance2.query("SELECT count() FROM test_database.test_table")
) or instance2.contains_in_log("DB::Exception: Too many parts"):
break
time.sleep(1)
print(f"wait sync try {i}")
if instance2.contains_in_log("DB::Exception: Too many parts"):
num = num - 1
break
assert num == int(
instance2.query("SELECT count() FROM test_database.test_table")
)
assert instance2.contains_in_log("DB::Exception: Too many parts")
print(num)
assert num == int(instance2.query("SELECT count() FROM test_database.test_table"))
instance2.query("SYSTEM START MERGES")
check_tables_are_synchronized(
instance2, "test_table", postgres_database=pg_manager2.get_default_database()
)
# assert "200" == instance.query("SELECT count FROM test_database.test_table").strip()
pg_manager2.drop_materialized_db()
if __name__ == "__main__": if __name__ == "__main__":
cluster.start() cluster.start()
input("Cluster created, press any key to destroy...") input("Cluster created, press any key to destroy...")