mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #30477 from kssenii/postgres-fix-query
More reliable fetch query in postgres database
This commit is contained in:
commit
00d0665bf2
@ -63,11 +63,12 @@ String DatabasePostgreSQL::getTableNameForLogs(const String & table_name) const
|
||||
}
|
||||
|
||||
|
||||
String DatabasePostgreSQL::formatTableName(const String & table_name) const
|
||||
String DatabasePostgreSQL::formatTableName(const String & table_name, bool quoted) const
|
||||
{
|
||||
if (configuration.schema.empty())
|
||||
return doubleQuoteString(table_name);
|
||||
return fmt::format("{}.{}", doubleQuoteString(configuration.schema), doubleQuoteString(table_name));
|
||||
return quoted ? doubleQuoteString(table_name) : table_name;
|
||||
return quoted ? fmt::format("{}.{}", doubleQuoteString(configuration.schema), doubleQuoteString(table_name))
|
||||
: fmt::format("{}.{}", configuration.schema, table_name);
|
||||
}
|
||||
|
||||
|
||||
@ -89,14 +90,23 @@ bool DatabasePostgreSQL::empty() const
|
||||
DatabaseTablesIteratorPtr DatabasePostgreSQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & /* filter_by_table_name */) const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
Tables tables;
|
||||
auto connection_holder = pool->get();
|
||||
auto table_names = fetchPostgreSQLTablesList(connection_holder->get(), configuration.schema);
|
||||
|
||||
for (const auto & table_name : table_names)
|
||||
if (!detached_or_dropped.count(table_name))
|
||||
tables[table_name] = fetchTable(table_name, local_context, true);
|
||||
/// Do not allow to throw here, because this might be, for example, a query to system.tables.
|
||||
/// It must not fail on case of some postgres error.
|
||||
try
|
||||
{
|
||||
auto connection_holder = pool->get();
|
||||
auto table_names = fetchPostgreSQLTablesList(connection_holder->get(), configuration.schema);
|
||||
|
||||
for (const auto & table_name : table_names)
|
||||
if (!detached_or_dropped.count(table_name))
|
||||
tables[table_name] = fetchTable(table_name, local_context, true);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
|
||||
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, database_name);
|
||||
}
|
||||
@ -170,7 +180,7 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr,
|
||||
return StoragePtr{};
|
||||
|
||||
auto connection_holder = pool->get();
|
||||
auto columns = fetchPostgreSQLTableStructure(connection_holder->get(), formatTableName(table_name)).columns;
|
||||
auto columns = fetchPostgreSQLTableStructure(connection_holder->get(), table_name, configuration.schema).columns;
|
||||
|
||||
if (!columns)
|
||||
return StoragePtr{};
|
||||
|
@ -79,7 +79,7 @@ private:
|
||||
|
||||
String getTableNameForLogs(const String & table_name) const;
|
||||
|
||||
String formatTableName(const String & table_name) const;
|
||||
String formatTableName(const String & table_name, bool quoted = true) const;
|
||||
|
||||
bool checkPostgresTable(const String & table_name) const;
|
||||
|
||||
|
@ -135,7 +135,7 @@ static DataTypePtr convertPostgreSQLDataType(String & type, Fn<void()> auto && r
|
||||
|
||||
template<typename T>
|
||||
std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
|
||||
T & tx, const String & postgres_table_name, const String & query, bool use_nulls, bool only_names_and_types)
|
||||
T & tx, const String & postgres_table, const String & query, bool use_nulls, bool only_names_and_types)
|
||||
{
|
||||
auto columns = NamesAndTypes();
|
||||
|
||||
@ -180,7 +180,7 @@ std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
|
||||
|
||||
/// All rows must contain the same number of dimensions, so limit 1 is ok. If number of dimensions in all rows is not the same -
|
||||
/// such arrays are not able to be used as ClickHouse Array at all.
|
||||
pqxx::result result{tx.exec(fmt::format("SELECT array_ndims({}) FROM {} LIMIT 1", name_and_type.name, postgres_table_name))};
|
||||
pqxx::result result{tx.exec(fmt::format("SELECT array_ndims({}) FROM {} LIMIT 1", name_and_type.name, postgres_table))};
|
||||
auto dimensions = result[0][0].as<int>();
|
||||
|
||||
/// It is always 1d array if it is in recheck.
|
||||
@ -191,10 +191,13 @@ std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
|
||||
columns[i] = NameAndTypePair(name_and_type.name, type);
|
||||
}
|
||||
}
|
||||
|
||||
catch (const pqxx::undefined_table &)
|
||||
{
|
||||
throw Exception(ErrorCodes::UNKNOWN_TABLE, "PostgreSQL table {} does not exist", postgres_table_name);
|
||||
throw Exception(ErrorCodes::UNKNOWN_TABLE, "PostgreSQL table {} does not exist", postgres_table);
|
||||
}
|
||||
catch (const pqxx::syntax_error & e)
|
||||
{
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error: {} (in query: {})", e.what(), query);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
@ -208,18 +211,27 @@ std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
|
||||
|
||||
template<typename T>
|
||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||
T & tx, const String & postgres_table_name, bool use_nulls, bool with_primary_key, bool with_replica_identity_index)
|
||||
T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls, bool with_primary_key, bool with_replica_identity_index)
|
||||
{
|
||||
PostgreSQLTableStructure table;
|
||||
|
||||
auto where = fmt::format("relname = {}", quoteString(postgres_table));
|
||||
if (postgres_schema.empty())
|
||||
where += " AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')";
|
||||
else
|
||||
where += fmt::format(" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = {})", quoteString(postgres_schema));
|
||||
|
||||
std::string query = fmt::format(
|
||||
"SELECT attname AS name, format_type(atttypid, atttypmod) AS type, "
|
||||
"attnotnull AS not_null, attndims AS dims "
|
||||
"FROM pg_attribute "
|
||||
"WHERE attrelid = {}::regclass "
|
||||
"AND NOT attisdropped AND attnum > 0", quoteString(postgres_table_name));
|
||||
"WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) "
|
||||
"AND NOT attisdropped AND attnum > 0", where);
|
||||
|
||||
table.columns = readNamesAndTypesList(tx, postgres_table_name, query, use_nulls, false);
|
||||
table.columns = readNamesAndTypesList(tx, postgres_table, query, use_nulls, false);
|
||||
|
||||
if (!table.columns)
|
||||
throw Exception(ErrorCodes::UNKNOWN_TABLE, "PostgreSQL table {} does not exist", postgres_table);
|
||||
|
||||
if (with_primary_key)
|
||||
{
|
||||
@ -229,9 +241,9 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||
"FROM pg_index i "
|
||||
"JOIN pg_attribute a ON a.attrelid = i.indrelid "
|
||||
"AND a.attnum = ANY(i.indkey) "
|
||||
"WHERE i.indrelid = {}::regclass AND i.indisprimary", quoteString(postgres_table_name));
|
||||
"WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) AND i.indisprimary", where);
|
||||
|
||||
table.primary_key_columns = readNamesAndTypesList(tx, postgres_table_name, query, use_nulls, true);
|
||||
table.primary_key_columns = readNamesAndTypesList(tx, postgres_table, query, use_nulls, true);
|
||||
}
|
||||
|
||||
if (with_replica_identity_index && !table.primary_key_columns)
|
||||
@ -254,19 +266,19 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||
"and t.relname = {} " /// Connection is already done to a needed database, only table name is needed.
|
||||
"and ix.indisreplident = 't' " /// index is is replica identity index
|
||||
"ORDER BY a.attname", /// column names
|
||||
quoteString(postgres_table_name));
|
||||
quoteString(postgres_table));
|
||||
|
||||
table.replica_identity_columns = readNamesAndTypesList(tx, postgres_table_name, query, use_nulls, true);
|
||||
table.replica_identity_columns = readNamesAndTypesList(tx, postgres_table, query, use_nulls, true);
|
||||
}
|
||||
|
||||
return table;
|
||||
}
|
||||
|
||||
|
||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(pqxx::connection & connection, const String & postgres_table_name, bool use_nulls)
|
||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(pqxx::connection & connection, const String & postgres_table, const String & postgres_schema, bool use_nulls)
|
||||
{
|
||||
pqxx::ReadTransaction tx(connection);
|
||||
auto result = fetchPostgreSQLTableStructure(tx, postgres_table_name, use_nulls, false, false);
|
||||
auto result = fetchPostgreSQLTableStructure(tx, postgres_table, postgres_schema, use_nulls, false, false);
|
||||
tx.commit();
|
||||
return result;
|
||||
}
|
||||
@ -283,18 +295,18 @@ std::set<String> fetchPostgreSQLTablesList(pqxx::connection & connection, const
|
||||
|
||||
template
|
||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||
pqxx::ReadTransaction & tx, const String & postgres_table_name, bool use_nulls,
|
||||
bool with_primary_key, bool with_replica_identity_index);
|
||||
pqxx::ReadTransaction & tx, const String & postgres_table, const String & postgres_schema,
|
||||
bool use_nulls, bool with_primary_key, bool with_replica_identity_index);
|
||||
|
||||
template
|
||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||
pqxx::ReplicationTransaction & tx, const String & postgres_table_name, bool use_nulls,
|
||||
bool with_primary_key, bool with_replica_identity_index);
|
||||
pqxx::ReplicationTransaction & tx, const String & postgres_table, const String & postgres_schema,
|
||||
bool use_nulls, bool with_primary_key, bool with_replica_identity_index);
|
||||
|
||||
template
|
||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||
pqxx::nontransaction & tx, const String & postgres_table_name, bool use_nulls,
|
||||
bool with_primary_key, bool with_replica_identity_index);
|
||||
pqxx::nontransaction & tx, const String & postgres_table, const String & postrges_schema,
|
||||
bool use_nulls, bool with_primary_key, bool with_replica_identity_index);
|
||||
|
||||
template
|
||||
std::set<String> fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema);
|
||||
|
@ -24,11 +24,11 @@ using PostgreSQLTableStructurePtr = std::unique_ptr<PostgreSQLTableStructure>;
|
||||
std::set<String> fetchPostgreSQLTablesList(pqxx::connection & connection, const String & postgres_schema);
|
||||
|
||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||
pqxx::connection & connection, const String & postgres_table_name, bool use_nulls = true);
|
||||
pqxx::connection & connection, const String & postgres_table, const String & postgres_schema, bool use_nulls = true);
|
||||
|
||||
template<typename T>
|
||||
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
|
||||
T & tx, const String & postgres_table_name, bool use_nulls = true,
|
||||
T & tx, const String & postgres_table, const String & postgres_schema, bool use_nulls = true,
|
||||
bool with_primary_key = false, bool with_replica_identity_index = false);
|
||||
|
||||
template<typename T>
|
||||
|
@ -229,7 +229,9 @@ ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializ
|
||||
{
|
||||
postgres::Connection connection(connection_info);
|
||||
pqxx::nontransaction tx(connection.getRef());
|
||||
auto table_structure = std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, table_name, true, true, true));
|
||||
auto table_structure = std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, table_name, postgres_schema, true, true, true));
|
||||
if (!table_structure)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to get PostgreSQL table structure");
|
||||
return storage->getCreateNestedTableQuery(std::move(table_structure));
|
||||
}
|
||||
|
||||
@ -649,7 +651,17 @@ PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
|
||||
if (!is_materialized_postgresql_database)
|
||||
return nullptr;
|
||||
|
||||
return std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, table_name, true, true, true));
|
||||
PostgreSQLTableStructure structure;
|
||||
try
|
||||
{
|
||||
structure = fetchPostgreSQLTableStructure(tx, table_name, postgres_schema, true, true, true);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
|
||||
return std::make_unique<PostgreSQLTableStructure>(std::move(structure));
|
||||
}
|
||||
|
||||
|
||||
|
@ -49,11 +49,10 @@ ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr c
|
||||
const bool use_nulls = context->getSettingsRef().external_table_functions_use_nulls;
|
||||
auto connection_holder = connection_pool->get();
|
||||
auto columns = fetchPostgreSQLTableStructure(
|
||||
connection_holder->get(),
|
||||
configuration->schema.empty() ? doubleQuoteString(configuration->table)
|
||||
: doubleQuoteString(configuration->schema) + '.' + doubleQuoteString(configuration->table),
|
||||
use_nulls).columns;
|
||||
connection_holder->get(), configuration->table, configuration->schema, use_nulls).columns;
|
||||
|
||||
if (!columns)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table structure not returned");
|
||||
return ColumnsDescription{*columns};
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user