From aa3484515d2b44e5bc9d57ee3ec382883ef2232a Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 9 Dec 2020 00:40:18 +0300 Subject: [PATCH] Better --- .../PostgreSQLBlockInputStream.cpp | 40 ++++++++++--------- src/DataStreams/PostgreSQLBlockInputStream.h | 2 +- src/Storages/StoragePostgreSQL.cpp | 18 +++++---- .../TableFunctionPostgreSQL.cpp | 8 ++-- src/TableFunctions/TableFunctionPostgreSQL.h | 3 +- 5 files changed, 38 insertions(+), 33 deletions(-) diff --git a/src/DataStreams/PostgreSQLBlockInputStream.cpp b/src/DataStreams/PostgreSQLBlockInputStream.cpp index afa6d40f3de..bba46983081 100644 --- a/src/DataStreams/PostgreSQLBlockInputStream.cpp +++ b/src/DataStreams/PostgreSQLBlockInputStream.cpp @@ -38,6 +38,9 @@ PostgreSQLBlockInputStream::PostgreSQLBlockInputStream( , connection(connection_) { description.init(sample_block); + for (const auto idx : ext::range(0, description.sample_block.columns())) + if (description.types[idx].first == ValueType::vtArray) + prepareArrayInfo(idx, description.sample_block.getByPosition(idx).type); } @@ -56,7 +59,6 @@ Block PostgreSQLBlockInputStream::readImpl() MutableColumns columns = description.sample_block.cloneEmptyColumns(); size_t num_rows = 0; - std::string value; while (true) { @@ -73,8 +75,6 @@ Block PostgreSQLBlockInputStream::readImpl() for (const auto idx : ext::range(0, row->size())) { const auto & sample = description.sample_block.getByPosition(idx); - if (!num_rows && description.types[idx].first == ValueType::vtArray) - prepareArrayParser(idx, sample.type); /// if got NULL type, then pqxx::zview will return nullptr in c_str() if ((*row)[idx].c_str()) @@ -147,6 +147,21 @@ void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view case ValueType::vtUUID: assert_cast(column).insert(parse(value.data(), value.size())); break; + case ValueType::vtDate: + assert_cast(column).insertValue(UInt16{LocalDate{std::string(value)}.getDayNum()}); + break; + case ValueType::vtDateTime: + assert_cast(column).insertValue(time_t{LocalDateTime{std::string(value)}}); + break; + case ValueType::vtDateTime64:[[fallthrough]]; + case ValueType::vtDecimal32: [[fallthrough]]; + case ValueType::vtDecimal64: [[fallthrough]]; + case ValueType::vtDecimal128: + { + ReadBufferFromString istr(value); + data_type->deserializeAsWholeText(column, istr, FormatSettings{}); + break; + } case ValueType::vtArray: { pqxx::array_parser parser{value}; @@ -188,24 +203,13 @@ void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view assert_cast(column).insert(Array(dimensions[1].begin(), dimensions[1].end())); break; } - case ValueType::vtDate: [[fallthrough]]; - case ValueType::vtDateTime: [[fallthrough]]; - case ValueType::vtDateTime64:[[fallthrough]]; - case ValueType::vtDecimal32: [[fallthrough]]; - case ValueType::vtDecimal64: [[fallthrough]]; - case ValueType::vtDecimal128: - { - ReadBufferFromString istr(value); - data_type->deserializeAsWholeText(column, istr, FormatSettings{}); - break; - } default: throw Exception("Value of unsupported type:" + column.getName(), ErrorCodes::UNKNOWN_TYPE); } } -void PostgreSQLBlockInputStream::prepareArrayParser(size_t column_idx, const DataTypePtr data_type) +void PostgreSQLBlockInputStream::prepareArrayInfo(size_t column_idx, const DataTypePtr data_type) { const auto * array_type = typeid_cast(data_type.get()); auto nested = array_type->getNestedType(); @@ -249,21 +253,21 @@ void PostgreSQLBlockInputStream::prepareArrayParser(size_t column_idx, const Dat else if (which.isDecimal32()) parser = [nested](std::string & field) -> Field { - auto type = typeid_cast *>(nested.get()); + const auto & type = typeid_cast *>(nested.get()); DataTypeDecimal res(getDecimalPrecision(*type), getDecimalScale(*type)); return convertFieldToType(field, res); }; else if (which.isDecimal64()) parser = [nested](std::string & field) -> Field { - auto type = typeid_cast *>(nested.get()); + const auto & type = typeid_cast *>(nested.get()); DataTypeDecimal res(getDecimalPrecision(*type), getDecimalScale(*type)); return convertFieldToType(field, res); }; else if (which.isDecimal128()) parser = [nested](std::string & field) -> Field { - auto type = typeid_cast *>(nested.get()); + const auto & type = typeid_cast *>(nested.get()); DataTypeDecimal res(getDecimalPrecision(*type), getDecimalScale(*type)); return convertFieldToType(field, res); }; diff --git a/src/DataStreams/PostgreSQLBlockInputStream.h b/src/DataStreams/PostgreSQLBlockInputStream.h index 2820845e4b5..038b53853f3 100644 --- a/src/DataStreams/PostgreSQLBlockInputStream.h +++ b/src/DataStreams/PostgreSQLBlockInputStream.h @@ -39,7 +39,7 @@ private: { column.insertFrom(sample_column, 0); } - void prepareArrayParser(size_t column_idx, const DataTypePtr data_type); + void prepareArrayInfo(size_t column_idx, const DataTypePtr data_type); const String query_str; const UInt64 max_block_size; diff --git a/src/Storages/StoragePostgreSQL.cpp b/src/Storages/StoragePostgreSQL.cpp index 91a6b542b8b..ebbe49e78bf 100644 --- a/src/Storages/StoragePostgreSQL.cpp +++ b/src/Storages/StoragePostgreSQL.cpp @@ -91,7 +91,6 @@ void PostgreSQLBlockOutputStream::writePrefix() { storage.checkConnection(connection); work = std::make_unique(*connection); - stream_inserter = std::make_unique(*work, remote_table_name); } @@ -100,7 +99,9 @@ void PostgreSQLBlockOutputStream::write(const Block & block) const auto columns = block.getColumns(); const size_t num_rows = block.rows(), num_cols = block.columns(); const auto data_types = block.getDataTypes(); - const auto settings = FormatSettings{}; + + if (!stream_inserter) + stream_inserter = std::make_unique(*work, remote_table_name, block.getNames()); /// std::optional lets libpqxx to know if value is NULL std::vector> row(num_cols); @@ -116,7 +117,7 @@ void PostgreSQLBlockOutputStream::write(const Block & block) else { WriteBufferFromOwnString ostr; - data_types[j]->serializeAsText(*columns[j], i, ostr, settings); + data_types[j]->serializeAsText(*columns[j], i, ostr, FormatSettings{}); row[j] = std::optional(ostr.str()); if (isArray(data_types[j])) @@ -130,7 +131,7 @@ void PostgreSQLBlockOutputStream::write(const Block & block) } } - /// pqxx::stream_to uses COPY instead of insert query, so it is faster if inserting large number of rows + /// pqxx::stream_to is much faster than simple insert, especially for large number of rows stream_inserter->write_values(row); } } @@ -138,7 +139,8 @@ void PostgreSQLBlockOutputStream::write(const Block & block) void PostgreSQLBlockOutputStream::writeSuffix() { - stream_inserter->complete(); + if (stream_inserter) + stream_inserter->complete(); work->commit(); } @@ -159,9 +161,9 @@ void registerStoragePostgreSQL(StorageFactory & factory) { ASTs & engine_args = args.engine_args; - if (engine_args.size() < 5) - throw Exception( - "Storage PostgreSQL requires 5-7 parameters: PostgreSQL('host:port', 'database', 'table', 'username', 'password'.", + if (engine_args.size() != 5) + throw Exception("Storage PostgreSQL requires 5 parameters: " + "PostgreSQL('host:port', 'database', 'table', 'username', 'password'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); for (auto & engine_arg : engine_args) diff --git a/src/TableFunctions/TableFunctionPostgreSQL.cpp b/src/TableFunctions/TableFunctionPostgreSQL.cpp index 388560c028b..7b8e98c50f6 100644 --- a/src/TableFunctions/TableFunctionPostgreSQL.cpp +++ b/src/TableFunctions/TableFunctionPostgreSQL.cpp @@ -74,11 +74,9 @@ DataTypePtr TableFunctionPostgreSQL::getDataType(std::string & type, bool is_nul { DataTypePtr res; + /// Get rid of trailing '[]' for arrays if (dimensions) - { - /// No matter how many dimensions, in type we will get only one '[]' (i.e. Integer[]) type.resize(type.size() - 2); - } if (type == "smallint") res = std::make_shared(); @@ -100,7 +98,7 @@ DataTypePtr TableFunctionPostgreSQL::getDataType(std::string & type, bool is_nul res = std::make_shared(); else if (type.starts_with("numeric")) { - /// Numeric and decimal will both be numeric + /// Numeric and decimal will both end up here as numeric /// Will get numeric(precision, scale) string, need to extract precision and scale std::vector result; boost::split(result, type, [](char c){ return c == '(' || c == ',' || c == ')'; }); @@ -143,7 +141,7 @@ void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, const ASTs & args = func_args.arguments->children; - if (args.size() < 5) + if (args.size() != 5) throw Exception("Table function 'PostgreSQL' requires 5 parameters: " "PostgreSQL('host:port', 'database', 'table', 'user', 'password').", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); diff --git a/src/TableFunctions/TableFunctionPostgreSQL.h b/src/TableFunctions/TableFunctionPostgreSQL.h index e1cd3c77195..756d6b2996f 100644 --- a/src/TableFunctions/TableFunctionPostgreSQL.h +++ b/src/TableFunctions/TableFunctionPostgreSQL.h @@ -9,6 +9,7 @@ namespace DB { +using ConnectionPtr = std::shared_ptr; class TableFunctionPostgreSQL : public ITableFunction { @@ -30,7 +31,7 @@ private: String connection_str; String remote_table_name; - std::shared_ptr connection; + ConnectionPtr connection; }; }