Minor adjustments

This commit is contained in:
kssenii 2021-01-10 10:03:01 +00:00
parent fc9de76f7d
commit d952b0897e
8 changed files with 91 additions and 55 deletions

View File

@ -24,7 +24,6 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TYPE;
}
@ -67,13 +66,9 @@ Block PostgreSQLBlockInputStream::readImpl()
{
const std::vector<pqxx::zview> * row{stream->read_row()};
/// row is nullptr if pqxx::stream_from is finished
if (!row)
{
/// row is nullptr if pqxx::stream_from is finished
stream->complete();
tx->commit();
break;
}
for (const auto idx : ext::range(0, row->size()))
{
@ -109,6 +104,16 @@ Block PostgreSQLBlockInputStream::readImpl()
}
/// Closes the pqxx stream and then commits the transaction `tx`.
/// Does nothing when no stream is set.
void PostgreSQLBlockInputStream::readSuffix()
{
    if (!stream)
        return;

    stream->complete();
    tx->commit();
}
void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type, size_t idx)
{
@ -160,7 +165,8 @@ void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view
case ValueType::vtDateTime64:[[fallthrough]];
case ValueType::vtDecimal32: [[fallthrough]];
case ValueType::vtDecimal64: [[fallthrough]];
case ValueType::vtDecimal128:
case ValueType::vtDecimal128: [[fallthrough]];
case ValueType::vtDecimal256:
{
ReadBufferFromString istr(value);
data_type->deserializeAsWholeText(column, istr, FormatSettings{});
@ -207,8 +213,6 @@ void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view
assert_cast<ColumnArray &>(column).insert(Array(dimensions[1].begin(), dimensions[1].end()));
break;
}
default:
throw Exception("Value of unsupported type:" + column.getName(), ErrorCodes::UNKNOWN_TYPE);
}
}

View File

@ -32,6 +32,7 @@ private:
void readPrefix() override;
Block readImpl() override;
void readSuffix() override;
void insertValue(IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type, size_t idx);

View File

@ -86,7 +86,6 @@ std::unordered_set<std::string> DatabasePostgreSQL::fetchTablesList() const
std::unordered_set<std::string> tables;
std::string query = "SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'";
/// Already connected to the needed database, search will be done there
pqxx::read_transaction tx(*connection->conn());
for (auto table_name : tx.stream<std::string>(query))
@ -99,11 +98,22 @@ std::unordered_set<std::string> DatabasePostgreSQL::fetchTablesList() const
/// Checks whether `table_name` resolves to an existing relation on the connected PostgreSQL server.
/// The name is cast to `regclass` on the server side; a pqxx::undefined_table error is treated
/// as "the table does not exist" and turned into `false`. A DB::Exception raised on the way is
/// annotated with context and rethrown.
bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
{
    pqxx::nontransaction tx(*connection->conn());

    try
    {
        /// Let pqxx quote and escape the name instead of splicing it into the query text as is,
        /// so a name containing a quote character cannot break out of the string literal.
        pqxx::result result = tx.exec(fmt::format("select {}::regclass", tx.quote(table_name)));
    }
    catch (pqxx::undefined_table const &)
    {
        return false;
    }
    catch (Exception & e)
    {
        e.addMessage("while checking postgresql table existence");
        throw;
    }

    return true;
}
@ -137,7 +147,7 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, const Conte
return StoragePtr{};
auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
auto columns = fetchTableStructure(connection->conn(), table_name, use_nulls);
auto columns = fetchPostgreSQLTableStructure(connection->conn(), table_name, use_nulls);
if (!columns)
return StoragePtr{};
@ -146,8 +156,6 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, const Conte
StorageID(database_name, table_name), table_name, std::make_shared<PostgreSQLConnection>(connection->conn_str()),
ColumnsDescription{*columns}, ConstraintsDescription{}, context);
/// There is no easy (embedded) way in postgres to check table modification time, so if `cache_tables` == 1 (default: 0)
/// table structure is cached and not checked for being modified, but it will be updated during detach->attach.
if (cache_tables)
cached_tables[table_name] = storage;

View File

