Add postgresql database engine

This commit is contained in:
kssenii 2020-12-21 19:20:56 +00:00
parent 2ab07fbd71
commit 00a37404ca
16 changed files with 758 additions and 144 deletions

View File

@ -79,6 +79,10 @@ if (USE_AMQPCPP)
add_headers_and_sources(dbms Storages/RabbitMQ)
endif()
if (USE_LIBPQXX)
add_headers_and_sources(dbms Databases/PostgreSQL)
endif()
if (USE_ROCKSDB)
add_headers_and_sources(dbms Storages/RocksDB)
endif()

View File

@ -28,6 +28,10 @@
# include <mysqlxx/Pool.h>
#endif
#if USE_LIBPQXX
#include <Databases/PostgreSQL/DatabasePostgreSQL.h>
#endif
namespace DB
{
@ -80,7 +84,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const String & engine_name = engine_define->engine->name;
const UUID & uuid = create.uuid;
if (engine_name != "MySQL" && engine_name != "MaterializeMySQL" && engine_name != "Lazy" && engine_define->engine->arguments)
if (engine_name != "MySQL" && engine_name != "MaterializeMySQL" && engine_name != "Lazy" && engine_name != "PostgreSQL" && engine_define->engine->arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
if (engine_define->engine->parameters || engine_define->partition_by || engine_define->primary_key || engine_define->order_by ||
@ -168,6 +172,42 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
return std::make_shared<DatabaseLazy>(database_name, metadata_path, cache_expiration_time_seconds, context);
}
#if USE_LIBPQXX
else if (engine_name == "PostgreSQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 4)
throw Exception(fmt::format(
"{} Database require postgres_host_port, postgres_dbname, "
"postgres_username, mysql_password arguments.", engine_name),
ErrorCodes::BAD_ARGUMENTS);
ASTs & engine_args = engine->arguments->children;
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name);
const auto & postgres_database_name = safeGetLiteralValue<String>(engine_args[1], engine_name);
const auto & username = safeGetLiteralValue<String>(engine_args[2], engine_name);
const auto & password = safeGetLiteralValue<String>(engine_args[3], engine_name);
auto parsed_host_port = parseAddress(host_port, 5432);
String connection_str;
connection_str = fmt::format("dbname={} host={} port={} user={} password={}",
postgres_database_name, parsed_host_port.first, std::to_string(parsed_host_port.second),
username, password);
/// no connection is made here
auto connection = std::make_shared<PGConnection>(connection_str);
return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, postgres_database_name, connection);
}
#endif
throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}

View File

