Fix postgres arrays

This commit is contained in:
kssenii 2021-06-21 11:27:10 +00:00
parent 8ba6a5393f
commit 0f9fc33a4e
2 changed files with 80 additions and 19 deletions

View File

@ -25,14 +25,19 @@ namespace ErrorCodes
}
static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, uint16_t dimensions)
static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, uint16_t dimensions, const std::function<void()> & recheck_array)
{
DataTypePtr res;
bool is_array = false;
/// Get rid of trailing '[]' for arrays
if (dimensions)
if (type.ends_with("[]"))
{
is_array = true;
while (type.ends_with("[]"))
type.resize(type.size() - 2);
}
if (type == "smallint")
res = std::make_shared<DataTypeInt16>();
@ -88,8 +93,24 @@ static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, ui
res = std::make_shared<DataTypeString>();
if (is_nullable)
res = std::make_shared<DataTypeNullable>(res);
while (dimensions--)
res = std::make_shared<DataTypeArray>(res);
if (is_array)
{
    /// pg_attribute.attndims is not always reliable: it can be 0 for a real array column
    /// (for example, when a postgres table is created via 'as select * from table_with_arrays').
    /// The real depth cannot be queried right here, because another query is in execution on the
    /// same connection, so such columns are reported through recheck_array and fixed up afterwards.
    const bool dimensions_unknown = (dimensions == 0);

    /// With an unknown depth the column is provisionally typed as a 1d array.
    if (dimensions_unknown)
        dimensions = 1;

    for (; dimensions > 0; --dimensions)
        res = std::make_shared<DataTypeArray>(res);

    /// Ask the caller to re-derive the number of dimensions with array_ndims.
    if (dimensions_unknown)
        recheck_array();
}
return res;
}
@ -98,7 +119,7 @@ static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, ui
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
postgres::ConnectionHolderPtr connection_holder, const String & postgres_table_name, bool use_nulls)
{
auto columns = NamesAndTypesList();
auto columns = NamesAndTypes();
if (postgres_table_name.find('\'') != std::string::npos
|| postgres_table_name.find('\\') != std::string::npos)
@ -115,22 +136,46 @@ std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
"AND NOT attisdropped AND attnum > 0", postgres_table_name);
try
{
pqxx::read_transaction tx(connection_holder->get());
auto stream{pqxx::stream_from::query(tx, query)};
std::tuple<std::string, std::string, std::string, uint16_t> row;
while (stream >> row)
std::set<size_t> recheck_arrays_indexes;
{
columns.push_back(NameAndTypePair(
std::get<0>(row),
convertPostgreSQLDataType(
std::get<1>(row),
use_nulls && (std::get<2>(row) == "f"), /// 'f' means that postgres `not_null` is false, i.e. value is nullable
std::get<3>(row))));
/// First pass: stream the column descriptions returned by `query` inside a pqxx::read_transaction
/// and convert each of them to a ClickHouse type.
pqxx::read_transaction tx(connection_holder->get());
auto stream{pqxx::stream_from::query(tx, query)};
/// Row fields, as they are used below: column name, postgres type name, not-null flag, number of dimensions.
std::tuple<std::string, std::string, std::string, uint16_t> row;
/// Position in `columns` of the column that is currently being converted.
size_t i = 0;
/// Passed to convertPostgreSQLDataType, which calls it for an array column reported with 0 dimensions.
/// `i` is captured by reference, so the index recorded is the one of the column being converted at call time.
auto recheck_array = [&]() { recheck_arrays_indexes.insert(i); };
while (stream >> row)
{
auto data_type = convertPostgreSQLDataType(std::get<1>(row),
use_nulls && (std::get<2>(row) == "f"), /// 'f' means that postgres `not_null` is false, i.e. value is nullable
std::get<3>(row),
recheck_array);
columns.push_back(NameAndTypePair(std::get<0>(row), data_type));
/// Keep the index in step with `columns`, the recheck pass addresses columns by this index.
++i;
}
/// Finish the stream and the transaction before any further query is issued on this connection.
stream.complete();
tx.commit();
}
/// Second pass: for every array column that was reported with 0 dimensions, ask postgres for the real
/// number of dimensions of the stored values and rebuild the column type with that nesting depth.
for (const auto column_index : recheck_arrays_indexes)
{
    const auto & name_and_type = columns[column_index];

    pqxx::nontransaction tx(connection_holder->get());
    /// All rows must contain the same number of dimensions, so limit 1 is ok. If number of dimensions in all rows is not the same -
    /// such arrays are not able to be used as ClickHouse Array at all.
    /// The column name is quoted as an identifier, otherwise mixed-case or reserved-word names break the query.
    pqxx::result result{tx.exec(fmt::format(
        "SELECT array_ndims({}) FROM {} LIMIT 1", tx.quote_name(name_and_type.name), postgres_table_name))};

    /// The table may have no rows, and array_ndims returns NULL for a NULL or an empty array.
    /// In both cases nothing can be learned about the real depth, so keep the provisional 1d array type.
    if (result.empty() || result[0][0].is_null())
        continue;

    const auto dimensions = result[0][0].as<int>();
    if (dimensions <= 0)
        continue;

    /// It is always 1d array if it is in recheck.
    DataTypePtr type = assert_cast<const DataTypeArray *>(name_and_type.type.get())->getNestedType();
    for (int depth = 0; depth < dimensions; ++depth)
        type = std::make_shared<DataTypeArray>(type);

    columns[column_index] = NameAndTypePair(name_and_type.name, type);
}
stream.complete();
tx.commit();
}
catch (const pqxx::undefined_table &)
{
throw Exception(fmt::format(
@ -146,7 +191,7 @@ std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
if (columns.empty())
return nullptr;
return std::make_shared<NamesAndTypesList>(columns);
return std::make_shared<NamesAndTypesList>(NamesAndTypesList(columns.begin(), columns.end()));
}
}

View File

@ -308,6 +308,22 @@ def test_postgres_distributed(started_cluster):
assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n')
def test_postgres_ndim(started_cluster):
    """Check that the array depth is detected when postgres reports attndims = 0.

    A table created with 'CREATE TABLE ... AS SELECT *' from a table with a
    two-dimensional array column is read through the postgresql table
    function; the inferred ClickHouse type must still be a nested Array.
    """
    conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
    cursor = conn.cursor()
    cursor.execute('CREATE TABLE arr1 (a Integer[])')
    cursor.execute("INSERT INTO arr1 SELECT '{{1}, {2}}'")

    # The point is in creating a table via 'as select *', in postgres attndims will not be correct in this case.
    cursor.execute('CREATE TABLE arr2 AS SELECT * FROM arr1')
    # Look at column 'a' only: pg_attribute also has rows for the system columns of the relation,
    # and taking the first row without a filter could check one of those instead.
    cursor.execute("SELECT attndims AS dims FROM pg_attribute WHERE attrelid = 'arr2'::regclass AND attname = 'a'; ")
    result = cursor.fetchall()[0]
    assert(int(result[0]) == 0)

    result = node1.query('''SELECT toTypeName(a) FROM postgresql('postgres1:5432', 'clickhouse', 'arr2', 'postgres', 'mysecretpassword')''')
    assert(result.strip() == "Array(Array(Nullable(Int32)))")

    # Do not leave the helper tables behind in the shared postgres instance.
    cursor.execute("DROP TABLE arr1, arr2")
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")