Add storage PostgreSQL with read support

This commit is contained in:
kssenii 2020-11-21 01:47:04 +03:00
parent 2d8e35b4bd
commit 2f6cb7f2f5
10 changed files with 510 additions and 2 deletions

View File

@ -159,6 +159,7 @@ enum class AccessType
M(REMOTE, "", GLOBAL, SOURCES) \
M(MONGO, "", GLOBAL, SOURCES) \
M(MYSQL, "", GLOBAL, SOURCES) \
M(POSTGRES, "", GLOBAL, SOURCES) \
M(ODBC, "", GLOBAL, SOURCES) \
M(JDBC, "", GLOBAL, SOURCES) \
M(HDFS, "", GLOBAL, SOURCES) \

View File

@ -4,6 +4,7 @@
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
@ -76,6 +77,8 @@ void ExternalResultDescription::init(const Block & sample_block_)
types.emplace_back(ValueType::vtDecimal128, is_nullable);
else if (typeid_cast<const DataTypeDecimal<Decimal256> *>(type))
types.emplace_back(ValueType::vtDecimal256, is_nullable);
else if (typeid_cast<const DataTypeArray *>(type))
types.emplace_back(ValueType::vtArray, is_nullable);
else
throw Exception{"Unsupported type " + type->getName(), ErrorCodes::UNKNOWN_TYPE};
}

View File

@ -30,7 +30,8 @@ struct ExternalResultDescription
vtDecimal32,
vtDecimal64,
vtDecimal128,
vtDecimal256
vtDecimal256,
vtArray
};
Block sample_block;

View File

@ -12,4 +12,4 @@
#cmakedefine01 USE_OPENCL
#cmakedefine01 USE_LDAP
#cmakedefine01 USE_ROCKSDB
#cmakedefine01 USE_LIBPQXX

View File

@ -22,6 +22,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int NOT_IMPLEMENTED;
}
MySQLBlockInputStream::Connection::Connection(
@ -110,6 +111,8 @@ namespace
data_type.deserializeAsWholeText(column, buffer, FormatSettings{});
break;
}
default:
throw Exception("Unsupported value type", ErrorCodes::NOT_IMPLEMENTED);
}
}

View File