@ -10,13 +10,17 @@
#include <Core/BackgroundSchedulePool.h>
#include <Parsers/ASTCreateQuery.h>
#include <Storages/StoragePostgreSQL.h>
#include <pqxx/pqxx>
namespace DB
{
class Context;
/** Real-time access to table list and table structure from remote PostgreSQL.
* All tables are created after pulling their structure from remote PostgreSQL.
* If `cache_tables` == 1 (default: 0) table structure is cached and not checked for being modified,
* but it will be updated during detach->attach.
*/
class DatabasePostgreSQL final : public IDatabase
{

View File

@ -2,6 +2,7 @@
#if USE_LIBPQXX
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
@ -19,10 +20,11 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_TYPE;
extern const int UNKNOWN_TABLE;
}
std::shared_ptr<NamesAndTypesList> fetchTableStructure(ConnectionPtr connection, const String & postgres_table_name, bool use_nulls)
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(ConnectionPtr connection, const String & postgres_table_name, bool use_nulls)
{
auto columns = NamesAndTypesList();
@ -32,18 +34,30 @@ std::shared_ptr<NamesAndTypesList> fetchTableStructure(ConnectionPtr connection,
"FROM pg_attribute "
"WHERE attrelid = '{}'::regclass "
"AND NOT attisdropped AND attnum > 0", postgres_table_name);
pqxx::read_transaction tx(*connection);
pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query));
std::tuple<std::string, std::string, std::string, uint16_t> row;
while (stream >> row)
try
{
columns.push_back(NameAndTypePair(
std::get<0>(row),
getDataType(std::get<1>(row), use_nulls && (std::get<2>(row) == "f"), std::get<3>(row))));
pqxx::read_transaction tx(*connection);
pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query));
std::tuple<std::string, std::string, std::string, uint16_t> row;
while (stream >> row)
{
columns.push_back(NameAndTypePair(
std::get<0>(row),
convertPostgreSQLDataType(
std::get<1>(row),
use_nulls && (std::get<2>(row) == "f"), /// 'f' means that postgres `not_null` is false, i.e. value is nullable
std::get<3>(row))));
}
stream.complete();
tx.commit();
}
catch (pqxx::undefined_table const &)
{
throw Exception(fmt::format(
"PostgreSQL table {}.{} does not exist",
connection->dbname(), postgres_table_name), ErrorCodes::UNKNOWN_TABLE);
}
stream.complete();
tx.commit();
if (columns.empty())
return nullptr;
@ -52,12 +66,12 @@ std::shared_ptr<NamesAndTypesList> fetchTableStructure(ConnectionPtr connection,
}
DataTypePtr getDataType(std::string & type, bool is_nullable, uint16_t dimensions)
DataTypePtr convertPostgreSQLDataType(std::string & type, bool is_nullable, uint16_t dimensions)
{
DataTypePtr res;
/// Get rid of trailing '[]' for arrays
if (dimensions)
if (dimensions && type.ends_with("[]"))
type.resize(type.size() - 2);
if (type == "smallint")
@ -80,20 +94,10 @@ DataTypePtr getDataType(std::string & type, bool is_nullable, uint16_t dimension
res = std::make_shared<DataTypeDate>();
else if (type.starts_with("numeric"))
{
/// Numeric and decimal will both end up here as numeric
/// Will get numeric(precision, scale) string, need to extract precision and scale
std::vector<std::string> result;
boost::split(result, type, [](char c){ return c == '(' || c == ',' || c == ')'; });
for (std::string & key : result)
boost::trim(key);
/// If precision or scale are not specified, postgres creates a column in which numeric values of
/// any precision and scale can be stored, so may be maxPrecision may be used instead of exception
if (result.size() < 3)
throw Exception("Numeric lacks precision and scale in its definition", ErrorCodes::UNKNOWN_TYPE);
uint32_t precision = pqxx::from_string<uint32_t>(result[1]);
uint32_t scale = pqxx::from_string<uint32_t>(result[2]);
/// Numeric and decimal will both end up here as numeric.
res = DataTypeFactory::instance().get(type);
uint32_t precision = getDecimalPrecision(*res);
uint32_t scale = getDecimalScale(*res);
if (precision <= DecimalUtils::maxPrecision<Decimal32>())
res = std::make_shared<DataTypeDecimal<Decimal32>>(precision, scale);
@ -101,6 +105,8 @@ DataTypePtr getDataType(std::string & type, bool is_nullable, uint16_t dimension
res = std::make_shared<DataTypeDecimal<Decimal64>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal128>())
res = std::make_shared<DataTypeDecimal<Decimal128>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal256>())
res = std::make_shared<DataTypeDecimal<Decimal256>>(precision, scale);
}
if (!res)

View File

@ -10,8 +10,8 @@
namespace DB
{
std::shared_ptr<NamesAndTypesList> fetchTableStructure(ConnectionPtr connection, const String & postgres_table_name, bool use_nulls);
DataTypePtr getDataType(std::string & type, bool is_nullable, uint16_t dimensions);
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(ConnectionPtr connection, const String & postgres_table_name, bool use_nulls);
DataTypePtr convertPostgreSQLDataType(std::string & type, bool is_nullable, uint16_t dimensions);
}

View File

@ -102,11 +102,13 @@ public:
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
/// Starts a new pqxx::work transaction on `connection` and stores it in `work`.
void writePrefix() override
{
    auto transaction = std::make_unique<pqxx::work>(*connection);
    work = std::move(transaction);
}
void write(const Block & block) override
{
const auto columns = block.getColumns();
@ -143,24 +145,28 @@ public:
row[j] = ostr.str();
}
}
/// pqxx::stream_to is much faster than simple insert, especially for large number of rows
stream_inserter->write_values(row);
}
}
/// Finishes the insert: completes the pqxx stream and commits the `work` transaction.
/// Nothing is done when no stream inserter exists.
void writeSuffix() override
{
    if (stream_inserter)
    {
        stream_inserter->complete();
        /// Commit exactly once; a second commit on the same transaction is redundant.
        work->commit();
    }
}
/// Cannot just use serializeAsText for array data type even though it converts perfectly
/// any dimension number array into text format, because it encloses in '[]' and for postgres it must be '{}'.
void parseArray(Field array_field, DataTypePtr data_type, WriteBuffer & ostr)
void parseArray(const Field & array_field, const DataTypePtr & data_type, WriteBuffer & ostr)
{
const auto * array_type = typeid_cast<const DataTypeArray *>(data_type.get());
const auto nested = array_type->getNestedType();
const auto & nested = array_type->getNestedType();
const auto & array = array_field.get<Array>();
if (!isArray(nested))
@ -190,13 +196,14 @@ public:
writeChar('}', ostr);
}
/// Conversion is done via column casting because with writeText(Array..) got incorrect conversion
/// of Date and DateTime data types and it added extra quotes for values inside array.
std::string clickhouseToPostgresArray(const Array & array_field, DataTypePtr data_type)
std::string clickhouseToPostgresArray(const Array & array_field, const DataTypePtr & data_type)
{
auto nested = typeid_cast<const DataTypeArray *>(data_type.get())->getNestedType();
ColumnPtr nested_column(createNested(nested));
const auto array_column{ColumnArray::create(nested_column)};
auto array_column{ColumnArray::create(nested_column)};
const_cast<ColumnArray *>(array_column.get())->insert(array_field);
WriteBufferFromOwnString ostr;
data_type->serializeAsText(*array_column, 0, ostr, FormatSettings{});
@ -205,7 +212,8 @@ public:
return '{' + std::string(ostr.str().begin() + 1, ostr.str().end() - 1) + '}';
}
ColumnPtr createNested(DataTypePtr nested)
static ColumnPtr createNested(DataTypePtr nested)
{
bool is_nullable = false;
if (nested->isNullable())
@ -242,6 +250,11 @@ public:
const auto & type = typeid_cast<const DataTypeDecimal<Decimal128> *>(nested.get());
nested_column = ColumnDecimal<Decimal128>::create(0, type->getScale());
}
else if (which.isDecimal256())
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal256> *>(nested.get());
nested_column = ColumnDecimal<Decimal256>::create(0, type->getScale());
}
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Type conversion not supported");

View File

@ -37,7 +37,7 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
/// Builds the column description of `remote_table_name` from the structure reported by the
/// remote PostgreSQL server. Nullability handling follows the `external_table_functions_use_nulls` setting.
ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(const Context & context) const
{
    const bool use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
    auto columns = fetchPostgreSQLTableStructure(connection->conn(), remote_table_name, use_nulls);

    /// NOTE(review): fetchPostgreSQLTableStructure appears to return nullptr when it finds no columns;
    /// this dereference does not guard against that - confirm and handle.
    return ColumnsDescription{*columns};
}