@ -0,0 +1,306 @@
#include <Databases/PostgreSQL/DatabasePostgreSQL.h>
#if USE_LIBPQXX
#include <string>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Common/escapeForFileName.h>
#include <Common/parseAddress.h>
#include <Common/setThreadName.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <common/logger_useful.h>
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <TableFunctions/TableFunctionPostgreSQL.h>
#include <Databases/PostgreSQL/FetchFromPostgreSQL.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_TABLE;
extern const int TABLE_IS_DROPPED;
extern const int TABLE_ALREADY_EXISTS;
extern const int UNEXPECTED_AST_STRUCTURE;
}
DatabasePostgreSQL::DatabasePostgreSQL(
const Context & context,
const String & metadata_path_,
const ASTStorage * database_engine_define_,
const String & dbname_,
const String & postgres_dbname,
PGConnectionPtr connection_)
: IDatabase(dbname_)
, global_context(context.getGlobalContext())
, metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone())
, dbname(postgres_dbname)
, connection(std::move(connection_))
{
}
bool DatabasePostgreSQL::empty() const
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "empty");
std::lock_guard<std::mutex> lock(mutex);
auto tables_list = fetchTablesList();
for (const auto & table_name : tables_list)
if (!detached_tables.count(table_name))
return false;
return true;
}
DatabaseTablesIteratorPtr DatabasePostgreSQL::getTablesIterator(
const Context & context, const FilterByNameFunction & /* filter_by_table_name */)
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "getTablesIterator");
std::lock_guard<std::mutex> lock(mutex);
Tables tables;
auto table_names = fetchTablesList();
for (auto & table_name : table_names)
if (!detached_tables.count(table_name))
tables[table_name] = fetchTable(table_name, context);
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, database_name);
}
std::unordered_set<std::string> DatabasePostgreSQL::fetchTablesList() const
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "fetchTablesList");
std::unordered_set<std::string> tables;
std::string query = "SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'";
pqxx::read_transaction tx(*connection->conn());
for (auto table_name : tx.stream<std::string>(query))
tables.insert(std::get<0>(table_name));
return tables;
}
bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
{
pqxx::nontransaction tx(*connection->conn());
pqxx::result result = tx.exec(fmt::format(
"SELECT attname FROM pg_attribute "
"WHERE attrelid = '{}'::regclass "
"AND NOT attisdropped AND attnum > 0", table_name));
if (result.empty())
return false;
return true;
}
bool DatabasePostgreSQL::isTableExist(const String & table_name, const Context & /* context */) const
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "isTableExists");
std::lock_guard<std::mutex> lock(mutex);
if (detached_tables.count(table_name))
return false;
return checkPostgresTable(table_name);
}
StoragePtr DatabasePostgreSQL::tryGetTable(const String & table_name, const Context & context) const
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "tryGetTable");
std::lock_guard<std::mutex> lock(mutex);
if (detached_tables.count(table_name))
return StoragePtr{};
else
return fetchTable(table_name, context);
}
StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, const Context & context) const
{
auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
auto columns = fetchTableStructure(connection->conn(), table_name, use_nulls);
if (!columns)
return StoragePtr{};
return StoragePostgreSQL::create(
StorageID(database_name, table_name), table_name,
connection, ColumnsDescription{*columns}, ConstraintsDescription{}, context);
}
void DatabasePostgreSQL::attachTable(const String & table_name, const StoragePtr & /* storage */, const String &)
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "attachTable");
std::lock_guard<std::mutex> lock{mutex};
if (!checkPostgresTable(table_name))
throw Exception(fmt::format("Cannot attach table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
if (!detached_tables.count(table_name))
throw Exception(fmt::format("Cannot attach table {}.{}. It already exists", database_name, table_name), ErrorCodes::TABLE_ALREADY_EXISTS);
detached_tables.erase(table_name);
}
StoragePtr DatabasePostgreSQL::detachTable(const String & table_name)
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "detachTable");
std::lock_guard<std::mutex> lock{mutex};
if (!checkPostgresTable(table_name))
throw Exception(fmt::format("Cannot detach table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
if (detached_tables.count(table_name))
throw Exception(fmt::format("Cannot detach table {}.{}. It is already dropped/detached", database_name, table_name), ErrorCodes::TABLE_IS_DROPPED);
detached_tables.emplace(table_name);
return StoragePtr{};
}
void DatabasePostgreSQL::createTable(const Context &, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query)
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "createTable");
const auto & create = create_query->as<ASTCreateQuery>();
if (!create->attach)
throw Exception("PostgreSQL database engine does not support create table", ErrorCodes::NOT_IMPLEMENTED);
attachTable(table_name, storage, {});
}
void DatabasePostgreSQL::dropTable(const Context &, const String & table_name, bool /*no_delay*/)
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "detachPermanently");
std::lock_guard<std::mutex> lock{mutex};
if (!checkPostgresTable(table_name))
throw Exception(fmt::format("Cannot drop table {}.{} because it does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
if (detached_tables.count(table_name))
throw Exception(fmt::format("Table {}.{} is already dropped/detached", database_name, table_name), ErrorCodes::TABLE_IS_DROPPED);
detached_tables.emplace(table_name);
}
void DatabasePostgreSQL::drop(const Context & /*context*/)
{
Poco::File(getMetadataPath()).remove(true);
}
ASTPtr DatabasePostgreSQL::getCreateDatabaseQuery() const
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "getDatabaseQuery");
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = getDatabaseName();
create_query->set(create_query->storage, database_engine_define);
return create_query;
}
ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, const Context & context, bool throw_on_error) const
{
LOG_TRACE(&Poco::Logger::get("kssenii"), "getTableQueryImpl");
auto storage = fetchTable(table_name, context);
if (!storage)
{
if (throw_on_error)
throw Exception(fmt::format("PostgreSQL table {}.{} does not exist", database_name, table_name), ErrorCodes::UNKNOWN_TABLE);
return nullptr;
}
/// Get create table query from storage
auto create_table_query = std::make_shared<ASTCreateQuery>();
auto table_storage_define = database_engine_define->clone();
create_table_query->set(create_table_query->storage, table_storage_define);
auto columns_declare_list = std::make_shared<ASTColumns>();
auto columns_expression_list = std::make_shared<ASTExpressionList>();
columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
create_table_query->set(create_table_query->columns_list, columns_declare_list);
{
/// init create query.
auto table_id = storage->getStorageID();
create_table_query->table = table_id.table_name;
create_table_query->database = table_id.database_name;
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
for (const auto & column_type_and_name : metadata_snapshot->getColumns().getOrdinary())
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = column_type_and_name.name;
std::function<ASTPtr(const DataTypePtr &)> convert_datatype_to_query = [&](const DataTypePtr & data_type) -> ASTPtr
{
WhichDataType which(data_type);
if (!which.isNullable())
return std::make_shared<ASTIdentifier>(data_type->getName());
return makeASTFunction("Nullable", convert_datatype_to_query(typeid_cast<const DataTypeNullable *>(data_type.get())->getNestedType()));
};
column_declaration->type = convert_datatype_to_query(column_type_and_name.type);
columns_expression_list->children.emplace_back(column_declaration);
}
ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
ASTs storage_children = ast_storage->children;
auto storage_engine_arguments = ast_storage->engine->arguments;
/// Add table_name to engine arguments
auto mysql_table_name = std::make_shared<ASTLiteral>(table_id.table_name);
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name);
/// Unset settings
storage_children.erase(
std::remove_if(storage_children.begin(), storage_children.end(),
[&](const ASTPtr & element) { return element.get() == ast_storage->settings; }),
storage_children.end());
ast_storage->settings = nullptr;
}
return create_table_query;
}
}
#endif