@ -0,0 +1,262 @@
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <vector>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/assert_cast.h>
#include <ext/range.h>
#include "PostgreSQLBlockInputStream.h"
#include <common/logger_useful.h>
#include <Core/Field.h>
namespace DB
{
PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
std::shared_ptr<pqxx::connection> connection_,
const std::string & query_str_,
const Block & sample_block,
const UInt64 max_block_size_)
: query_str(query_str_)
, max_block_size(max_block_size_)
, connection(connection_)
, work(std::make_unique<pqxx::work>(*connection))
, stream(std::make_unique<pqxx::stream_from>(*work, pqxx::from_query, std::string_view(query_str)))
{
description.init(sample_block);
}
Block PostgreSQLBlockInputStream::readImpl()
{
/// Check if pqxx::stream_from is finished
if (!stream || !(*stream))
return Block();
MutableColumns columns = description.sample_block.cloneEmptyColumns();
size_t num_rows = 0;
while (true)
{
const std::vector<pqxx::zview> * row{stream->read_row()};
if (!row)
{
stream->complete();
break;
}
if (row->empty())
break;
std::string value;
for (const auto idx : ext::range(0, row->size()))
{
value = std::string((*row)[idx]);
LOG_DEBUG((&Poco::Logger::get("PostgreSQL")), "GOT {}", value);
const auto & sample = description.sample_block.getByPosition(idx);
if (value.data())
{
if (description.types[idx].second)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertValue(column_nullable.getNestedColumn(), value, description.types[idx].first, data_type.getNestedType());
column_nullable.getNullMapData().emplace_back(0);
}
else
{
insertValue(*columns[idx], value, description.types[idx].first, sample.type);
}
}
else
{
insertDefaultValue(*columns[idx], *sample.column);
}
}
if (++num_rows == max_block_size)
break;
}
return description.sample_block.cloneWithColumns(std::move(columns));
}
void PostgreSQLBlockInputStream::insertValue(IColumn & column, const std::string & value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type)
{
switch (type)
{
case ValueType::vtUInt8:
assert_cast<ColumnUInt8 &>(column).insertValue(pqxx::from_string<uint16_t>(value));
break;
case ValueType::vtUInt16:
assert_cast<ColumnUInt16 &>(column).insertValue(pqxx::from_string<uint16_t>(value));
break;
case ValueType::vtUInt32:
assert_cast<ColumnUInt32 &>(column).insertValue(pqxx::from_string<uint32_t>(value));
break;
case ValueType::vtUInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(pqxx::from_string<uint64_t>(value));
break;
case ValueType::vtInt8:
assert_cast<ColumnInt8 &>(column).insertValue(pqxx::from_string<int16_t>(value));
break;
case ValueType::vtInt16:
assert_cast<ColumnInt16 &>(column).insertValue(pqxx::from_string<int16_t>(value));
break;
case ValueType::vtInt32:
assert_cast<ColumnInt32 &>(column).insertValue(pqxx::from_string<int32_t>(value));
break;
case ValueType::vtInt64:
assert_cast<ColumnInt64 &>(column).insertValue(pqxx::from_string<int64_t>(value));
break;
case ValueType::vtFloat32:
assert_cast<ColumnFloat32 &>(column).insertValue(pqxx::from_string<float>(value));
break;
case ValueType::vtFloat64:
assert_cast<ColumnFloat64 &>(column).insertValue(pqxx::from_string<double>(value));
break;
case ValueType::vtString:
assert_cast<ColumnString &>(column).insertData(value.data(), value.size());
break;
case ValueType::vtDate:
//assert_cast<ColumnUInt16 &>(column).insertValue(UInt16(value.getDate().getDayNum()));
break;
case ValueType::vtDateTime:
//assert_cast<ColumnUInt32 &>(column).insertValue(UInt32(value.getDateTime()));
break;
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
break;
case ValueType::vtDateTime64:[[fallthrough]];
case ValueType::vtDecimal32: [[fallthrough]];
case ValueType::vtDecimal64: [[fallthrough]];
case ValueType::vtDecimal128:[[fallthrough]];
case ValueType::vtDecimal256:
{
ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0);
data_type->deserializeAsWholeText(column, buffer, FormatSettings{});
break;
}
case ValueType::vtArray:
{
const auto * array_type = typeid_cast<const DataTypeArray *>(data_type.get());
auto nested = array_type->getNestedType();
size_t expected_dimensions = 1;
while (isArray(nested))
{
++expected_dimensions;
nested = typeid_cast<const DataTypeArray *>(nested.get())->getNestedType();
}
auto which = WhichDataType(nested);
auto get_array([&]() -> Field
{
pqxx::array_parser parser{value};
std::pair<pqxx::array_parser::juncture, std::string> parsed = parser.get_next();
std::vector<std::vector<Field>> dimensions(expected_dimensions + 1);
size_t dimension = 0, max_dimension = 0;
bool new_row = false, null_value = false;
while (parsed.first != pqxx::array_parser::juncture::done)
{
while (parsed.first == pqxx::array_parser::juncture::row_start)
{
++dimension;
if (dimension > expected_dimensions)
throw Exception("Got more dimensions than expected", ErrorCodes::BAD_ARGUMENTS);
parsed = parser.get_next();
new_row = true;
}
/// TODO: dont forget to add test with null type
std::vector<Field> current_dimension_row;
while (parsed.first != pqxx::array_parser::juncture::row_end)
{
if (parsed.first == pqxx::array_parser::juncture::null_value)
null_value = true;
if (which.isUInt8() || which.isUInt16())
current_dimension_row.emplace_back(!null_value ? pqxx::from_string<uint16_t>(parsed.second) : UInt16());
else if (which.isUInt32())
current_dimension_row.emplace_back(!null_value ? pqxx::from_string<uint32_t>(parsed.second) : UInt32());
else if (which.isUInt64())
current_dimension_row.emplace_back(!null_value ? pqxx::from_string<uint64_t>(parsed.second) : UInt64());
else if (which.isInt8() || which.isInt16())
current_dimension_row.emplace_back(!null_value ? pqxx::from_string<int16_t>(parsed.second) : Int16());
else if (which.isInt32())
current_dimension_row.emplace_back(!null_value ? pqxx::from_string<int32_t>(parsed.second) : Int32());
else if (which.isInt64())
current_dimension_row.emplace_back(!null_value ? pqxx::from_string<int64_t>(parsed.second) : Int64());
//else if (which.isDate())
//else if (which.isDateTime())
else if (which.isFloat32())
current_dimension_row.emplace_back(!null_value ? pqxx::from_string<float>(parsed.second) : Float32());
else if (which.isFloat64())
current_dimension_row.emplace_back(!null_value ? pqxx::from_string<double>(parsed.second) : Float64());
else if (which.isString() || which.isFixedString())
current_dimension_row.emplace_back(!null_value ? parsed.second : String());
else throw Exception("Unexpected type " + nested->getName(), ErrorCodes::BAD_ARGUMENTS);
parsed = parser.get_next();
null_value = false;
}
while (parsed.first == pqxx::array_parser::juncture::row_end)
{
--dimension;
if (std::exchange(new_row, false))
{
if (dimension + 1 > max_dimension)
max_dimension = dimension + 1;
if (dimension)
dimensions[dimension].emplace_back(Array(current_dimension_row.begin(), current_dimension_row.end()));
else
return Array(current_dimension_row.begin(), current_dimension_row.end());
}
else if (dimension)
{
dimensions[dimension].emplace_back(Array(dimensions[dimension + 1].begin(), dimensions[dimension + 1].end()));
dimensions[dimension + 1].clear();
}
parsed = parser.get_next();
}
}
if (max_dimension < expected_dimensions)
throw Exception("Got less dimensions than expected", ErrorCodes::BAD_ARGUMENTS);
return Array(dimensions[1].begin(), dimensions[1].end());
});
assert_cast<ColumnArray &>(column).insert(get_array());
break;
}
}
}
}
#endif

