mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #69786 from ClickHouse/backport/24.8/62730
Backport #62730 to 24.8: [bugfix] MaterializedPostgreSQL Cannot attach table when pg dbname contains "-", need doubleQuoting
This commit is contained in:
commit
46dd08d463
@ -157,7 +157,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
|
|||||||
|
|
||||||
checkReplicationSlot(replication_slot);
|
checkReplicationSlot(replication_slot);
|
||||||
|
|
||||||
LOG_INFO(log, "Using replication slot {} and publication {}", replication_slot, publication_name);
|
LOG_INFO(log, "Using replication slot {} and publication {}", replication_slot, doubleQuoteString(publication_name));
|
||||||
|
|
||||||
startup_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ checkConnectionAndStart(); });
|
startup_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ checkConnectionAndStart(); });
|
||||||
consumer_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
|
consumer_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
|
||||||
@ -543,7 +543,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransactio
|
|||||||
/// This is a case for single Materialized storage. In case of database engine this check is done in advance.
|
/// This is a case for single Materialized storage. In case of database engine this check is done in advance.
|
||||||
LOG_WARNING(log,
|
LOG_WARNING(log,
|
||||||
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
|
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
|
||||||
publication_name);
|
doubleQuoteString(publication_name));
|
||||||
|
|
||||||
dropPublication(tx);
|
dropPublication(tx);
|
||||||
}
|
}
|
||||||
@ -573,7 +573,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransactio
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
tx.exec(query_str);
|
tx.exec(query_str);
|
||||||
LOG_DEBUG(log, "Created publication {} with tables list: {}", publication_name, tables_list);
|
LOG_DEBUG(log, "Created publication {} with tables list: {}", doubleQuoteString(publication_name), tables_list);
|
||||||
}
|
}
|
||||||
catch (Exception & e)
|
catch (Exception & e)
|
||||||
{
|
{
|
||||||
@ -583,7 +583,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransactio
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "Using existing publication ({}) version", publication_name);
|
LOG_DEBUG(log, "Using existing publication ({}) version", doubleQuoteString(publication_name));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -661,7 +661,7 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
|
|||||||
{
|
{
|
||||||
std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name);
|
std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name);
|
||||||
tx.exec(query_str);
|
tx.exec(query_str);
|
||||||
LOG_DEBUG(log, "Dropped publication: {}", publication_name);
|
LOG_DEBUG(log, "Dropped publication: {}", doubleQuoteString(publication_name));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -677,7 +677,7 @@ void PostgreSQLReplicationHandler::removeTableFromPublication(pqxx::nontransacti
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
std::string query_str = fmt::format("ALTER PUBLICATION {} DROP TABLE ONLY {}", publication_name, doubleQuoteWithSchema(table_name));
|
std::string query_str = fmt::format("ALTER PUBLICATION {} DROP TABLE ONLY {}", doubleQuoteString(publication_name), doubleQuoteWithSchema(table_name));
|
||||||
ntx.exec(query_str);
|
ntx.exec(query_str);
|
||||||
LOG_TRACE(log, "Removed table `{}` from publication `{}`", doubleQuoteWithSchema(table_name), publication_name);
|
LOG_TRACE(log, "Removed table `{}` from publication `{}`", doubleQuoteWithSchema(table_name), publication_name);
|
||||||
}
|
}
|
||||||
@ -764,7 +764,7 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
|
|||||||
{
|
{
|
||||||
LOG_WARNING(log,
|
LOG_WARNING(log,
|
||||||
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
|
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
|
||||||
publication_name);
|
doubleQuoteString(publication_name));
|
||||||
|
|
||||||
connection.execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
|
connection.execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
|
||||||
}
|
}
|
||||||
@ -774,7 +774,7 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
|
|||||||
{
|
{
|
||||||
LOG_WARNING(log,
|
LOG_WARNING(log,
|
||||||
"Publication {} already exists and tables list is empty. Assuming publication is correct.",
|
"Publication {} already exists and tables list is empty. Assuming publication is correct.",
|
||||||
publication_name);
|
doubleQuoteString(publication_name));
|
||||||
|
|
||||||
{
|
{
|
||||||
pqxx::nontransaction tx(connection.getRef());
|
pqxx::nontransaction tx(connection.getRef());
|
||||||
@ -825,7 +825,7 @@ std::set<String> PostgreSQLReplicationHandler::fetchRequiredTables()
|
|||||||
"To avoid redundant work, you can try ALTER PUBLICATION query to remove redundant tables. "
|
"To avoid redundant work, you can try ALTER PUBLICATION query to remove redundant tables. "
|
||||||
"Or you can you ALTER SETTING. "
|
"Or you can you ALTER SETTING. "
|
||||||
"\nPublication tables: {}.\nTables list: {}",
|
"\nPublication tables: {}.\nTables list: {}",
|
||||||
publication_name, diff_tables, publication_tables, listed_tables);
|
doubleQuoteString(publication_name), diff_tables, publication_tables, listed_tables);
|
||||||
|
|
||||||
return std::set(expected_tables.begin(), expected_tables.end());
|
return std::set(expected_tables.begin(), expected_tables.end());
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,7 @@ version: '2.3'
|
|||||||
services:
|
services:
|
||||||
postgres1:
|
postgres1:
|
||||||
image: postgres
|
image: postgres
|
||||||
command: ["postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=2", "-c", "logging_collector=on", "-c", "log_directory=/postgres/logs", "-c", "log_filename=postgresql.log", "-c", "log_statement=all", "-c", "max_connections=200"]
|
command: ["postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=4", "-c", "logging_collector=on", "-c", "log_directory=/postgres/logs", "-c", "log_filename=postgresql.log", "-c", "log_statement=all", "-c", "max_connections=200"]
|
||||||
restart: always
|
restart: always
|
||||||
expose:
|
expose:
|
||||||
- ${POSTGRES_PORT:-5432}
|
- ${POSTGRES_PORT:-5432}
|
||||||
|
@ -82,24 +82,24 @@ def drop_postgres_schema(cursor, schema_name):
|
|||||||
def create_postgres_table(
|
def create_postgres_table(
|
||||||
cursor,
|
cursor,
|
||||||
table_name,
|
table_name,
|
||||||
database_name="",
|
|
||||||
replica_identity_full=False,
|
replica_identity_full=False,
|
||||||
template=postgres_table_template,
|
template=postgres_table_template,
|
||||||
):
|
):
|
||||||
if database_name == "":
|
drop_postgres_table(cursor, table_name)
|
||||||
name = table_name
|
query = template.format(table_name)
|
||||||
else:
|
|
||||||
name = f"{database_name}.{table_name}"
|
|
||||||
drop_postgres_table(cursor, name)
|
|
||||||
query = template.format(name)
|
|
||||||
cursor.execute(query)
|
|
||||||
print(f"Query: {query}")
|
print(f"Query: {query}")
|
||||||
|
cursor.execute(query)
|
||||||
|
|
||||||
if replica_identity_full:
|
if replica_identity_full:
|
||||||
cursor.execute(f"ALTER TABLE {name} REPLICA IDENTITY FULL;")
|
cursor.execute(f"""ALTER TABLE "{table_name}" REPLICA IDENTITY FULL;""")
|
||||||
|
|
||||||
|
|
||||||
def drop_postgres_table(cursor, name):
|
def drop_postgres_table(cursor, name, database_name=""):
|
||||||
cursor.execute(f"""DROP TABLE IF EXISTS "{name}" """)
|
if database_name != "":
|
||||||
|
cursor.execute(f"""DROP TABLE IF EXISTS "{database_name}"."{name}" """)
|
||||||
|
else:
|
||||||
|
cursor.execute(f"""DROP TABLE IF EXISTS "{name}" """)
|
||||||
|
|
||||||
|
|
||||||
def create_postgres_table_with_schema(cursor, schema_name, table_name):
|
def create_postgres_table_with_schema(cursor, schema_name, table_name):
|
||||||
@ -269,15 +269,28 @@ class PostgresManager:
|
|||||||
def create_postgres_table(
|
def create_postgres_table(
|
||||||
self, table_name, database_name="", template=postgres_table_template
|
self, table_name, database_name="", template=postgres_table_template
|
||||||
):
|
):
|
||||||
create_postgres_table(
|
database_name = self.database_or_default(database_name)
|
||||||
self.cursor, table_name, database_name=database_name, template=template
|
cursor = self.cursor
|
||||||
)
|
if database_name != self.get_default_database:
|
||||||
|
try:
|
||||||
|
self.create_postgres_db(database_name)
|
||||||
|
except:
|
||||||
|
# postgres does not support create database if not exists
|
||||||
|
pass
|
||||||
|
conn = get_postgres_conn(
|
||||||
|
ip=self.ip,
|
||||||
|
port=self.port,
|
||||||
|
database=True,
|
||||||
|
database_name=database_name,
|
||||||
|
)
|
||||||
|
cursor = conn.cursor()
|
||||||
|
create_postgres_table(cursor, table_name, template=template)
|
||||||
|
|
||||||
def create_and_fill_postgres_table(self, table_name, database_name=""):
|
def create_and_fill_postgres_table(self, table_name, database_name=""):
|
||||||
create_postgres_table(self.cursor, table_name, database_name)
|
|
||||||
database_name = self.database_or_default(database_name)
|
database_name = self.database_or_default(database_name)
|
||||||
|
self.create_postgres_table(table_name, database_name)
|
||||||
self.instance.query(
|
self.instance.query(
|
||||||
f"INSERT INTO {database_name}.{table_name} SELECT number, number from numbers(50)"
|
f"INSERT INTO `{database_name}`.`{table_name}` SELECT number, number from numbers(50)"
|
||||||
)
|
)
|
||||||
|
|
||||||
def create_and_fill_postgres_tables(
|
def create_and_fill_postgres_tables(
|
||||||
@ -289,11 +302,11 @@ class PostgresManager:
|
|||||||
):
|
):
|
||||||
for i in range(tables_num):
|
for i in range(tables_num):
|
||||||
table_name = f"{table_name_base}_{i}"
|
table_name = f"{table_name_base}_{i}"
|
||||||
create_postgres_table(self.cursor, table_name, database_name)
|
self.create_postgres_table(table_name, database_name)
|
||||||
if numbers > 0:
|
if numbers > 0:
|
||||||
db = self.database_or_default(database_name)
|
db = self.database_or_default(database_name)
|
||||||
self.instance.query(
|
self.instance.query(
|
||||||
f"INSERT INTO {db}.{table_name} SELECT number, number from numbers({numbers})"
|
f"INSERT INTO `{db}`.{table_name} SELECT number, number from numbers({numbers})"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -403,4 +416,9 @@ def check_several_tables_are_synchronized(
|
|||||||
schema_name="",
|
schema_name="",
|
||||||
):
|
):
|
||||||
for i in range(tables_num):
|
for i in range(tables_num):
|
||||||
check_tables_are_synchronized(instance, f"postgresql_replica_{i}")
|
check_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
f"postgresql_replica_{i}",
|
||||||
|
postgres_database=postgres_database,
|
||||||
|
materialized_database=materialized_database,
|
||||||
|
)
|
||||||
|
@ -1097,6 +1097,110 @@ def test_dependent_loading(started_cluster):
|
|||||||
instance.query(f"DROP TABLE {table} SYNC")
|
instance.query(f"DROP TABLE {table} SYNC")
|
||||||
|
|
||||||
|
|
||||||
|
def test_quoting_publication(started_cluster):
|
||||||
|
postgres_database = "postgres-postgres"
|
||||||
|
pg_manager3 = PostgresManager()
|
||||||
|
pg_manager3.init(
|
||||||
|
instance,
|
||||||
|
cluster.postgres_ip,
|
||||||
|
cluster.postgres_port,
|
||||||
|
default_database=postgres_database,
|
||||||
|
)
|
||||||
|
NUM_TABLES = 5
|
||||||
|
materialized_database = "test-database"
|
||||||
|
|
||||||
|
pg_manager3.create_and_fill_postgres_tables(NUM_TABLES, 10000)
|
||||||
|
|
||||||
|
check_table_name_1 = "postgresql-replica-5"
|
||||||
|
pg_manager3.create_and_fill_postgres_table(check_table_name_1)
|
||||||
|
|
||||||
|
pg_manager3.create_materialized_db(
|
||||||
|
ip=started_cluster.postgres_ip,
|
||||||
|
port=started_cluster.postgres_port,
|
||||||
|
materialized_database=materialized_database,
|
||||||
|
)
|
||||||
|
check_several_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
NUM_TABLES,
|
||||||
|
materialized_database=materialized_database,
|
||||||
|
postgres_database=postgres_database,
|
||||||
|
)
|
||||||
|
|
||||||
|
result = instance.query(f"SHOW TABLES FROM `{materialized_database}`")
|
||||||
|
assert (
|
||||||
|
result
|
||||||
|
== "postgresql-replica-5\npostgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
check_table_name_1,
|
||||||
|
materialized_database=materialized_database,
|
||||||
|
postgres_database=postgres_database,
|
||||||
|
)
|
||||||
|
instance.query(
|
||||||
|
f"INSERT INTO `{postgres_database}`.`{check_table_name_1}` SELECT number, number from numbers(10000, 10000)"
|
||||||
|
)
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
check_table_name_1,
|
||||||
|
materialized_database=materialized_database,
|
||||||
|
postgres_database=postgres_database,
|
||||||
|
)
|
||||||
|
|
||||||
|
check_table_name_2 = "postgresql-replica-6"
|
||||||
|
pg_manager3.create_and_fill_postgres_table(check_table_name_2)
|
||||||
|
|
||||||
|
instance.query(f"ATTACH TABLE `{materialized_database}`.`{check_table_name_2}`")
|
||||||
|
|
||||||
|
result = instance.query(f"SHOW TABLES FROM `{materialized_database}`")
|
||||||
|
assert (
|
||||||
|
result
|
||||||
|
== "postgresql-replica-5\npostgresql-replica-6\npostgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
check_table_name_2,
|
||||||
|
materialized_database=materialized_database,
|
||||||
|
postgres_database=postgres_database,
|
||||||
|
)
|
||||||
|
instance.query(
|
||||||
|
f"INSERT INTO `{postgres_database}`.`{check_table_name_2}` SELECT number, number from numbers(10000, 10000)"
|
||||||
|
)
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
check_table_name_2,
|
||||||
|
materialized_database=materialized_database,
|
||||||
|
postgres_database=postgres_database,
|
||||||
|
)
|
||||||
|
|
||||||
|
instance.restart_clickhouse()
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
check_table_name_1,
|
||||||
|
materialized_database=materialized_database,
|
||||||
|
postgres_database=postgres_database,
|
||||||
|
)
|
||||||
|
check_tables_are_synchronized(
|
||||||
|
instance,
|
||||||
|
check_table_name_2,
|
||||||
|
materialized_database=materialized_database,
|
||||||
|
postgres_database=postgres_database,
|
||||||
|
)
|
||||||
|
|
||||||
|
instance.query(
|
||||||
|
f"DETACH TABLE `{materialized_database}`.`{check_table_name_2}` PERMANENTLY"
|
||||||
|
)
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
result = instance.query(f"SHOW TABLES FROM `{materialized_database}`")
|
||||||
|
assert (
|
||||||
|
result
|
||||||
|
== "postgresql-replica-5\npostgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
cluster.start()
|
cluster.start()
|
||||||
input("Cluster created, press any key to destroy...")
|
input("Cluster created, press any key to destroy...")
|
||||||
|
Loading…
Reference in New Issue
Block a user