ClickHouse/programs/odbc-bridge/ODBCConnectionFactory.h

122 lines
3.1 KiB
C++
Raw Normal View History

2021-03-31 12:41:12 +00:00
#pragma once
#include <common/logger_useful.h>
#include <nanodbc/nanodbc.h>
#include <mutex>
2021-04-13 18:52:59 +00:00
#include <common/BorrowedObjectPool.h>
2021-03-31 12:41:12 +00:00
#include <unordered_map>
2021-05-07 10:37:11 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int NO_FREE_CONNECTION;
}
}
2021-03-31 12:41:12 +00:00
namespace nanodbc
{
2021-05-07 10:37:11 +00:00
using ConnectionPtr = std::unique_ptr<nanodbc::connection>;
2021-03-31 12:41:12 +00:00
using Pool = BorrowedObjectPool<ConnectionPtr>;
using PoolPtr = std::shared_ptr<Pool>;
class ConnectionHolder
{
public:
2021-05-07 10:37:11 +00:00
ConnectionHolder(PoolPtr pool_, ConnectionPtr connection_) : pool(pool_), connection(std::move(connection_)) {}
2021-03-31 12:41:12 +00:00
2021-05-07 10:37:11 +00:00
ConnectionHolder(const ConnectionHolder & other) = delete;
2021-03-31 12:41:12 +00:00
2021-06-07 12:15:24 +00:00
~ConnectionHolder()
{
pool->returnObject(std::move(connection));
}
2021-03-31 12:41:12 +00:00
2021-05-07 10:37:11 +00:00
nanodbc::connection & get() const
{
assert(connection != nullptr);
2021-03-31 12:41:12 +00:00
return *connection;
}
private:
PoolPtr pool;
ConnectionPtr connection;
};
2021-05-07 11:18:49 +00:00
using ConnectionHolderPtr = std::unique_ptr<ConnectionHolder>;
2021-03-31 12:41:12 +00:00
}
namespace DB
{
2021-05-07 10:37:11 +00:00
static constexpr inline auto ODBC_CONNECT_TIMEOUT = 100;
static constexpr inline auto ODBC_POOL_WAIT_TIMEOUT = 10000;
2021-06-07 12:15:24 +00:00
static constexpr auto ODBC_CHECK_CONNECTION_QUERY = "SELECT 1";
2021-05-07 10:37:11 +00:00
2021-03-31 12:41:12 +00:00
class ODBCConnectionFactory final : private boost::noncopyable
{
public:
static ODBCConnectionFactory & instance()
{
static ODBCConnectionFactory ret;
return ret;
}
2021-06-07 12:15:24 +00:00
/// this check is performed only on the connection which was already successfully open before.
static bool needReconnect(nanodbc::connection & connection)
{
try
{
/// just_execute - execution without preparing any result object.
just_execute(connection, ODBC_CHECK_CONNECTION_QUERY);
}
catch (const nanodbc::database_error &)
{
return true;
}
return false;
}
2021-05-07 11:18:49 +00:00
nanodbc::ConnectionHolderPtr get(const std::string & connection_string, size_t pool_size)
2021-03-31 12:41:12 +00:00
{
std::lock_guard lock(mutex);
if (!factory.count(connection_string))
factory.emplace(std::make_pair(connection_string, std::make_shared<nanodbc::Pool>(pool_size)));
2021-05-07 10:37:11 +00:00
auto & pool = factory[connection_string];
nanodbc::ConnectionPtr connection;
auto connection_available = pool->tryBorrowObject(connection, []() { return nullptr; }, ODBC_POOL_WAIT_TIMEOUT);
if (!connection_available)
throw Exception("Unable to fetch connection within the timeout", ErrorCodes::NO_FREE_CONNECTION);
try
{
2021-06-07 12:15:24 +00:00
if (!connection || needReconnect(*connection))
{
2021-05-07 10:37:11 +00:00
connection = std::make_unique<nanodbc::connection>(connection_string, ODBC_CONNECT_TIMEOUT);
2021-06-07 12:15:24 +00:00
}
2021-05-07 10:37:11 +00:00
}
catch (...)
{
pool->returnObject(std::move(connection));
2021-06-07 12:15:24 +00:00
throw;
2021-05-07 10:37:11 +00:00
}
2021-05-07 11:18:49 +00:00
return std::make_unique<nanodbc::ConnectionHolder>(factory[connection_string], std::move(connection));
2021-03-31 12:41:12 +00:00
}
private:
2021-04-06 18:59:34 +00:00
/// [connection_settings_string] -> [connection_pool]
2021-03-31 12:41:12 +00:00
using PoolFactory = std::unordered_map<std::string, nanodbc::PoolPtr>;
PoolFactory factory;
std::mutex mutex;
};
}