View File

@ -0,0 +1,86 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <Core/MultiEnum.h>
#include <Common/ThreadPool.h>
#include <Databases/DatabasesCommon.h>
#include <Parsers/ASTCreateQuery.h>
#include <atomic>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <vector>
#include <Storages/StoragePostgreSQL.h>
#include <pqxx/pqxx>
namespace DB
{
class Context;
class DatabasePostgreSQL final : public IDatabase
{
public:
DatabasePostgreSQL(
const Context & context,
const String & metadata_path_,
const ASTStorage * database_engine_define,
const String & dbname_,
const String & postgres_dbname,
PGConnectionPtr connection_);
String getEngineName() const override { return "PostgreSQL"; }
String getMetadataPath() const override { return metadata_path; };
bool canContainMergeTreeTables() const override { return false; }
bool canContainDistributedTables() const override { return false; }
bool shouldBeEmptyOnDetach() const override { return false; }
ASTPtr getCreateDatabaseQuery() const override;
bool empty() const override;
DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override;
bool isTableExist(const String & name, const Context & context) const override;
StoragePtr tryGetTable(const String & name, const Context & context) const override;
void createTable(const Context &, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void dropTable(const Context &, const String & table_name, bool no_delay) override;
void attachTable(const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
StoragePtr detachTable(const String & table_name) override;
void drop(const Context & /*context*/) override;
void shutdown() override {};
protected:
ASTPtr getCreateTableQueryImpl(const String & table_name, const Context & context, bool throw_on_error) const override;
private:
const Context & global_context;
String metadata_path;
ASTPtr database_engine_define;
String dbname;
PGConnectionPtr connection;
std::unordered_set<std::string> detached_tables;
bool checkPostgresTable(const String & table_name) const;
std::unordered_set<std::string> fetchTablesList() const;
StoragePtr fetchTable(const String & table_name, const Context & context) const;
};
}
#endif

View File

@ -0,0 +1,120 @@
#include <Databases/PostgreSQL/FetchFromPostgreSQL.h>
#if USE_LIBPQXX
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <pqxx/pqxx>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_TYPE;
}
/// These functions are also used for postgresql table function
std::shared_ptr<NamesAndTypesList> fetchTableStructure(ConnectionPtr connection, const String & postgres_table_name, bool use_nulls)
{
auto columns = NamesAndTypesList();
std::string query = fmt::format(
"SELECT attname AS name, format_type(atttypid, atttypmod) AS type, "
"attnotnull AS not_null, attndims AS dims "
"FROM pg_attribute "
"WHERE attrelid = '{}'::regclass "
"AND NOT attisdropped AND attnum > 0", postgres_table_name);
pqxx::read_transaction tx(*connection);
pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query));
std::tuple<std::string, std::string, std::string, uint16_t> row;
/// No rows to be fetched
if (!stream)
return nullptr;
while (stream >> row)
{
columns.push_back(NameAndTypePair(
std::get<0>(row),
getDataType(std::get<1>(row), use_nulls && (std::get<2>(row) == "f"), std::get<3>(row))));
}
stream.complete();
tx.commit();
return std::make_shared<NamesAndTypesList>(columns);
}
DataTypePtr getDataType(std::string & type, bool is_nullable, uint16_t dimensions)
{
DataTypePtr res;
/// Get rid of trailing '[]' for arrays
if (dimensions)
type.resize(type.size() - 2);
if (type == "smallint")
res = std::make_shared<DataTypeInt16>();
else if (type == "integer")
res = std::make_shared<DataTypeInt32>();
else if (type == "bigint")
res = std::make_shared<DataTypeInt64>();
else if (type == "real")
res = std::make_shared<DataTypeFloat32>();
else if (type == "double precision")
res = std::make_shared<DataTypeFloat64>();
else if (type == "serial")
res = std::make_shared<DataTypeUInt32>();
else if (type == "bigserial")
res = std::make_shared<DataTypeUInt64>();
else if (type.starts_with("timestamp"))
res = std::make_shared<DataTypeDateTime>();
else if (type == "date")
res = std::make_shared<DataTypeDate>();
else if (type.starts_with("numeric"))
{
/// Numeric and decimal will both end up here as numeric
/// Will get numeric(precision, scale) string, need to extract precision and scale
std::vector<std::string> result;
boost::split(result, type, [](char c){ return c == '(' || c == ',' || c == ')'; });
for (std::string & key : result)
boost::trim(key);
/// If precision or scale are not specified, postgres creates a column in which numeric values of
/// any precision and scale can be stored, so may be maxPrecision may be used instead of exception
if (result.size() < 3)
throw Exception("Numeric lacks precision and scale in its definition", ErrorCodes::UNKNOWN_TYPE);
uint32_t precision = std::atoi(result[1].data());
uint32_t scale = std::atoi(result[2].data());
if (precision <= DecimalUtils::maxPrecision<Decimal32>())
res = std::make_shared<DataTypeDecimal<Decimal32>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal64>())
res = std::make_shared<DataTypeDecimal<Decimal64>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal128>())
res = std::make_shared<DataTypeDecimal<Decimal128>>(precision, scale);
}
if (!res)
res = std::make_shared<DataTypeString>();
if (is_nullable)
res = std::make_shared<DataTypeNullable>(res);
while (dimensions--)
res = std::make_shared<DataTypeArray>(res);
return res;
}
}
#endif

