Review fixes

This commit is contained in:
kssenii 2021-05-07 11:18:49 +00:00
parent 93dce29a0a
commit 912878ad7f
22 changed files with 48 additions and 57 deletions

View File

@ -109,7 +109,7 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
validateODBCConnectionString(connection_string),
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
nanodbc::catalog catalog(connection.get());
nanodbc::catalog catalog(connection->get());
std::string catalog_name;
/// In XDBC tables it is allowed to pass either database_name or schema_name in table definion, but not both of them.

View File

@ -46,7 +46,7 @@ void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServ
validateODBCConnectionString(connection_string),
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
auto identifier = getIdentifierQuote(connection.get());
auto identifier = getIdentifierQuote(connection->get());
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try

View File

@ -130,12 +130,12 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
auto quoting_style = IdentifierQuotingStyle::None;
#if USE_ODBC
quoting_style = getQuotingStyle(connection.get());
quoting_style = getQuotingStyle(connection->get());
#endif
auto & read_buf = request.getStream();
auto input_format = FormatFactory::instance().getInput(format, read_buf, *sample_block, getContext(), max_block_size);
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
ODBCBlockOutputStream output_stream(connection, db_name, table_name, *sample_block, getContext(), quoting_style);
ODBCBlockOutputStream output_stream(std::move(connection), db_name, table_name, *sample_block, getContext(), quoting_style);
copyData(*input_stream, output_stream);
writeStringBinary("Ok.", out);
}
@ -145,7 +145,7 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
LOG_TRACE(log, "Query: {}", query);
BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, out, *sample_block, getContext());
ODBCBlockInputStream inp(connection, query, *sample_block, max_block_size);
ODBCBlockInputStream inp(std::move(connection), query, *sample_block, max_block_size);
copyData(inp, *writer);
}
}

View File

@ -21,13 +21,13 @@ namespace ErrorCodes
ODBCBlockInputStream::ODBCBlockInputStream(
nanodbc::ConnectionHolder & connection, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_)
nanodbc::ConnectionHolderPtr connection, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_)
: log(&Poco::Logger::get("ODBCBlockInputStream"))
, max_block_size{max_block_size_}
, query(query_str)
{
description.init(sample_block);
result = execute(connection.get(), NANODBC_TEXT(query));
result = execute(connection->get(), NANODBC_TEXT(query));
}

View File

@ -13,7 +13,7 @@ namespace DB
class ODBCBlockInputStream final : public IBlockInputStream
{
public:
ODBCBlockInputStream(nanodbc::ConnectionHolder & connection, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_);
ODBCBlockInputStream(nanodbc::ConnectionHolderPtr connection, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_);
String getName() const override { return "ODBC"; }

View File

@ -40,14 +40,14 @@ namespace
}
ODBCBlockOutputStream::ODBCBlockOutputStream(nanodbc::ConnectionHolder & connection_,
ODBCBlockOutputStream::ODBCBlockOutputStream(nanodbc::ConnectionHolderPtr connection_,
const std::string & remote_database_name_,
const std::string & remote_table_name_,
const Block & sample_block_,
ContextPtr local_context_,
IdentifierQuotingStyle quoting_)
: log(&Poco::Logger::get("ODBCBlockOutputStream"))
, connection(connection_)
, connection(std::move(connection_))
, db_name(remote_database_name_)
, table_name(remote_table_name_)
, sample_block(sample_block_)
@ -69,7 +69,7 @@ void ODBCBlockOutputStream::write(const Block & block)
writer->write(block);
std::string query = getInsertQuery(db_name, table_name, block.getColumnsWithTypeAndName(), quoting) + values_buf.str();
execute(connection.get(), query);
execute(connection->get(), query);
}
}

View File

@ -16,7 +16,7 @@ class ODBCBlockOutputStream : public IBlockOutputStream
public:
ODBCBlockOutputStream(
nanodbc::ConnectionHolder & connection_,
nanodbc::ConnectionHolderPtr connection_,
const std::string & remote_database_name_,
const std::string & remote_table_name_,
const Block & sample_block_,
@ -29,7 +29,7 @@ public:
private:
Poco::Logger * log;
nanodbc::ConnectionHolder & connection;
nanodbc::ConnectionHolderPtr connection;
std::string db_name;
std::string table_name;
Block sample_block;

View File

@ -40,6 +40,8 @@ private:
PoolPtr pool;
ConnectionPtr connection;
};
using ConnectionHolderPtr = std::unique_ptr<ConnectionHolder>;
}
@ -58,7 +60,7 @@ public:
return ret;
}
nanodbc::ConnectionHolder get(const std::string & connection_string, size_t pool_size)
nanodbc::ConnectionHolderPtr get(const std::string & connection_string, size_t pool_size)
{
std::lock_guard lock(mutex);
@ -83,7 +85,7 @@ public:
pool->returnObject(std::move(connection));
}
return nanodbc::ConnectionHolder(factory[connection_string], std::move(connection));
return std::make_unique<nanodbc::ConnectionHolder>(factory[connection_string], std::move(connection));
}
private:

View File

