Merge pull request #45591 from bharatnc/ncb/odbc-connection-pool-fixes

fixes to use the odbc_bridge_use_connection_pooling setting correctly
This commit is contained in:
Kseniia Sumarokova 2023-01-31 11:31:59 +01:00 committed by GitHub
commit 7d53f8bbf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 113 additions and 16 deletions

View File

@ -61,13 +61,18 @@ void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServ
return;
}
bool use_connection_pooling = params.getParsed<bool>("use_connection_pooling", true);
try
{
std::string connection_string = params.get("connection_string");
auto connection = ODBCPooledConnectionFactory::instance().get(
validateODBCConnectionString(connection_string),
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
nanodbc::ConnectionHolderPtr connection;
if (use_connection_pooling)
connection = ODBCPooledConnectionFactory::instance().get(
validateODBCConnectionString(connection_string), getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
else
connection = std::make_shared<nanodbc::ConnectionHolder>(validateODBCConnectionString(connection_string));
auto identifier = getIdentifierQuote(std::move(connection));

View File

@ -102,7 +102,9 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
std::string format = params.get("format", "RowBinary");
std::string connection_string = params.get("connection_string");
bool use_connection_pooling = params.getParsed<bool>("use_connection_pooling", true);
LOG_TRACE(log, "Connection string: '{}'", connection_string);
LOG_TRACE(log, "Use pooling: {}", use_connection_pooling);
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
if (params.has("max_block_size"))
@ -134,7 +136,7 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
try
{
nanodbc::ConnectionHolderPtr connection_handler;
if (getContext()->getSettingsRef().odbc_bridge_use_connection_pooling)
if (use_connection_pooling)
connection_handler = ODBCPooledConnectionFactory::instance().get(
validateODBCConnectionString(connection_string), getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
else

View File

@ -70,13 +70,19 @@ void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServer
return;
}
bool use_connection_pooling = params.getParsed<bool>("use_connection_pooling", true);
try
{
std::string connection_string = params.get("connection_string");
auto connection = ODBCPooledConnectionFactory::instance().get(
validateODBCConnectionString(connection_string),
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
nanodbc::ConnectionHolderPtr connection;
if (use_connection_pooling)
connection = ODBCPooledConnectionFactory::instance().get(
validateODBCConnectionString(connection_string), getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
else
connection = std::make_shared<nanodbc::ConnectionHolder>(validateODBCConnectionString(connection_string));
bool result = isSchemaAllowed(std::move(connection));

View File

@ -63,10 +63,12 @@ public:
XDBCBridgeHelper(
ContextPtr context_,
Poco::Timespan http_timeout_,
const std::string & connection_string_)
const std::string & connection_string_,
bool use_connection_pooling_)
: IXDBCBridgeHelper(context_->getGlobalContext())
, log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
, connection_string(connection_string_)
, use_connection_pooling(use_connection_pooling_)
, http_timeout(http_timeout_)
, config(context_->getGlobalContext()->getConfigRef())
{
@ -132,6 +134,7 @@ protected:
uri.setHost(bridge_host);
uri.setPort(bridge_port);
uri.setScheme("http");
uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling));
return uri;
}
@ -146,6 +149,7 @@ private:
Poco::Logger * log;
std::string connection_string;
bool use_connection_pooling;
Poco::Timespan http_timeout;
std::string bridge_host;
size_t bridge_port;
@ -189,6 +193,7 @@ protected:
uri.setPath(SCHEMA_ALLOWED_HANDLER);
uri.addQueryParameter("version", std::to_string(XDBC_BRIDGE_PROTOCOL_VERSION));
uri.addQueryParameter("connection_string", getConnectionString());
uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling));
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
@ -210,6 +215,7 @@ protected:
uri.setPath(IDENTIFIER_QUOTE_HANDLER);
uri.addQueryParameter("version", std::to_string(XDBC_BRIDGE_PROTOCOL_VERSION));
uri.addQueryParameter("connection_string", getConnectionString());
uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling));
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);

View File

@ -233,7 +233,11 @@ void registerDictionarySourceXDBC(DictionarySourceFactory & factory)
bool /* check_config */) -> DictionarySourcePtr {
#if USE_ODBC
BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(
global_context, global_context->getSettings().http_receive_timeout, config.getString(config_prefix + ".odbc.connection_string"));
global_context,
global_context->getSettings().http_receive_timeout,
config.getString(config_prefix + ".odbc.connection_string"),
config.getBool(config_prefix + ".settings.odbc_bridge_use_connection_pooling",
global_context->getSettingsRef().odbc_bridge_use_connection_pooling));
std::string settings_config_prefix = config_prefix + ".odbc";

View File