View File

@ -0,0 +1,18 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <Storages/StoragePostgreSQL.h>
namespace DB
{
std::shared_ptr<NamesAndTypesList> fetchTableStructure(ConnectionPtr connection, const String & postgres_table_name, bool use_nulls);
DataTypePtr getDataType(std::string & type, bool is_nullable, uint16_t dimensions);
}
#endif

View File

@ -19,11 +19,11 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_prefix,
const std::string & connection_str,
PGConnectionPtr connection_,
const Block & sample_block_)
: dict_struct{dict_struct_}
, sample_block(sample_block_)
, connection(std::make_shared<pqxx::connection>(connection_str))
, connection(std::move(connection_))
, log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, db(config_.getString(fmt::format("{}.db", config_prefix), ""))
, table(config_.getString(fmt::format("{}.table", config_prefix), ""))
@ -40,7 +40,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(
PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other)
: dict_struct(other.dict_struct)
, sample_block(other.sample_block)
, connection(other.connection)
, connection(std::make_shared<PGConnection>(other.connection->conn_str()))
, log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, db(other.db)
, table(other.table)
@ -59,7 +59,7 @@ BlockInputStreamPtr PostgreSQLDictionarySource::loadAll()
{
LOG_TRACE(log, load_all_query);
return std::make_shared<PostgreSQLBlockInputStream>(
connection, load_all_query, sample_block, max_block_size);
connection->conn(), load_all_query, sample_block, max_block_size);
}
@ -67,20 +67,20 @@ BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll()
{
auto load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query);
return std::make_shared<PostgreSQLBlockInputStream>(connection, load_update_query, sample_block, max_block_size);
return std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), load_update_query, sample_block, max_block_size);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
const auto query = query_builder.composeLoadIdsQuery(ids);
return std::make_shared<PostgreSQLBlockInputStream>(connection, query, sample_block, max_block_size);
return std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), query, sample_block, max_block_size);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return std::make_shared<PostgreSQLBlockInputStream>(connection, query, sample_block, max_block_size);
return std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), query, sample_block, max_block_size);
}
@ -102,7 +102,7 @@ std::string PostgreSQLDictionarySource::doInvalidateQuery(const std::string & re
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
PostgreSQLBlockInputStream block_input_stream(connection, request, invalidate_sample_block, 1);
PostgreSQLBlockInputStream block_input_stream(connection->conn(), request, invalidate_sample_block, 1);
return readInvalidateQuery(block_input_stream);
}
@ -167,9 +167,10 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
config.getUInt(fmt::format("{}.port", config_prefix), 0),
config.getString(fmt::format("{}.user", config_prefix), ""),
config.getString(fmt::format("{}.password", config_prefix), ""));
auto connection = std::make_shared<PGConnection>(connection_str);
return std::make_unique<PostgreSQLDictionarySource>(
dict_struct, config, config_prefix, connection_str, sample_block);
dict_struct, config, config_prefix, connection, sample_block);
#else
(void)dict_struct;
(void)config;

