Add connection pools

This commit is contained in:
kssenii 2021-03-31 12:41:12 +00:00
parent 3dadb2db94
commit cb845731b0
6 changed files with 99 additions and 16 deletions

View File

@ -17,8 +17,8 @@
#include <ext/scope_guard.h>
#include "getIdentifierQuote.h"
#include "validateODBCConnectionString.h"
#include "ODBCConnectionFactory.h"
#include <nanodbc/nanodbc.h>
#include <sql.h>
#include <sqlext.h>
@ -105,8 +105,8 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
{
const bool external_table_functions_use_nulls = Poco::NumberParser::parseBool(params.get("external_table_functions_use_nulls", "false"));
nanodbc::connection connection(validateODBCConnectionString(connection_string));
nanodbc::catalog catalog(connection);
auto connection = ODBCConnectionFactory::instance().get(validateODBCConnectionString(connection_string));
nanodbc::catalog catalog(*connection);
std::string catalog_name;
/// In XDBC tables it is allowed to pass either database_name or schema_name in table definion, but not both of them.

View File

@ -14,7 +14,8 @@
#include <ext/scope_guard.h>
#include "getIdentifierQuote.h"
#include "validateODBCConnectionString.h"
#include <nanodbc/nanodbc.h>
#include "ODBCConnectionFactory.h"
namespace DB
{
@ -40,8 +41,8 @@ void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServ
try
{
std::string connection_string = params.get("connection_string");
nanodbc::connection connection(validateODBCConnectionString(connection_string));
auto identifier = getIdentifierQuote(connection);
auto connection = ODBCConnectionFactory::instance().get(validateODBCConnectionString(connection_string));
auto identifier = getIdentifierQuote(*connection);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try

View File

@ -18,6 +18,7 @@
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <common/logger_useful.h>
#include <Server/HTTP/HTMLForm.h>
#include "ODBCConnectionFactory.h"
#include <mutex>
#include <memory>
@ -104,8 +105,8 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Connection string: '{}'", connection_string);
nanodbc::connection connection(connection_string);
auto connection = ODBCConnectionFactory::instance().get(validateODBCConnectionString(connection_string));
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try
@ -128,12 +129,12 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
auto quoting_style = IdentifierQuotingStyle::None;
#if USE_ODBC
quoting_style = getQuotingStyle(connection);
quoting_style = getQuotingStyle(*connection);
#endif
auto & read_buf = request.getStream();
auto input_format = FormatFactory::instance().getInput(format, read_buf, *sample_block, context, max_block_size);
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
ODBCBlockOutputStream output_stream(connection, db_name, table_name, *sample_block, context, quoting_style);
ODBCBlockOutputStream output_stream(*connection, db_name, table_name, *sample_block, context, quoting_style);
copyData(*input_stream, output_stream);
writeStringBinary("Ok.", out);
}
@ -143,7 +144,7 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
LOG_TRACE(log, "Query: {}", query);
BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, out, *sample_block, context);
ODBCBlockInputStream inp(connection, query, *sample_block, max_block_size);
ODBCBlockInputStream inp(*connection, query, *sample_block, max_block_size);
copyData(inp, *writer);
}
}

View File

@ -0,0 +1,83 @@
#pragma once
#include <common/logger_useful.h>
#include <nanodbc/nanodbc.h>
#include <mutex>
#include <Common/BorrowedObjectPool.h>
#include <unordered_map>
namespace nanodbc
{
static constexpr inline auto ODBC_CONNECT_TIMEOUT = 100;
using ConnectionPtr = std::shared_ptr<nanodbc::connection>;
using Pool = BorrowedObjectPool<ConnectionPtr>;
using PoolPtr = std::shared_ptr<Pool>;
class ConnectionHolder
{
public:
ConnectionHolder(const std::string & connection_string_, PoolPtr pool_) : connection_string(connection_string_), pool(pool_) {}
~ConnectionHolder()
{
pool->returnObject(std::move(connection));
}
nanodbc::connection & operator*()
{
if (!connection || !connection->connected())
{
pool->borrowObject(connection, [&]()
{
return std::make_shared<nanodbc::connection>(connection_string, ODBC_CONNECT_TIMEOUT);
});
}
return *connection;
}
private:
std::string connection_string;
PoolPtr pool;
ConnectionPtr connection;
};
}
namespace DB
{
static constexpr inline auto ODBC_DEFAULT_POOL_SIZE = 16;
class ODBCConnectionFactory final : private boost::noncopyable
{
public:
static ODBCConnectionFactory & instance()
{
static ODBCConnectionFactory ret;
return ret;
}
nanodbc::ConnectionHolder get(const std::string & connection_string, size_t pool_size = ODBC_DEFAULT_POOL_SIZE)
{
std::lock_guard lock(mutex);
if (!factory.count(connection_string))
factory.emplace(std::make_pair(connection_string, std::make_shared<nanodbc::Pool>(pool_size)));
return nanodbc::ConnectionHolder(connection_string, factory[connection_string]);
}
private:
/// [connection_string] -> [connection_pool]
using PoolFactory = std::unordered_map<std::string, nanodbc::PoolPtr>;
PoolFactory factory;
std::mutex mutex;
};
}

View File

@ -9,8 +9,7 @@
#include <Poco/Net/HTTPServerResponse.h>
#include <common/logger_useful.h>
#include "validateODBCConnectionString.h"
#include <nanodbc/nanodbc.h>
#include "ODBCConnectionFactory.h"
#include <sql.h>
#include <sqlext.h>
@ -49,9 +48,8 @@ void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServer
try
{
std::string connection_string = params.get("connection_string");
nanodbc::connection connection(validateODBCConnectionString(connection_string));
bool result = isSchemaAllowed(connection);
auto connection = ODBCConnectionFactory::instance().get(validateODBCConnectionString(connection_string));
bool result = isSchemaAllowed(*connection);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try

View File

@ -269,7 +269,7 @@ def test_sqlite_odbc_cached_dictionary(started_cluster):
node1.exec_in_container(["bash", "-c", "chmod a+rw /tmp"], privileged=True, user='root')
node1.exec_in_container(["bash", "-c", "chmod a+rw {}".format(sqlite_db)], privileged=True, user='root')
node1.query("insert into table function odbc('DSN={};', '', 't3') values (200, 2, 7)".format(
node1.query("insert into table function odbc('DSN={};ReadOnly=0', '', 't3') values (200, 2, 7)".format(
node1.odbc_drivers["SQLite3"]["DSN"]))
assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(200))") == "7\n" # new value