View File

@ -0,0 +1,44 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>
#include <Core/ExternalResultDescription.h>
#include <pqxx/pqxx>
namespace DB
{
class PostgreSQLBlockInputStream : public IBlockInputStream
{
public:
PostgreSQLBlockInputStream(
std::shared_ptr<pqxx::connection> connection_,
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_);
String getName() const override { return "PostgreSQL"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
using ValueType = ExternalResultDescription::ValueType;
Block readImpl() override;
void insertValue(IColumn & column, const std::string & value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type);
void insertDefaultValue(IColumn & column, const IColumn & sample_column)
{
column.insertFrom(sample_column, 0);
}
const String query_str;
const UInt64 max_block_size;
ExternalResultDescription description;
std::shared_ptr<pqxx::connection> connection;
std::unique_ptr<pqxx::work> work;
std::unique_ptr<pqxx::stream_from> stream;
};
}

View File

@ -0,0 +1,137 @@
#include "StoragePostgreSQL.h"
#if USE_LIBPQXX
#include <Storages/StorageFactory.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Core/Settings.h>
#include <Common/parseAddress.h>
#include <Parsers/ASTLiteral.h>
#include <Formats/FormatFactory.h>
#include <Formats/PostgreSQLBlockInputStream.h>
#include <IO/Operators.h>
#include <IO/WriteHelpers.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
StoragePostgreSQL::StoragePostgreSQL(
const StorageID & table_id_,
const String & remote_database_name_,
const String & remote_table_name_,
const String connection_str,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_)
: IStorage(table_id_)
, remote_database_name(remote_database_name_)
, remote_table_name(remote_table_name_)
, global_context(context_)
, connection(std::make_shared<pqxx::connection>(connection_str))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
}
Pipe StoragePostgreSQL::read(
const Names & column_names_,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info_,
const Context & context_,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size_,
unsigned)
{
metadata_snapshot->check(column_names_, getVirtuals(), getStorageID());
String query = transformQueryForExternalDatabase(
query_info_,
metadata_snapshot->getColumns().getOrdinary(),
IdentifierQuotingStyle::DoubleQuotes,
remote_database_name,
remote_table_name,
context_);
Block sample_block;
for (const String & column_name : column_names_)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(column_name);
WhichDataType which(column_data.type);
if (which.isEnum())
column_data.type = std::make_shared<DataTypeString>();
sample_block.insert({ column_data.type, column_data.name });
}
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<PostgreSQLBlockInputStream>(connection, query, sample_block, max_block_size_)));
}
void registerStoragePostgreSQL(StorageFactory & factory)
{
factory.registerStorage("PostgreSQL", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() < 5)
throw Exception(
"Storage PostgreSQL requires 5-7 parameters: PostgreSQL('host:port', database, table, 'username', 'password'.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.local_context);
auto parsed_host_port = parseAddress(engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(), 5432);
const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
String connection_str;
if (remote_database.empty())
{
connection_str = fmt::format("host={} port={} user={} password={}",
parsed_host_port.first, std::to_string(parsed_host_port.second),
engine_args[3]->as<ASTLiteral &>().value.safeGet<String>(),
engine_args[4]->as<ASTLiteral &>().value.safeGet<String>());
}
else
{
connection_str = fmt::format("dbname={} host={} port={} user={} password={}",
remote_database, parsed_host_port.first, std::to_string(parsed_host_port.second),
engine_args[3]->as<ASTLiteral &>().value.safeGet<String>(),
engine_args[4]->as<ASTLiteral &>().value.safeGet<String>());
}
return StoragePostgreSQL::create(
args.table_id, remote_database, remote_table, connection_str, args.columns, args.constraints, args.context);
},
{
.source_access_type = AccessType::POSTGRES,
});
}
}
#endif

View File

@ -0,0 +1,50 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <ext/shared_ptr_helper.h>
#include <Interpreters/Context.h>
#include <Storages/IStorage.h>
#include "pqxx/pqxx"
namespace DB
{
class StoragePostgreSQL final : public ext::shared_ptr_helper<StoragePostgreSQL>, public IStorage
{
friend struct ext::shared_ptr_helper<StoragePostgreSQL>;
public:
StoragePostgreSQL(
const StorageID & table_id_,
const std::string & remote_database_name_,
const std::string & remote_table_name_,
const String connection_str,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_);
String getName() const override { return "PostgreSQL"; }
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
String remote_database_name;
String remote_table_name;
Context global_context;
std::shared_ptr<pqxx::connection> connection;
};
}
#endif

View File

@ -58,6 +58,9 @@ void registerStorageRabbitMQ(StorageFactory & factory);
void registerStorageEmbeddedRocksDB(StorageFactory & factory);
#endif
#if USE_LIBPQXX
void registerStoragePostgreSQL(StorageFactory & factory);
#endif
void registerStorages()
{
@ -111,6 +114,10 @@ void registerStorages()
#if USE_ROCKSDB
registerStorageEmbeddedRocksDB(factory);
#endif
#if USE_LIBPQXX
registerStoragePostgreSQL(factory);
#endif
}
}