View File

@ -11,11 +11,11 @@
#include <Core/Block.h>
#include <common/LocalDateTime.h>
#include <common/logger_useful.h>
#include <Storages/StoragePostgreSQL.h>
#include <pqxx/pqxx>
namespace DB
{
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
/// Allows loading dictionaries from a PostgreSQL database
class PostgreSQLDictionarySource final : public IDictionarySource
@ -25,7 +25,7 @@ public:
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_prefix,
const std::string & connection_str,
PGConnectionPtr connection_,
const Block & sample_block_);
/// copy-constructor is provided in order to support cloneability
@ -50,7 +50,7 @@ private:
const DictionaryStructure dict_struct;
Block sample_block;
ConnectionPtr connection;
PGConnectionPtr connection;
Poco::Logger * log;
const std::string db;

View File

@ -31,8 +31,7 @@ namespace ErrorCodes
StoragePostgreSQL::StoragePostgreSQL(
const StorageID & table_id_,
const String & remote_table_name_,
ConnectionPtr connection_,
const String connection_str_,
PGConnectionPtr connection_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_)
@ -40,7 +39,6 @@ StoragePostgreSQL::StoragePostgreSQL(
, remote_table_name(remote_table_name_)
, global_context(context_)
, connection(std::move(connection_))
, connection_str(connection_str_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -74,22 +72,20 @@ Pipe StoragePostgreSQL::read(
sample_block.insert({ column_data.type, column_data.name });
}
checkConnection(connection);
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<PostgreSQLBlockInputStream>(connection, query, sample_block, max_block_size_)));
std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), query, sample_block, max_block_size_)));
}
BlockOutputStreamPtr StoragePostgreSQL::write(
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /* context */)
{
return std::make_shared<PostgreSQLBlockOutputStream>(*this, metadata_snapshot, connection, remote_table_name);
return std::make_shared<PostgreSQLBlockOutputStream>(metadata_snapshot, connection->conn(), remote_table_name);
}
void PostgreSQLBlockOutputStream::writePrefix()
{
storage.checkConnection(connection);
work = std::make_unique<pqxx::work>(*connection);
}
@ -144,16 +140,6 @@ void PostgreSQLBlockOutputStream::writeSuffix()
}
void StoragePostgreSQL::checkConnection(ConnectionPtr & pg_connection) const
{
if (!pg_connection->is_open())
{
pg_connection->close();
pg_connection = std::make_shared<pqxx::connection>(connection_str);
}
}
void registerStoragePostgreSQL(StorageFactory & factory)
{
factory.registerStorage("PostgreSQL", [](const StorageFactory::Arguments & args)
@ -178,9 +164,9 @@ void registerStoragePostgreSQL(StorageFactory & factory)
engine_args[3]->as<ASTLiteral &>().value.safeGet<String>(),
engine_args[4]->as<ASTLiteral &>().value.safeGet<String>());
auto connection = std::make_shared<pqxx::connection>(connection_str);
auto connection = std::make_shared<PGConnection>(connection_str);
return StoragePostgreSQL::create(
args.table_id, remote_table, connection, connection_str, args.columns, args.constraints, args.context);
args.table_id, remote_table, connection, args.columns, args.constraints, args.context);
},
{
.source_access_type = AccessType::POSTGRES,

View File

@ -14,6 +14,8 @@
namespace DB
{
class PGConnection;
using PGConnectionPtr = std::shared_ptr<PGConnection>;
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
class StoragePostgreSQL final : public ext::shared_ptr_helper<StoragePostgreSQL>, public IStorage
@ -23,8 +25,7 @@ public:
StoragePostgreSQL(
const StorageID & table_id_,
const std::string & remote_table_name_,
ConnectionPtr connection_,
const String connection_str,
PGConnectionPtr connection_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_);
@ -44,13 +45,10 @@ public:
private:
friend class PostgreSQLBlockOutputStream;
void checkConnection(ConnectionPtr & connection) const;
String remote_table_name;
Context global_context;
ConnectionPtr connection;
const String connection_str;
PGConnectionPtr connection;
};
@ -58,12 +56,10 @@ class PostgreSQLBlockOutputStream : public IBlockOutputStream
{
public:
explicit PostgreSQLBlockOutputStream(
const StoragePostgreSQL & storage_,
const StorageMetadataPtr & metadata_snapshot_,
ConnectionPtr connection_,
const std::string & remote_table_name_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
: metadata_snapshot(metadata_snapshot_)
, connection(connection_)
, remote_table_name(remote_table_name_)
{
@ -76,7 +72,6 @@ public:
void writeSuffix() override;
private:
const StoragePostgreSQL & storage;
StorageMetadataPtr metadata_snapshot;
ConnectionPtr connection;
std::string remote_table_name;
@ -84,6 +79,35 @@ private:
std::unique_ptr<pqxx::work> work;
std::unique_ptr<pqxx::stream_to> stream_inserter;
};
/// Tiny connection class to make it more convenient to use.
class PGConnection
{
public:
PGConnection(std::string & connection_str_) : connection_str(connection_str_) {}
PGConnection(const PGConnection &) = delete;
PGConnection operator =(const PGConnection &) = delete;
ConnectionPtr conn()
{
checkUpdateConnection();
return connection;
}
std::string & conn_str() { return connection_str; }
private:
ConnectionPtr connection;
std::string connection_str;
void checkUpdateConnection()
{
if (!connection || !connection->is_open())
connection = std::make_unique<pqxx::connection>(connection_str);
}
};
}
#endif

View File

@ -1,13 +1,6 @@
#include <TableFunctions/TableFunctionPostgreSQL.h>
#if USE_LIBPQXX
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
@ -16,7 +9,7 @@
#include <Common/Exception.h>
#include <Common/parseAddress.h>
#include "registerTableFunctions.h"
#include <Storages/StoragePostgreSQL.h>
#include <Databases/PostgreSQL/FetchFromPostgreSQL.h>
namespace DB
{
@ -35,7 +28,7 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
auto columns = getActualTableStructure(context);
auto result = std::make_shared<StoragePostgreSQL>(
StorageID(getDatabaseName(), table_name), remote_table_name,
connection, connection_str, columns, ConstraintsDescription{}, context);
connection, columns, ConstraintsDescription{}, context);
result->startup();
return result;
@ -45,90 +38,9 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(const Context & context) const
{
const bool use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
auto columns = NamesAndTypesList();
auto columns = fetchTableStructure(connection->conn(), remote_table_name, use_nulls);
std::string query = fmt::format(
"SELECT attname AS name, format_type(atttypid, atttypmod) AS type, "
"attnotnull AS not_null, attndims AS dims "
"FROM pg_attribute "
"WHERE attrelid = '{}'::regclass "
"AND NOT attisdropped AND attnum > 0", remote_table_name);
pqxx::read_transaction tx(*connection);
pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query));
std::tuple<std::string, std::string, std::string, uint16_t> row;
while (stream >> row)
{
columns.push_back(NameAndTypePair(
std::get<0>(row),
getDataType(std::get<1>(row), use_nulls && (std::get<2>(row) == "f"), std::get<3>(row))));
}
stream.complete();
tx.commit();
return ColumnsDescription{columns};
}
DataTypePtr TableFunctionPostgreSQL::getDataType(std::string & type, bool is_nullable, uint16_t dimensions) const
{
DataTypePtr res;
/// Get rid of trailing '[]' for arrays
if (dimensions)
type.resize(type.size() - 2);
if (type == "smallint")
res = std::make_shared<DataTypeInt16>();
else if (type == "integer")
res = std::make_shared<DataTypeInt32>();
else if (type == "bigint")
res = std::make_shared<DataTypeInt64>();
else if (type == "real")
res = std::make_shared<DataTypeFloat32>();
else if (type == "double precision")
res = std::make_shared<DataTypeFloat64>();
else if (type == "serial")
res = std::make_shared<DataTypeUInt32>();
else if (type == "bigserial")
res = std::make_shared<DataTypeUInt64>();
else if (type.starts_with("timestamp"))
res = std::make_shared<DataTypeDateTime>();
else if (type == "date")
res = std::make_shared<DataTypeDate>();
else if (type.starts_with("numeric"))
{
/// Numeric and decimal will both end up here as numeric
/// Will get numeric(precision, scale) string, need to extract precision and scale
std::vector<std::string> result;
boost::split(result, type, [](char c){ return c == '(' || c == ',' || c == ')'; });
for (std::string & key : result)
boost::trim(key);
/// If precision or scale are not specified, postgres creates a column in which numeric values of
/// any precision and scale can be stored, so may be maxPrecision may be used instead of exception
if (result.size() < 3)
throw Exception("Numeric lacks precision and scale in its definition", ErrorCodes::UNKNOWN_TYPE);
uint32_t precision = std::atoi(result[1].data());
uint32_t scale = std::atoi(result[2].data());
if (precision <= DecimalUtils::maxPrecision<Decimal32>())
res = std::make_shared<DataTypeDecimal<Decimal32>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal64>())
res = std::make_shared<DataTypeDecimal<Decimal64>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal128>())
res = std::make_shared<DataTypeDecimal<Decimal128>>(precision, scale);
}
if (!res)
res = std::make_shared<DataTypeString>();
if (is_nullable)
res = std::make_shared<DataTypeNullable>(res);
while (dimensions--)
res = std::make_shared<DataTypeArray>(res);
return res;
return ColumnsDescription{*columns};
}
@ -157,7 +69,7 @@ void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, const
parsed_host_port.first, std::to_string(parsed_host_port.second),
args[3]->as<ASTLiteral &>().value.safeGet<String>(),
args[4]->as<ASTLiteral &>().value.safeGet<String>());
connection = std::make_shared<pqxx::connection>(connection_str);
connection = std::make_shared<PGConnection>(connection_str);
}

