From 2f6cb7f2f5207d4e8feaf39cd489467857b5b55a Mon Sep 17 00:00:00 2001 From: kssenii Date: Sat, 21 Nov 2020 01:47:04 +0300 Subject: [PATCH] Add storage PostgreSQL with read support --- src/Access/AccessType.h | 1 + src/Core/ExternalResultDescription.cpp | 3 + src/Core/ExternalResultDescription.h | 3 +- src/Core/config_core.h.in | 2 +- src/Formats/MySQLBlockInputStream.cpp | 3 + src/Formats/PostgreSQLBlockInputStream.cpp | 262 +++++++++++++++++++++ src/Formats/PostgreSQLBlockInputStream.h | 44 ++++ src/Storages/StoragePostgreSQL.cpp | 137 +++++++++++ src/Storages/StoragePostgreSQL.h | 50 ++++ src/Storages/registerStorages.cpp | 7 + 10 files changed, 510 insertions(+), 2 deletions(-) create mode 100644 src/Formats/PostgreSQLBlockInputStream.cpp create mode 100644 src/Formats/PostgreSQLBlockInputStream.h create mode 100644 src/Storages/StoragePostgreSQL.cpp create mode 100644 src/Storages/StoragePostgreSQL.h diff --git a/src/Access/AccessType.h b/src/Access/AccessType.h index 1a070420fd1..5a84aa66739 100644 --- a/src/Access/AccessType.h +++ b/src/Access/AccessType.h @@ -159,6 +159,7 @@ enum class AccessType M(REMOTE, "", GLOBAL, SOURCES) \ M(MONGO, "", GLOBAL, SOURCES) \ M(MYSQL, "", GLOBAL, SOURCES) \ + M(POSTGRES, "", GLOBAL, SOURCES) \ M(ODBC, "", GLOBAL, SOURCES) \ M(JDBC, "", GLOBAL, SOURCES) \ M(HDFS, "", GLOBAL, SOURCES) \ diff --git a/src/Core/ExternalResultDescription.cpp b/src/Core/ExternalResultDescription.cpp index 7165d73b7d0..792e1c30eae 100644 --- a/src/Core/ExternalResultDescription.cpp +++ b/src/Core/ExternalResultDescription.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -76,6 +77,8 @@ void ExternalResultDescription::init(const Block & sample_block_) types.emplace_back(ValueType::vtDecimal128, is_nullable); else if (typeid_cast *>(type)) types.emplace_back(ValueType::vtDecimal256, is_nullable); + else if (typeid_cast(type)) + types.emplace_back(ValueType::vtArray, is_nullable); else throw Exception{"Unsupported type " + type->getName(), ErrorCodes::UNKNOWN_TYPE}; } diff --git a/src/Core/ExternalResultDescription.h b/src/Core/ExternalResultDescription.h index f8ba2a6bba2..31114746069 100644 --- a/src/Core/ExternalResultDescription.h +++ b/src/Core/ExternalResultDescription.h @@ -30,7 +30,8 @@ struct ExternalResultDescription vtDecimal32, vtDecimal64, vtDecimal128, - vtDecimal256 + vtDecimal256, + vtArray }; Block sample_block; diff --git a/src/Core/config_core.h.in b/src/Core/config_core.h.in index 5dbac2f69bf..6c7a35abd7c 100644 --- a/src/Core/config_core.h.in +++ b/src/Core/config_core.h.in @@ -12,4 +12,4 @@ #cmakedefine01 USE_OPENCL #cmakedefine01 USE_LDAP #cmakedefine01 USE_ROCKSDB - +#cmakedefine01 USE_LIBPQXX diff --git a/src/Formats/MySQLBlockInputStream.cpp b/src/Formats/MySQLBlockInputStream.cpp index 2ff8e8e5fb2..f44c44da60d 100644 --- a/src/Formats/MySQLBlockInputStream.cpp +++ b/src/Formats/MySQLBlockInputStream.cpp @@ -22,6 +22,7 @@ namespace DB namespace ErrorCodes { extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH; + extern const int NOT_IMPLEMENTED; } MySQLBlockInputStream::Connection::Connection( @@ -110,6 +111,8 @@ namespace data_type.deserializeAsWholeText(column, buffer, FormatSettings{}); break; } + default: + throw Exception("Unsupported value type", ErrorCodes::NOT_IMPLEMENTED); } } diff --git a/src/Formats/PostgreSQLBlockInputStream.cpp b/src/Formats/PostgreSQLBlockInputStream.cpp new file mode 100644 index 00000000000..33d9598eefc --- /dev/null +++ b/src/Formats/PostgreSQLBlockInputStream.cpp @@ -0,0 +1,262 @@ +#if !defined(ARCADIA_BUILD) +#include "config_core.h" +#endif + +#if USE_LIBPQXX +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include "PostgreSQLBlockInputStream.h" +#include +#include + +namespace DB +{ + +PostgreSQLBlockInputStream::PostgreSQLBlockInputStream( + std::shared_ptr connection_, + const std::string & query_str_, + const Block & sample_block, + const UInt64 max_block_size_) + : query_str(query_str_) + , max_block_size(max_block_size_) + , connection(connection_) + , work(std::make_unique(*connection)) + , stream(std::make_unique(*work, pqxx::from_query, std::string_view(query_str))) +{ + description.init(sample_block); +} + + +Block PostgreSQLBlockInputStream::readImpl() +{ + /// Check if pqxx::stream_from is finished + if (!stream || !(*stream)) + return Block(); + + MutableColumns columns = description.sample_block.cloneEmptyColumns(); + size_t num_rows = 0; + + while (true) + { + const std::vector * row{stream->read_row()}; + + if (!row) + { + stream->complete(); + break; + } + + if (row->empty()) + break; + + std::string value; + for (const auto idx : ext::range(0, row->size())) + { + value = std::string((*row)[idx]); + LOG_DEBUG((&Poco::Logger::get("PostgreSQL")), "GOT {}", value); + const auto & sample = description.sample_block.getByPosition(idx); + + if (value.data()) + { + if (description.types[idx].second) + { + ColumnNullable & column_nullable = assert_cast(*columns[idx]); + const auto & data_type = assert_cast(*sample.type); + insertValue(column_nullable.getNestedColumn(), value, description.types[idx].first, data_type.getNestedType()); + column_nullable.getNullMapData().emplace_back(0); + } + else + { + insertValue(*columns[idx], value, description.types[idx].first, sample.type); + } + } + else + { + insertDefaultValue(*columns[idx], *sample.column); + } + + } + + if (++num_rows == max_block_size) + break; + } + + return description.sample_block.cloneWithColumns(std::move(columns)); +} + + +void PostgreSQLBlockInputStream::insertValue(IColumn & column, const std::string & value, + const ExternalResultDescription::ValueType type, const DataTypePtr data_type) +{ + switch (type) + { + case ValueType::vtUInt8: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ValueType::vtUInt16: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ValueType::vtUInt32: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ValueType::vtUInt64: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ValueType::vtInt8: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ValueType::vtInt16: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ValueType::vtInt32: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ValueType::vtInt64: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ValueType::vtFloat32: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ValueType::vtFloat64: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ValueType::vtString: + assert_cast(column).insertData(value.data(), value.size()); + break; + case ValueType::vtDate: + //assert_cast(column).insertValue(UInt16(value.getDate().getDayNum())); + break; + case ValueType::vtDateTime: + //assert_cast(column).insertValue(UInt32(value.getDateTime())); + break; + case ValueType::vtUUID: + assert_cast(column).insert(parse(value.data(), value.size())); + break; + case ValueType::vtDateTime64:[[fallthrough]]; + case ValueType::vtDecimal32: [[fallthrough]]; + case ValueType::vtDecimal64: [[fallthrough]]; + case ValueType::vtDecimal128:[[fallthrough]]; + case ValueType::vtDecimal256: + { + ReadBuffer buffer(const_cast(value.data()), value.size(), 0); + data_type->deserializeAsWholeText(column, buffer, FormatSettings{}); + break; + } + case ValueType::vtArray: + { + const auto * array_type = typeid_cast(data_type.get()); + auto nested = array_type->getNestedType(); + + size_t expected_dimensions = 1; + while (isArray(nested)) + { + ++expected_dimensions; + nested = typeid_cast(nested.get())->getNestedType(); + } + auto which = WhichDataType(nested); + + auto get_array([&]() -> Field + { + pqxx::array_parser parser{value}; + std::pair parsed = parser.get_next(); + + std::vector> dimensions(expected_dimensions + 1); + size_t dimension = 0, max_dimension = 0; + bool new_row = false, null_value = false; + + while (parsed.first != pqxx::array_parser::juncture::done) + { + while (parsed.first == pqxx::array_parser::juncture::row_start) + { + ++dimension; + if (dimension > expected_dimensions) + throw Exception("Got more dimensions than expected", ErrorCodes::BAD_ARGUMENTS); + + parsed = parser.get_next(); + new_row = true; + } + + /// TODO: dont forget to add test with null type + std::vector current_dimension_row; + while (parsed.first != pqxx::array_parser::juncture::row_end) + { + if (parsed.first == pqxx::array_parser::juncture::null_value) + null_value = true; + + if (which.isUInt8() || which.isUInt16()) + current_dimension_row.emplace_back(!null_value ? pqxx::from_string(parsed.second) : UInt16()); + else if (which.isUInt32()) + current_dimension_row.emplace_back(!null_value ? pqxx::from_string(parsed.second) : UInt32()); + else if (which.isUInt64()) + current_dimension_row.emplace_back(!null_value ? pqxx::from_string(parsed.second) : UInt64()); + else if (which.isInt8() || which.isInt16()) + current_dimension_row.emplace_back(!null_value ? pqxx::from_string(parsed.second) : Int16()); + else if (which.isInt32()) + current_dimension_row.emplace_back(!null_value ? pqxx::from_string(parsed.second) : Int32()); + else if (which.isInt64()) + current_dimension_row.emplace_back(!null_value ? pqxx::from_string(parsed.second) : Int64()); + //else if (which.isDate()) + //else if (which.isDateTime()) + else if (which.isFloat32()) + current_dimension_row.emplace_back(!null_value ? pqxx::from_string(parsed.second) : Float32()); + else if (which.isFloat64()) + current_dimension_row.emplace_back(!null_value ? pqxx::from_string(parsed.second) : Float64()); + else if (which.isString() || which.isFixedString()) + current_dimension_row.emplace_back(!null_value ? parsed.second : String()); + else throw Exception("Unexpected type " + nested->getName(), ErrorCodes::BAD_ARGUMENTS); + + parsed = parser.get_next(); + null_value = false; + } + + while (parsed.first == pqxx::array_parser::juncture::row_end) + { + --dimension; + if (std::exchange(new_row, false)) + { + if (dimension + 1 > max_dimension) + max_dimension = dimension + 1; + if (dimension) + dimensions[dimension].emplace_back(Array(current_dimension_row.begin(), current_dimension_row.end())); + else + return Array(current_dimension_row.begin(), current_dimension_row.end()); + } + else if (dimension) + { + dimensions[dimension].emplace_back(Array(dimensions[dimension + 1].begin(), dimensions[dimension + 1].end())); + dimensions[dimension + 1].clear(); + } + parsed = parser.get_next(); + } + } + + if (max_dimension < expected_dimensions) + throw Exception("Got less dimensions than expected", ErrorCodes::BAD_ARGUMENTS); + + return Array(dimensions[1].begin(), dimensions[1].end()); + }); + + assert_cast(column).insert(get_array()); + break; + } + } +} + +} + +#endif diff --git a/src/Formats/PostgreSQLBlockInputStream.h b/src/Formats/PostgreSQLBlockInputStream.h new file mode 100644 index 00000000000..e8a84999d3a --- /dev/null +++ b/src/Formats/PostgreSQLBlockInputStream.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include +#include + +#include + +namespace DB +{ + +class PostgreSQLBlockInputStream : public IBlockInputStream +{ +public: + PostgreSQLBlockInputStream( + std::shared_ptr connection_, + const std::string & query_str, + const Block & sample_block, + const UInt64 max_block_size_); + + String getName() const override { return "PostgreSQL"; } + Block getHeader() const override { return description.sample_block.cloneEmpty(); } + +private: + using ValueType = ExternalResultDescription::ValueType; + + Block readImpl() override; + void insertValue(IColumn & column, const std::string & value, + const ExternalResultDescription::ValueType type, const DataTypePtr data_type); + void insertDefaultValue(IColumn & column, const IColumn & sample_column) + { + column.insertFrom(sample_column, 0); + } + + const String query_str; + const UInt64 max_block_size; + ExternalResultDescription description; + + std::shared_ptr connection; + std::unique_ptr work; + std::unique_ptr stream; +}; + +} diff --git a/src/Storages/StoragePostgreSQL.cpp b/src/Storages/StoragePostgreSQL.cpp new file mode 100644 index 00000000000..4351b4c115b --- /dev/null +++ b/src/Storages/StoragePostgreSQL.cpp @@ -0,0 +1,137 @@ +#include "StoragePostgreSQL.h" + +#if USE_LIBPQXX + +#include +#include + +#include +#include + +#include +#include + +#include +#include +#include + +#include +#include + +#include +#include + +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; +} + +StoragePostgreSQL::StoragePostgreSQL( + const StorageID & table_id_, + const String & remote_database_name_, + const String & remote_table_name_, + const String connection_str, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const Context & context_) + : IStorage(table_id_) + , remote_database_name(remote_database_name_) + , remote_table_name(remote_table_name_) + , global_context(context_) + , connection(std::make_shared(connection_str)) +{ + StorageInMemoryMetadata storage_metadata; + storage_metadata.setColumns(columns_); + storage_metadata.setConstraints(constraints_); + setInMemoryMetadata(storage_metadata); +} + + +Pipe StoragePostgreSQL::read( + const Names & column_names_, + const StorageMetadataPtr & metadata_snapshot, + SelectQueryInfo & query_info_, + const Context & context_, + QueryProcessingStage::Enum /*processed_stage*/, + size_t max_block_size_, + unsigned) +{ + metadata_snapshot->check(column_names_, getVirtuals(), getStorageID()); + + String query = transformQueryForExternalDatabase( + query_info_, + metadata_snapshot->getColumns().getOrdinary(), + IdentifierQuotingStyle::DoubleQuotes, + remote_database_name, + remote_table_name, + context_); + + Block sample_block; + for (const String & column_name : column_names_) + { + auto column_data = metadata_snapshot->getColumns().getPhysical(column_name); + WhichDataType which(column_data.type); + + if (which.isEnum()) + column_data.type = std::make_shared(); + + sample_block.insert({ column_data.type, column_data.name }); + } + + return Pipe(std::make_shared( + std::make_shared(connection, query, sample_block, max_block_size_))); +} + + +void registerStoragePostgreSQL(StorageFactory & factory) +{ + factory.registerStorage("PostgreSQL", [](const StorageFactory::Arguments & args) + { + ASTs & engine_args = args.engine_args; + + if (engine_args.size() < 5) + throw Exception( + "Storage PostgreSQL requires 5-7 parameters: PostgreSQL('host:port', database, table, 'username', 'password'.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.local_context); + + auto parsed_host_port = parseAddress(engine_args[0]->as().value.safeGet(), 5432); + const String & remote_database = engine_args[1]->as().value.safeGet(); + const String & remote_table = engine_args[2]->as().value.safeGet(); + + String connection_str; + if (remote_database.empty()) + { + connection_str = fmt::format("host={} port={} user={} password={}", + parsed_host_port.first, std::to_string(parsed_host_port.second), + engine_args[3]->as().value.safeGet(), + engine_args[4]->as().value.safeGet()); + } + else + { + connection_str = fmt::format("dbname={} host={} port={} user={} password={}", + remote_database, parsed_host_port.first, std::to_string(parsed_host_port.second), + engine_args[3]->as().value.safeGet(), + engine_args[4]->as().value.safeGet()); + } + + return StoragePostgreSQL::create( + args.table_id, remote_database, remote_table, connection_str, args.columns, args.constraints, args.context); + }, + { + .source_access_type = AccessType::POSTGRES, + }); +} + +} + +#endif diff --git a/src/Storages/StoragePostgreSQL.h b/src/Storages/StoragePostgreSQL.h new file mode 100644 index 00000000000..9108d9d341f --- /dev/null +++ b/src/Storages/StoragePostgreSQL.h @@ -0,0 +1,50 @@ +#pragma once + +#if !defined(ARCADIA_BUILD) +#include "config_core.h" +#endif + +#if USE_LIBPQXX + +#include +#include +#include + +#include "pqxx/pqxx" + +namespace DB +{ +class StoragePostgreSQL final : public ext::shared_ptr_helper, public IStorage +{ + friend struct ext::shared_ptr_helper; +public: + StoragePostgreSQL( + const StorageID & table_id_, + const std::string & remote_database_name_, + const std::string & remote_table_name_, + const String connection_str, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const Context & context_); + + String getName() const override { return "PostgreSQL"; } + + Pipe read( + const Names & column_names, + const StorageMetadataPtr & /*metadata_snapshot*/, + SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + unsigned num_streams) override; + +private: + String remote_database_name; + String remote_table_name; + Context global_context; + + std::shared_ptr connection; +}; +} + +#endif diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp index 33c1b6245ac..0022ee6bd4f 100644 --- a/src/Storages/registerStorages.cpp +++ b/src/Storages/registerStorages.cpp @@ -58,6 +58,9 @@ void registerStorageRabbitMQ(StorageFactory & factory); void registerStorageEmbeddedRocksDB(StorageFactory & factory); #endif +#if USE_LIBPQXX +void registerStoragePostgreSQL(StorageFactory & factory); +#endif void registerStorages() { @@ -111,6 +114,10 @@ void registerStorages() #if USE_ROCKSDB registerStorageEmbeddedRocksDB(factory); #endif + + #if USE_LIBPQXX + registerStoragePostgreSQL(factory); + #endif } }