@ -53,7 +53,7 @@ void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServer
validateODBCConnectionString(connection_string),
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
bool result = isSchemaAllowed(connection.get());
bool result = isSchemaAllowed(connection->get());
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try

View File

@ -371,7 +371,7 @@ class IColumn;
M(Bool, allow_drop_detached, false, "Allow ALTER TABLE ... DROP DETACHED PART[ITION] ... queries", 0) \
\
M(UInt64, postgresql_connection_pool_size, 16, "Connection pool size for PostgreSQL table engine and database engine.", 0) \
M(Int64, postgresql_connection_pool_wait_timeout, 5000, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \
M(UInt64, postgresql_connection_pool_wait_timeout, 5000, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \
M(UInt64, glob_expansion_max_elements, 1000, "Maximum number of allowed addresses (For external storages, table functions, etc).", 0) \
M(UInt64, odbc_bridge_connection_pool_size, 16, "Connection pool size for each connection settings string in ODBC bridge.", 0) \
\

View File

@ -28,13 +28,13 @@ namespace ErrorCodes
}
PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
postgres::ConnectionHolderPtr entry_,
postgres::ConnectionHolderPtr connection_holder_,
const std::string & query_str_,
const Block & sample_block,
const UInt64 max_block_size_)
: query_str(query_str_)
, max_block_size(max_block_size_)
, entry(std::move(entry_))
, connection_holder(std::move(connection_holder_))
{
description.init(sample_block);
for (const auto idx : ext::range(0, description.sample_block.columns()))
@ -48,7 +48,7 @@ PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
void PostgreSQLBlockInputStream::readPrefix()
{
tx = std::make_unique<pqxx::read_transaction>(entry->get());
tx = std::make_unique<pqxx::read_transaction>(connection_holder->get());
stream = std::make_unique<pqxx::stream_from>(*tx, pqxx::from_query, std::string_view(query_str));
}

View File

@ -19,7 +19,7 @@ class PostgreSQLBlockInputStream : public IBlockInputStream
{
public:
PostgreSQLBlockInputStream(
postgres::ConnectionHolderPtr entry_,
postgres::ConnectionHolderPtr connection_holder_,
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_);
@ -46,7 +46,7 @@ private:
const UInt64 max_block_size;
ExternalResultDescription description;
postgres::ConnectionHolderPtr entry;
postgres::ConnectionHolderPtr connection_holder;
std::unique_ptr<pqxx::read_transaction> tx;
std::unique_ptr<pqxx::stream_from> stream;

View File

@ -88,8 +88,8 @@ std::unordered_set<std::string> DatabasePostgreSQL::fetchTablesList() const
std::unordered_set<std::string> tables;
std::string query = "SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'";
auto entry = connection_pool->get();
pqxx::read_transaction tx(entry->get());
auto connection_holder = connection_pool->get();
pqxx::read_transaction tx(connection_holder->get());
for (auto table_name : tx.stream<std::string>(query))
tables.insert(std::get<0>(table_name));
@ -107,8 +107,8 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
"PostgreSQL table name cannot contain single quote or backslash characters, passed {}", table_name);
}
auto entry = connection_pool->get();
pqxx::nontransaction tx(entry->get());
auto connection_holder = connection_pool->get();
pqxx::nontransaction tx(connection_holder->get());
try
{
@ -169,7 +169,7 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr
return StoragePtr{};
auto storage = StoragePostgreSQL::create(
StorageID(database_name, table_name), *connection_pool, table_name,
StorageID(database_name, table_name), connection_pool, table_name,
ColumnsDescription{*columns}, ConstraintsDescription{}, local_context);
if (cache_tables)

View File

@ -96,7 +96,7 @@ static DataTypePtr convertPostgreSQLDataType(String & type, bool is_nullable, ui
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
postgres::ConnectionHolderPtr entry, const String & postgres_table_name, bool use_nulls)
postgres::ConnectionHolderPtr connection_holder, const String & postgres_table_name, bool use_nulls)
{
auto columns = NamesAndTypesList();
@ -115,7 +115,7 @@ std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
"AND NOT attisdropped AND attnum > 0", postgres_table_name);
try
{
pqxx::read_transaction tx(entry->get());
pqxx::read_transaction tx(connection_holder->get());
pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query));
std::tuple<std::string, std::string, std::string, uint16_t> row;
@ -135,7 +135,7 @@ std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
{
throw Exception(fmt::format(
"PostgreSQL table {}.{} does not exist",
entry->get().dbname(), postgres_table_name), ErrorCodes::UNKNOWN_TABLE);
connection_holder->get().dbname(), postgres_table_name), ErrorCodes::UNKNOWN_TABLE);
}
catch (Exception & e)
{

View File

@ -13,7 +13,7 @@ namespace DB
{
std::shared_ptr<NamesAndTypesList> fetchPostgreSQLTableStructure(
postgres::ConnectionHolderPtr entry, const String & postgres_table_name, bool use_nulls);
postgres::ConnectionHolderPtr connection_holder, const String & postgres_table_name, bool use_nulls);
}

View File