@ -173,7 +173,8 @@ namespace
BridgeHelperPtr bridge_helper = std::make_shared<XDBCBridgeHelper<BridgeHelperMixin>>(args.getContext(),
args.getContext()->getSettingsRef().http_receive_timeout.value,
checkAndGetLiteralArgument<String>(engine_args[0], "connection_string"));
checkAndGetLiteralArgument<String>(engine_args[0], "connection_string"),
args.getContext()->getSettingsRef().odbc_bridge_use_connection_pooling.value);
return std::make_shared<StorageXDBC>(
args.table_id,
checkAndGetLiteralArgument<String>(engine_args[1], "database_name"),

View File

@ -56,7 +56,7 @@ void ITableFunctionXDBC::startBridgeIfNot(ContextPtr context) const
{
if (!helper)
{
helper = createBridgeHelper(context, context->getSettingsRef().http_receive_timeout.value, connection_string);
helper = createBridgeHelper(context, context->getSettingsRef().http_receive_timeout.value, connection_string, context->getSettingsRef().odbc_bridge_use_connection_pooling.value);
helper->startBridgeSync();
}
}

View File

@ -21,7 +21,8 @@ private:
/* A factory method to create bridge helper, that will assist in remote interaction */
virtual BridgeHelperPtr createBridgeHelper(ContextPtr context,
Poco::Timespan http_timeout_,
const std::string & connection_string_) const = 0;
const std::string & connection_string_,
bool use_connection_pooling_) const = 0;
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
@ -47,9 +48,10 @@ public:
private:
BridgeHelperPtr createBridgeHelper(ContextPtr context,
Poco::Timespan http_timeout_,
const std::string & connection_string_) const override
const std::string & connection_string_,
bool use_connection_pooling_) const override
{
return std::make_shared<XDBCBridgeHelper<JDBCBridgeMixin>>(context, http_timeout_, connection_string_);
return std::make_shared<XDBCBridgeHelper<JDBCBridgeMixin>>(context, http_timeout_, connection_string_, use_connection_pooling_);
}
const char * getStorageTypeName() const override { return "JDBC"; }
@ -67,9 +69,10 @@ public:
private:
BridgeHelperPtr createBridgeHelper(ContextPtr context,
Poco::Timespan http_timeout_,
const std::string & connection_string_) const override
const std::string & connection_string_,
bool use_connection_pooling_) const override
{
return std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(context, http_timeout_, connection_string_);
return std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(context, http_timeout_, connection_string_, use_connection_pooling_);
}
const char * getStorageTypeName() const override { return "ODBC"; }

View File

@ -0,0 +1,41 @@
<clickhouse>
<dictionary>
<name>postgres_odbc_nopool</name>
<source>
<odbc>
<table>clickhouse.test_table</table>
<connection_string>DSN=postgresql_odbc</connection_string>
<db>postgres</db>
</odbc>
<settings>
<odbc_bridge_use_connection_pooling>0</odbc_bridge_use_connection_pooling>
</settings>
</source>
<lifetime>
<min>5</min>
<max>5</max>
</lifetime>
<layout>
<hashed />
</layout>
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>column1</name>
<type>Int64</type>
<null_value>1</null_value>
</attribute>
<attribute>
<name>column2</name>
<type>String</type>
<null_value>''</null_value>
</attribute>
</structure>
</dictionary>
</clickhouse>

View File

@ -21,6 +21,7 @@ node1 = cluster.add_instance(
"configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml",
"configs/dictionaries/sqlite3_odbc_cached_dictionary.xml",
"configs/dictionaries/postgres_odbc_hashed_dictionary.xml",
"configs/dictionaries/postgres_odbc_no_connection_pool_dictionary.xml",
],
)
@ -624,6 +625,34 @@ def test_postgres_odbc_hashed_dictionary_no_tty_pipe_overflow(started_cluster):
cursor.execute("truncate table clickhouse.test_table")
def test_no_connection_pooling(started_cluster):
skip_test_msan(node1)
conn = get_postgres_conn(started_cluster)
cursor = conn.cursor()
cursor.execute(
"insert into clickhouse.test_table values(1, 1, 'hello'),(2, 2, 'world')"
)
node1.exec_in_container(["ss", "-K", "dport", "5432"], privileged=True, user="root")
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_nopool")
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_nopool', 'column2', toUInt64(1))",
"hello",
)
assert_eq_with_retry(
node1,
"select dictGetString('postgres_odbc_nopool', 'column2', toUInt64(2))",
"world",
)
# No open connections should be left because we don't use connection pooling.
assert "" == node1.exec_in_container(
["ss", "-H", "dport", "5432"], privileged=True, user="root"
)
cursor.execute("truncate table clickhouse.test_table")
def test_postgres_insert(started_cluster):
skip_test_msan(node1)