Merge pull request #70318 from ClickHouse/backport/24.8/67664

Backport #67664 to 24.8: Fix error on generated columns in MaterializedPostgreSQL
This commit is contained in:
robot-ch-test-poll2 2024-10-03 19:11:05 +04:00 committed by GitHub
commit cc54fd6021
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 76 additions and 17 deletions

View File

@ -196,7 +196,7 @@ PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList(
}
else
{
std::tuple<std::string, std::string, std::string, uint16_t, std::string, std::string, std::string> row;
std::tuple<std::string, std::string, std::string, uint16_t, std::string, std::string, std::string, std::string> row;
while (stream >> row)
{
const auto column_name = std::get<0>(row);
@ -206,13 +206,14 @@ PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList(
std::get<3>(row));
columns.push_back(NameAndTypePair(column_name, data_type));
auto attgenerated = std::get<6>(row);
auto attgenerated = std::get<7>(row);
attributes.emplace(
column_name,
PostgreSQLTableStructure::PGAttribute{
.atttypid = parse<int>(std::get<4>(row)),
.atttypmod = parse<int>(std::get<5>(row)),
.attnum = parse<int>(std::get<6>(row)),
.atthasdef = false,
.attgenerated = attgenerated.empty() ? char{} : char(attgenerated[0]),
.attr_def = {}
@ -308,6 +309,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
"attndims AS dims, " /// array dimensions
"atttypid as type_id, "
"atttypmod as type_modifier, "
"attnum as att_num, "
"attgenerated as generated " /// if column has GENERATED
"FROM pg_attribute "
"WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) "
@ -338,17 +340,29 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
"WHERE adrelid = (SELECT oid FROM pg_class WHERE {});", where);
pqxx::result result{tx.exec(attrdef_query)};
for (const auto row : result)
if (static_cast<uint64_t>(result.size()) > table.physical_columns->names.size())
{
size_t adnum = row[0].as<int>();
if (!adnum || adnum > table.physical_columns->names.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Received {} attrdef, but currently fetched columns list has {} columns",
result.size(), table.physical_columns->attributes.size());
}
for (const auto & column_attrs : table.physical_columns->attributes)
{
if (column_attrs.second.attgenerated != 's') /// e.g. not a generated column
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Received adnum {}, but currently fetched columns list has {} columns",
adnum, table.physical_columns->attributes.size());
continue;
}
for (const auto row : result)
{
int adnum = row[0].as<int>();
if (column_attrs.second.attnum == adnum)
{
table.physical_columns->attributes.at(column_attrs.first).attr_def = row[1].as<std::string>();
break;
}
}
const auto column_name = table.physical_columns->names[adnum - 1];
table.physical_columns->attributes.at(column_name).attr_def = row[1].as<std::string>();
}
}

View File

@ -16,6 +16,7 @@ struct PostgreSQLTableStructure
{
Int32 atttypid;
Int32 atttypmod;
Int32 attnum;
bool atthasdef;
char attgenerated;
std::string attr_def;

View File

@ -659,7 +659,7 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(pqxx::nontransaction & tx
void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
{
std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name);
std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", doubleQuoteString(publication_name));
tx.exec(query_str);
LOG_DEBUG(log, "Dropped publication: {}", doubleQuoteString(publication_name));
}
@ -667,7 +667,7 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
void PostgreSQLReplicationHandler::addTableToPublication(pqxx::nontransaction & ntx, const String & table_name)
{
std::string query_str = fmt::format("ALTER PUBLICATION {} ADD TABLE ONLY {}", publication_name, doubleQuoteWithSchema(table_name));
std::string query_str = fmt::format("ALTER PUBLICATION {} ADD TABLE ONLY {}", doubleQuoteString(publication_name), doubleQuoteWithSchema(table_name));
ntx.exec(query_str);
LOG_TRACE(log, "Added table {} to publication `{}`", doubleQuoteWithSchema(table_name), publication_name);
}

View File

@ -953,12 +953,14 @@ def test_generated_columns(started_cluster):
"",
f"""CREATE TABLE {table} (
key integer PRIMARY KEY,
x integer,
x integer DEFAULT 0,
temp integer DEFAULT 0,
y integer GENERATED ALWAYS AS (x*2) STORED,
z text);
z text DEFAULT 'z');
""",
)
pg_manager.execute(f"alter table {table} drop column temp;")
pg_manager.execute(f"insert into {table} (key, x, z) values (1,1,'1');")
pg_manager.execute(f"insert into {table} (key, x, z) values (2,2,'2');")
@ -991,6 +993,44 @@ def test_generated_columns(started_cluster):
)
def test_generated_columns_with_sequence(started_cluster):
table = "test_generated_columns_with_sequence"
pg_manager.create_postgres_table(
table,
"",
f"""CREATE TABLE {table} (
key integer PRIMARY KEY,
x integer,
y integer GENERATED ALWAYS AS (x*2) STORED,
z text);
""",
)
pg_manager.execute(
f"create sequence {table}_id_seq increment by 1 minvalue 1 start 1;"
)
pg_manager.execute(
f"alter table {table} alter key set default nextval('{table}_id_seq');"
)
pg_manager.execute(f"insert into {table} (key, x, z) values (1,1,'1');")
pg_manager.execute(f"insert into {table} (key, x, z) values (2,2,'2');")
pg_manager.create_materialized_db(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=[
f"materialized_postgresql_tables_list = '{table}'",
"materialized_postgresql_backoff_min_ms = 100",
"materialized_postgresql_backoff_max_ms = 100",
],
)
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
def test_default_columns(started_cluster):
table = "test_default_columns"
@ -1087,9 +1127,13 @@ def test_dependent_loading(started_cluster):
nested_time = instance.query(
f"SELECT event_time_microseconds FROM system.text_log WHERE message like 'Loading table default.{uuid}_nested' and message not like '%like%'"
).strip()
time = instance.query(
f"SELECT event_time_microseconds FROM system.text_log WHERE message like 'Loading table default.{table}' and message not like '%like%'"
).strip()
time = (
instance.query(
f"SELECT event_time_microseconds FROM system.text_log WHERE message like 'Loading table default.{table}' and message not like '%like%'"
)
.strip()
.split("\n")[-1]
)
instance.query(
f"SELECT toDateTime64('{nested_time}', 6) < toDateTime64('{time}', 6)"
)