@ -16,8 +16,7 @@ class ConnectionHolder
{
public:
ConnectionHolder(PoolPtr pool_, ConnectionPtr connection_, const String & connection_string_)
: pool(pool_), connection(std::move(connection_)), connection_string(connection_string_) {}
ConnectionHolder(PoolPtr pool_, ConnectionPtr connection_) : pool(pool_), connection(std::move(connection_)) {}
ConnectionHolder(const ConnectionHolder & other) = delete;
@ -32,7 +31,6 @@ public:
private:
PoolPtr pool;
ConnectionPtr connection;
const String connection_string;
};
using ConnectionHolderPtr = std::unique_ptr<ConnectionHolder>;

View File

@ -90,13 +90,6 @@ PoolWithFailover::PoolWithFailover(
}
}
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
: replicas_with_priority(other.replicas_with_priority)
, pool_wait_timeout(other.pool_wait_timeout)
, max_tries(other.max_tries)
{
}
ConnectionHolderPtr PoolWithFailover::get()
{
std::lock_guard lock(mutex);
@ -141,13 +134,13 @@ ConnectionHolderPtr PoolWithFailover::get()
throw;
}
auto entry = std::make_unique<ConnectionHolder>(replica.pool, std::move(connection), replica.connection_string);
auto connection_holder = std::make_unique<ConnectionHolder>(replica.pool, std::move(connection));
/// Move all traversed replicas to the end.
if (replicas.size() > 1)
std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end());
return entry;
return connection_holder;
}
}
}

View File

@ -37,13 +37,11 @@ public:
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(const PoolWithFailover & other);
PoolWithFailover(const PoolWithFailover & other) = delete;
ConnectionHolderPtr get();
private:
bool getAndValidateConnection();
struct PoolHolder
{
String connection_string;

View File

@ -85,7 +85,7 @@ StorageExternalDistributed::StorageExternalDistributed(
{
addresses = parseRemoteDescriptionForExternalDatabase(shard_description, max_addresses, 5432);
postgres::PoolWithFailover pool(
auto pool = std::make_shared<postgres::PoolWithFailover>(
remote_database,
addresses,
username, password,

View File

@ -41,7 +41,7 @@ namespace ErrorCodes
StoragePostgreSQL::StoragePostgreSQL(
const StorageID & table_id_,
const postgres::PoolWithFailover & pool_,
postgres::PoolWithFailoverPtr pool_,
const String & remote_table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
@ -51,7 +51,7 @@ StoragePostgreSQL::StoragePostgreSQL(
, remote_table_name(remote_table_name_)
, remote_table_schema(remote_table_schema_)
, global_context(context_)
, pool(std::make_shared<postgres::PoolWithFailover>(pool_))
, pool(std::move(pool_))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -97,10 +97,10 @@ class PostgreSQLBlockOutputStream : public IBlockOutputStream
public:
explicit PostgreSQLBlockOutputStream(
const StorageMetadataPtr & metadata_snapshot_,
postgres::ConnectionHolderPtr entry_,
postgres::ConnectionHolderPtr connection_holder_,
const std::string & remote_table_name_)
: metadata_snapshot(metadata_snapshot_)
, entry(std::move(entry_))
, connection_holder(std::move(connection_holder_))
, remote_table_name(remote_table_name_)
{
}
@ -110,7 +110,7 @@ public:
void writePrefix() override
{
work = std::make_unique<pqxx::work>(entry->get());
work = std::make_unique<pqxx::work>(connection_holder->get());
}
@ -276,7 +276,7 @@ public:
private:
StorageMetadataPtr metadata_snapshot;
postgres::ConnectionHolderPtr entry;
postgres::ConnectionHolderPtr connection_holder;
std::string remote_table_name;
std::unique_ptr<pqxx::work> work;
@ -319,7 +319,7 @@ void registerStoragePostgreSQL(StorageFactory & factory)
if (engine_args.size() == 6)
remote_table_schema = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>();
postgres::PoolWithFailover pool(
auto pool = std::make_shared<postgres::PoolWithFailover>(
remote_database,
addresses,
username,
@ -328,7 +328,7 @@ void registerStoragePostgreSQL(StorageFactory & factory)
args.getContext()->getSettingsRef().postgresql_connection_pool_wait_timeout);
return StoragePostgreSQL::create(
args.table_id, pool, remote_table,
args.table_id, std::move(pool), remote_table,
args.columns, args.constraints, args.getContext(), remote_table_schema);
},
{

View File

@ -22,7 +22,7 @@ class StoragePostgreSQL final : public ext::shared_ptr_helper<StoragePostgreSQL>
public:
StoragePostgreSQL(
const StorageID & table_id_,
const postgres::PoolWithFailover & pool_,
postgres::PoolWithFailoverPtr pool_,
const String & remote_table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,

View File

@ -30,7 +30,7 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
{
auto columns = getActualTableStructure(context);
auto result = std::make_shared<StoragePostgreSQL>(
StorageID(getDatabaseName(), table_name), *connection_pool, remote_table_name,
StorageID(getDatabaseName(), table_name), connection_pool, remote_table_name,
columns, ConstraintsDescription{}, context, remote_table_schema);
result->startup();