This commit is contained in:
kssenii 2020-12-09 00:40:18 +03:00
parent 69f6714461
commit aa3484515d
5 changed files with 38 additions and 33 deletions

View File

@ -38,6 +38,9 @@ PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
, connection(connection_)
{
description.init(sample_block);
for (const auto idx : ext::range(0, description.sample_block.columns()))
if (description.types[idx].first == ValueType::vtArray)
prepareArrayInfo(idx, description.sample_block.getByPosition(idx).type);
}
@ -56,7 +59,6 @@ Block PostgreSQLBlockInputStream::readImpl()
MutableColumns columns = description.sample_block.cloneEmptyColumns();
size_t num_rows = 0;
std::string value;
while (true)
{
@ -73,8 +75,6 @@ Block PostgreSQLBlockInputStream::readImpl()
for (const auto idx : ext::range(0, row->size()))
{
const auto & sample = description.sample_block.getByPosition(idx);
if (!num_rows && description.types[idx].first == ValueType::vtArray)
prepareArrayParser(idx, sample.type);
/// if got NULL type, then pqxx::zview will return nullptr in c_str()
if ((*row)[idx].c_str())
@ -147,6 +147,21 @@ void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
break;
case ValueType::vtDate:
assert_cast<ColumnUInt16 &>(column).insertValue(UInt16{LocalDate{std::string(value)}.getDayNum()});
break;
case ValueType::vtDateTime:
assert_cast<ColumnUInt32 &>(column).insertValue(time_t{LocalDateTime{std::string(value)}});
break;
case ValueType::vtDateTime64:[[fallthrough]];
case ValueType::vtDecimal32: [[fallthrough]];
case ValueType::vtDecimal64: [[fallthrough]];
case ValueType::vtDecimal128:
{
ReadBufferFromString istr(value);
data_type->deserializeAsWholeText(column, istr, FormatSettings{});
break;
}
case ValueType::vtArray:
{
pqxx::array_parser parser{value};
@ -188,24 +203,13 @@ void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view
assert_cast<ColumnArray &>(column).insert(Array(dimensions[1].begin(), dimensions[1].end()));
break;
}
case ValueType::vtDate: [[fallthrough]];
case ValueType::vtDateTime: [[fallthrough]];
case ValueType::vtDateTime64:[[fallthrough]];
case ValueType::vtDecimal32: [[fallthrough]];
case ValueType::vtDecimal64: [[fallthrough]];
case ValueType::vtDecimal128:
{
ReadBufferFromString istr(value);
data_type->deserializeAsWholeText(column, istr, FormatSettings{});
break;
}
default:
throw Exception("Value of unsupported type:" + column.getName(), ErrorCodes::UNKNOWN_TYPE);
}
}
void PostgreSQLBlockInputStream::prepareArrayParser(size_t column_idx, const DataTypePtr data_type)
void PostgreSQLBlockInputStream::prepareArrayInfo(size_t column_idx, const DataTypePtr data_type)
{
const auto * array_type = typeid_cast<const DataTypeArray *>(data_type.get());
auto nested = array_type->getNestedType();
@ -249,21 +253,21 @@ void PostgreSQLBlockInputStream::prepareArrayParser(size_t column_idx, const Dat
else if (which.isDecimal32())
parser = [nested](std::string & field) -> Field
{
auto type = typeid_cast<const DataTypeDecimal<Decimal32> *>(nested.get());
const auto & type = typeid_cast<const DataTypeDecimal<Decimal32> *>(nested.get());
DataTypeDecimal<Decimal32> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else if (which.isDecimal64())
parser = [nested](std::string & field) -> Field
{
auto type = typeid_cast<const DataTypeDecimal<Decimal64> *>(nested.get());
const auto & type = typeid_cast<const DataTypeDecimal<Decimal64> *>(nested.get());
DataTypeDecimal<Decimal64> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else if (which.isDecimal128())
parser = [nested](std::string & field) -> Field
{
auto type = typeid_cast<const DataTypeDecimal<Decimal128> *>(nested.get());
const auto & type = typeid_cast<const DataTypeDecimal<Decimal128> *>(nested.get());
DataTypeDecimal<Decimal128> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};

View File

@ -39,7 +39,7 @@ private:
{
column.insertFrom(sample_column, 0);
}
void prepareArrayParser(size_t column_idx, const DataTypePtr data_type);
void prepareArrayInfo(size_t column_idx, const DataTypePtr data_type);
const String query_str;
const UInt64 max_block_size;

View File

@ -91,7 +91,6 @@ void PostgreSQLBlockOutputStream::writePrefix()
{
storage.checkConnection(connection);
work = std::make_unique<pqxx::work>(*connection);
stream_inserter = std::make_unique<pqxx::stream_to>(*work, remote_table_name);
}
@ -100,7 +99,9 @@ void PostgreSQLBlockOutputStream::write(const Block & block)
const auto columns = block.getColumns();
const size_t num_rows = block.rows(), num_cols = block.columns();
const auto data_types = block.getDataTypes();
const auto settings = FormatSettings{};
if (!stream_inserter)
stream_inserter = std::make_unique<pqxx::stream_to>(*work, remote_table_name, block.getNames());
/// std::optional lets libpqxx to know if value is NULL
std::vector<std::optional<std::string>> row(num_cols);
@ -116,7 +117,7 @@ void PostgreSQLBlockOutputStream::write(const Block & block)
else
{
WriteBufferFromOwnString ostr;
data_types[j]->serializeAsText(*columns[j], i, ostr, settings);
data_types[j]->serializeAsText(*columns[j], i, ostr, FormatSettings{});
row[j] = std::optional<std::string>(ostr.str());
if (isArray(data_types[j]))
@ -130,7 +131,7 @@ void PostgreSQLBlockOutputStream::write(const Block & block)
}
}
/// pqxx::stream_to uses COPY instead of insert query, so it is faster if inserting large number of rows
/// pqxx::stream_to is much faster than simple insert, especially for large number of rows
stream_inserter->write_values(row);
}
}
@ -138,7 +139,8 @@ void PostgreSQLBlockOutputStream::write(const Block & block)
void PostgreSQLBlockOutputStream::writeSuffix()
{
stream_inserter->complete();
if (stream_inserter)
stream_inserter->complete();
work->commit();
}
@ -159,9 +161,9 @@ void registerStoragePostgreSQL(StorageFactory & factory)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() < 5)
throw Exception(
"Storage PostgreSQL requires 5-7 parameters: PostgreSQL('host:port', 'database', 'table', 'username', 'password'.",
if (engine_args.size() != 5)
throw Exception("Storage PostgreSQL requires 5 parameters: "
"PostgreSQL('host:port', 'database', 'table', 'username', 'password'.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)

View File

@ -74,11 +74,9 @@ DataTypePtr TableFunctionPostgreSQL::getDataType(std::string & type, bool is_nul
{
DataTypePtr res;
/// Get rid of trailing '[]' for arrays
if (dimensions)
{
/// No matter how many dimensions, in type we will get only one '[]' (i.e. Integer[])
type.resize(type.size() - 2);
}
if (type == "smallint")
res = std::make_shared<DataTypeInt16>();
@ -100,7 +98,7 @@ DataTypePtr TableFunctionPostgreSQL::getDataType(std::string & type, bool is_nul
res = std::make_shared<DataTypeDate>();
else if (type.starts_with("numeric"))
{
/// Numeric and decimal will both be numeric
/// Numeric and decimal will both end up here as numeric
/// Will get numeric(precision, scale) string, need to extract precision and scale
std::vector<std::string> result;
boost::split(result, type, [](char c){ return c == '(' || c == ',' || c == ')'; });
@ -143,7 +141,7 @@ void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, const
ASTs & args = func_args.arguments->children;
if (args.size() < 5)
if (args.size() != 5)
throw Exception("Table function 'PostgreSQL' requires 5 parameters: "
"PostgreSQL('host:port', 'database', 'table', 'user', 'password').",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

View File

@ -9,6 +9,7 @@
namespace DB
{
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
class TableFunctionPostgreSQL : public ITableFunction
{
@ -30,7 +31,7 @@ private:
String connection_str;
String remote_table_name;
std::shared_ptr<pqxx::connection> connection;
ConnectionPtr connection;
};
}