View File

@ -5,11 +5,11 @@
#if USE_LIBPQXX
#include <TableFunctions/ITableFunction.h>
#include <Storages/StoragePostgreSQL.h>
#include "pqxx/pqxx"
namespace DB
{
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
class TableFunctionPostgreSQL : public ITableFunction
{
@ -27,11 +27,9 @@ private:
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
DataTypePtr getDataType(std::string & type, bool is_nullable, uint16_t dimensions) const;
String connection_str;
String remote_table_name;
ConnectionPtr connection;
PGConnectionPtr connection;
};
}

View File

@ -32,9 +32,6 @@
<null_value></null_value>
</attribute>
</structure>
<lifetime>
<min>1</min>
<max>1</max>
</lifetime>
<lifetime>1</lifetime>
</dictionary>
</yandex>

View File

@ -1,8 +1,8 @@
import pytest
import time
import psycopg2
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)

View File

@ -0,0 +1,122 @@
import pytest
import time
import psycopg2
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=[], with_postgres=True)
postgres_table_template = """
CREATE TABLE IF NOT EXISTS {} (
id Integer NOT NULL, value Integer, PRIMARY KEY (id))
"""
def get_postgres_conn(database=False):
if database == True:
conn_string = "host='localhost' dbname='test_database' user='postgres' password='mysecretpassword'"
else:
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(cursor, name):
cursor.execute("CREATE DATABASE {}".format(name))
def create_postgres_table(cursor, table_name):
# database was specified in connection string
cursor.execute(postgres_table_template.format(table_name))
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
conn = get_postgres_conn()
cursor = conn.cursor()
create_postgres_db(cursor, 'test_database')
yield cluster
finally:
cluster.shutdown()
def test_postgres_database_engine_with_postgres_ddl(started_cluster):
# connect to database as well
conn = get_postgres_conn(True)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')")
assert 'test_database' in node1.query('SHOW DATABASES')
create_postgres_table(cursor, 'test_table')
assert 'test_table' in node1.query('SHOW TABLES FROM test_database')
cursor.execute('ALTER TABLE test_table ADD COLUMN data Text')
assert 'data' in node1.query("SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")
node1.query("INSERT INTO test_database.test_table SELECT 101, 101, toString(101)")
assert node1.query("SELECT data FROM test_database.test_table WHERE id = 101").rstrip() == '101'
cursor.execute('ALTER TABLE test_table DROP COLUMN data')
assert 'data' not in node1.query("SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")
cursor.execute('DROP TABLE test_table;')
assert 'test_table' not in node1.query('SHOW TABLES FROM test_database')
node1.query("DROP DATABASE test_database")
assert 'test_database' not in node1.query('SHOW DATABASES')
def test_postgresql_database_engine_with_clickhouse_ddl(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')")
create_postgres_table(cursor, 'test_table')
assert 'test_table' in node1.query('SHOW TABLES FROM test_database')
node1.query("DROP TABLE test_database.test_table")
assert 'test_table' not in node1.query('SHOW TABLES FROM test_database')
node1.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in node1.query('SHOW TABLES FROM test_database')
node1.query("DETACH TABLE test_database.test_table")
assert 'test_table' not in node1.query('SHOW TABLES FROM test_database')
node1.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in node1.query('SHOW TABLES FROM test_database')
node1.query("DROP DATABASE test_database")
assert 'test_database' not in node1.query('SHOW DATABASES')
def test_postgresql_database_engine_queries(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
node1.query(
"CREATE DATABASE test_database ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword')")
create_postgres_table(cursor, 'test_table')
assert node1.query("SELECT count() FROM test_database.test_table").rstrip() == '0'
node1.query("INSERT INTO test_database.test_table SELECT number, number from numbers(10000)")
assert node1.query("SELECT count() FROM test_database.test_table").rstrip() == '10000'
node1.query("DROP DATABASE test_database")
assert 'test_database' not in node1.query('SHOW DATABASES')
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()