From 16a2585d5557b9947035649f712a5621524b869a Mon Sep 17 00:00:00 2001 From: Bharat Nallan Chakravarthy Date: Wed, 18 Jan 2023 16:36:25 -0800 Subject: [PATCH 1/4] fixes to odbc connection pooling --- programs/odbc-bridge/IdentifierQuoteHandler.cpp | 11 ++++++++--- programs/odbc-bridge/MainHandler.cpp | 4 +++- programs/odbc-bridge/SchemaAllowedHandler.cpp | 12 +++++++++--- src/BridgeHelper/XDBCBridgeHelper.h | 8 +++++++- src/Dictionaries/XDBCDictionarySource.cpp | 6 +++++- 5 files changed, 32 insertions(+), 9 deletions(-) diff --git a/programs/odbc-bridge/IdentifierQuoteHandler.cpp b/programs/odbc-bridge/IdentifierQuoteHandler.cpp index 8157e3b6159..f622995bf15 100644 --- a/programs/odbc-bridge/IdentifierQuoteHandler.cpp +++ b/programs/odbc-bridge/IdentifierQuoteHandler.cpp @@ -61,13 +61,18 @@ void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServ return; } + bool use_connection_pooling = params.getParsed("use_connection_pooling", true); + try { std::string connection_string = params.get("connection_string"); - auto connection = ODBCPooledConnectionFactory::instance().get( - validateODBCConnectionString(connection_string), - getContext()->getSettingsRef().odbc_bridge_connection_pool_size); + nanodbc::ConnectionHolderPtr connection; + if (use_connection_pooling) + connection = ODBCPooledConnectionFactory::instance().get( + validateODBCConnectionString(connection_string), getContext()->getSettingsRef().odbc_bridge_connection_pool_size); + else + connection = std::make_shared(validateODBCConnectionString(connection_string)); auto identifier = getIdentifierQuote(std::move(connection)); diff --git a/programs/odbc-bridge/MainHandler.cpp b/programs/odbc-bridge/MainHandler.cpp index 0875cc2e9d9..9130b3e0f47 100644 --- a/programs/odbc-bridge/MainHandler.cpp +++ b/programs/odbc-bridge/MainHandler.cpp @@ -102,7 +102,9 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse std::string format = params.get("format", "RowBinary"); std::string connection_string = params.get("connection_string"); + bool use_connection_pooling = params.getParsed("use_connection_pooling", true); LOG_TRACE(log, "Connection string: '{}'", connection_string); + LOG_TRACE(log, "Use pooling: {}", use_connection_pooling); UInt64 max_block_size = DEFAULT_BLOCK_SIZE; if (params.has("max_block_size")) @@ -134,7 +136,7 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse try { nanodbc::ConnectionHolderPtr connection_handler; - if (getContext()->getSettingsRef().odbc_bridge_use_connection_pooling) + if (use_connection_pooling) connection_handler = ODBCPooledConnectionFactory::instance().get( validateODBCConnectionString(connection_string), getContext()->getSettingsRef().odbc_bridge_connection_pool_size); else diff --git a/programs/odbc-bridge/SchemaAllowedHandler.cpp b/programs/odbc-bridge/SchemaAllowedHandler.cpp index 4d20c8bc3b7..020359f51fd 100644 --- a/programs/odbc-bridge/SchemaAllowedHandler.cpp +++ b/programs/odbc-bridge/SchemaAllowedHandler.cpp @@ -70,13 +70,19 @@ void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServer return; } + bool use_connection_pooling = params.getParsed("use_connection_pooling", true); + try { std::string connection_string = params.get("connection_string"); - auto connection = ODBCPooledConnectionFactory::instance().get( - validateODBCConnectionString(connection_string), - getContext()->getSettingsRef().odbc_bridge_connection_pool_size); + nanodbc::ConnectionHolderPtr connection; + + if (use_connection_pooling) + connection = ODBCPooledConnectionFactory::instance().get( + validateODBCConnectionString(connection_string), getContext()->getSettingsRef().odbc_bridge_connection_pool_size); + else + connection = std::make_shared(validateODBCConnectionString(connection_string)); bool result = isSchemaAllowed(std::move(connection)); diff --git a/src/BridgeHelper/XDBCBridgeHelper.h b/src/BridgeHelper/XDBCBridgeHelper.h index 139c1ab9726..204edcef82c 100644 --- a/src/BridgeHelper/XDBCBridgeHelper.h +++ b/src/BridgeHelper/XDBCBridgeHelper.h @@ -63,10 +63,12 @@ public: XDBCBridgeHelper( ContextPtr context_, Poco::Timespan http_timeout_, - const std::string & connection_string_) + const std::string & connection_string_, + bool use_connection_pooling_=true) : IXDBCBridgeHelper(context_->getGlobalContext()) , log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper")) , connection_string(connection_string_) + , use_connection_pooling(use_connection_pooling_) , http_timeout(http_timeout_) , config(context_->getGlobalContext()->getConfigRef()) { @@ -132,6 +134,7 @@ protected: uri.setHost(bridge_host); uri.setPort(bridge_port); uri.setScheme("http"); + uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling)); return uri; } @@ -146,6 +149,7 @@ private: Poco::Logger * log; std::string connection_string; + bool use_connection_pooling; Poco::Timespan http_timeout; std::string bridge_host; size_t bridge_port; @@ -189,6 +193,7 @@ protected: uri.setPath(SCHEMA_ALLOWED_HANDLER); uri.addQueryParameter("version", std::to_string(XDBC_BRIDGE_PROTOCOL_VERSION)); uri.addQueryParameter("connection_string", getConnectionString()); + uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling)); ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials); @@ -210,6 +215,7 @@ protected: uri.setPath(IDENTIFIER_QUOTE_HANDLER); uri.addQueryParameter("version", std::to_string(XDBC_BRIDGE_PROTOCOL_VERSION)); uri.addQueryParameter("connection_string", getConnectionString()); + uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling)); ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials); diff --git a/src/Dictionaries/XDBCDictionarySource.cpp b/src/Dictionaries/XDBCDictionarySource.cpp index dec4feb5ced..7ff8a8ece15 100644 --- a/src/Dictionaries/XDBCDictionarySource.cpp +++ b/src/Dictionaries/XDBCDictionarySource.cpp @@ -233,7 +233,11 @@ void registerDictionarySourceXDBC(DictionarySourceFactory & factory) bool /* check_config */) -> DictionarySourcePtr { #if USE_ODBC BridgeHelperPtr bridge = std::make_shared>( - global_context, global_context->getSettings().http_receive_timeout, config.getString(config_prefix + ".odbc.connection_string")); + global_context, + global_context->getSettings().http_receive_timeout, + config.getString(config_prefix + ".odbc.connection_string"), + config.getBool(config_prefix + ".settings.odbc_bridge_use_connection_pooling", + global_context->getSettingsRef().odbc_bridge_use_connection_pooling)); std::string settings_config_prefix = config_prefix + ".odbc"; From fc4eba3b7ba9b95dce26c3134c755e1f8a9296fa Mon Sep 17 00:00:00 2001 From: Bharat Nallan Chakravarthy Date: Tue, 24 Jan 2023 14:22:32 -0800 Subject: [PATCH 2/4] add a test --- ...res_odbc_no_connection_pool_dictionary.xml | 41 +++++++++++++++++++ .../integration/test_odbc_interaction/test.py | 29 +++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 tests/integration/test_odbc_interaction/configs/dictionaries/postgres_odbc_no_connection_pool_dictionary.xml diff --git a/tests/integration/test_odbc_interaction/configs/dictionaries/postgres_odbc_no_connection_pool_dictionary.xml b/tests/integration/test_odbc_interaction/configs/dictionaries/postgres_odbc_no_connection_pool_dictionary.xml new file mode 100644 index 00000000000..a8321b1bbf1 --- /dev/null +++ b/tests/integration/test_odbc_interaction/configs/dictionaries/postgres_odbc_no_connection_pool_dictionary.xml @@ -0,0 +1,41 @@ + + + postgres_odbc_nopool + + + clickhouse.test_table
+ DSN=postgresql_odbc + postgres +
+ + 0 + + + + 5 + 5 + + + + + + + + id + + + + column1 + Int64 + 1 + + + + column2 + String + '' + + + +
+
diff --git a/tests/integration/test_odbc_interaction/test.py b/tests/integration/test_odbc_interaction/test.py index ed925759114..946f97c2148 100644 --- a/tests/integration/test_odbc_interaction/test.py +++ b/tests/integration/test_odbc_interaction/test.py @@ -21,6 +21,7 @@ node1 = cluster.add_instance( "configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml", "configs/dictionaries/sqlite3_odbc_cached_dictionary.xml", "configs/dictionaries/postgres_odbc_hashed_dictionary.xml", + "configs/dictionaries/postgres_odbc_no_connection_pool_dictionary.xml", ], ) @@ -624,6 +625,34 @@ def test_postgres_odbc_hashed_dictionary_no_tty_pipe_overflow(started_cluster): cursor.execute("truncate table clickhouse.test_table") +def test_no_connection_pooling(started_cluster): + skip_test_msan(node1) + + conn = get_postgres_conn(started_cluster) + cursor = conn.cursor() + cursor.execute("truncate table clickhouse.test_table") + cursor.execute( + "insert into clickhouse.test_table values(1, 1, 'hello'),(2, 2, 'world')" + ) + node1.exec_in_container(["ss", "-K", "dport", "5432"], privileged=True, user="root") + node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_nopool") + assert_eq_with_retry( + node1, + "select dictGetString('postgres_odbc_nopool', 'column2', toUInt64(1))", + "hello", + ) + assert_eq_with_retry( + node1, + "select dictGetString('postgres_odbc_nopool', 'column2', toUInt64(2))", + "world", + ) + + # No open connections should be left because we don't use connection pooling. + assert "" == node1.exec_in_container( + ["ss", "-H", "dport", "5432"], privileged=True, user="root" + ) + + def test_postgres_insert(started_cluster): skip_test_msan(node1) From 70a3ffa0e6e857cc9db943b9696084c26b57026f Mon Sep 17 00:00:00 2001 From: Bharat Nallan Chakravarthy Date: Wed, 25 Jan 2023 06:46:19 -0800 Subject: [PATCH 3/4] fix test --- tests/integration/test_odbc_interaction/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_odbc_interaction/test.py b/tests/integration/test_odbc_interaction/test.py index 946f97c2148..14f5de17870 100644 --- a/tests/integration/test_odbc_interaction/test.py +++ b/tests/integration/test_odbc_interaction/test.py @@ -630,7 +630,6 @@ def test_no_connection_pooling(started_cluster): conn = get_postgres_conn(started_cluster) cursor = conn.cursor() - cursor.execute("truncate table clickhouse.test_table") cursor.execute( "insert into clickhouse.test_table values(1, 1, 'hello'),(2, 2, 'world')" ) @@ -651,6 +650,7 @@ def test_no_connection_pooling(started_cluster): assert "" == node1.exec_in_container( ["ss", "-H", "dport", "5432"], privileged=True, user="root" ) + cursor.execute("truncate table clickhouse.test_table") def test_postgres_insert(started_cluster): From 0d2e4fcc152cd33b7e1f9d5a86de838afa8c3d04 Mon Sep 17 00:00:00 2001 From: Bharat Nallan Chakravarthy Date: Mon, 30 Jan 2023 08:05:35 -0800 Subject: [PATCH 4/4] set use_connection_pooling from settings everywhere --- src/BridgeHelper/XDBCBridgeHelper.h | 2 +- src/Storages/StorageXDBC.cpp | 3 ++- src/TableFunctions/ITableFunctionXDBC.cpp | 2 +- src/TableFunctions/ITableFunctionXDBC.h | 13 ++++++++----- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/BridgeHelper/XDBCBridgeHelper.h b/src/BridgeHelper/XDBCBridgeHelper.h index bdef65ee250..e3806b85ca9 100644 --- a/src/BridgeHelper/XDBCBridgeHelper.h +++ b/src/BridgeHelper/XDBCBridgeHelper.h @@ -64,7 +64,7 @@ public: ContextPtr context_, Poco::Timespan http_timeout_, const std::string & connection_string_, - bool use_connection_pooling_=true) + bool use_connection_pooling_) : IXDBCBridgeHelper(context_->getGlobalContext()) , log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper")) , connection_string(connection_string_) diff --git a/src/Storages/StorageXDBC.cpp b/src/Storages/StorageXDBC.cpp index 7f073c0e2fe..5dd21d98a7e 100644 --- a/src/Storages/StorageXDBC.cpp +++ b/src/Storages/StorageXDBC.cpp @@ -173,7 +173,8 @@ namespace BridgeHelperPtr bridge_helper = std::make_shared>(args.getContext(), args.getContext()->getSettingsRef().http_receive_timeout.value, - checkAndGetLiteralArgument(engine_args[0], "connection_string")); + checkAndGetLiteralArgument(engine_args[0], "connection_string"), + args.getContext()->getSettingsRef().odbc_bridge_use_connection_pooling.value); return std::make_shared( args.table_id, checkAndGetLiteralArgument(engine_args[1], "database_name"), diff --git a/src/TableFunctions/ITableFunctionXDBC.cpp b/src/TableFunctions/ITableFunctionXDBC.cpp index 3d72d98e7ea..3abda5061df 100644 --- a/src/TableFunctions/ITableFunctionXDBC.cpp +++ b/src/TableFunctions/ITableFunctionXDBC.cpp @@ -56,7 +56,7 @@ void ITableFunctionXDBC::startBridgeIfNot(ContextPtr context) const { if (!helper) { - helper = createBridgeHelper(context, context->getSettingsRef().http_receive_timeout.value, connection_string); + helper = createBridgeHelper(context, context->getSettingsRef().http_receive_timeout.value, connection_string, context->getSettingsRef().odbc_bridge_use_connection_pooling.value); helper->startBridgeSync(); } } diff --git a/src/TableFunctions/ITableFunctionXDBC.h b/src/TableFunctions/ITableFunctionXDBC.h index 42a3d30a728..984a6a1957f 100644 --- a/src/TableFunctions/ITableFunctionXDBC.h +++ b/src/TableFunctions/ITableFunctionXDBC.h @@ -21,7 +21,8 @@ private: /* A factory method to create bridge helper, that will assist in remote interaction */ virtual BridgeHelperPtr createBridgeHelper(ContextPtr context, Poco::Timespan http_timeout_, - const std::string & connection_string_) const = 0; + const std::string & connection_string_, + bool use_connection_pooling_) const = 0; ColumnsDescription getActualTableStructure(ContextPtr context) const override; @@ -47,9 +48,10 @@ public: private: BridgeHelperPtr createBridgeHelper(ContextPtr context, Poco::Timespan http_timeout_, - const std::string & connection_string_) const override + const std::string & connection_string_, + bool use_connection_pooling_) const override { - return std::make_shared>(context, http_timeout_, connection_string_); + return std::make_shared>(context, http_timeout_, connection_string_, use_connection_pooling_); } const char * getStorageTypeName() const override { return "JDBC"; } @@ -67,9 +69,10 @@ public: private: BridgeHelperPtr createBridgeHelper(ContextPtr context, Poco::Timespan http_timeout_, - const std::string & connection_string_) const override + const std::string & connection_string_, + bool use_connection_pooling_) const override { - return std::make_shared>(context, http_timeout_, connection_string_); + return std::make_shared>(context, http_timeout_, connection_string_, use_connection_pooling_); } const char * getStorageTypeName() const override { return "ODBC"; }