Make common BridgeHelper base class for all bridges

This commit is contained in:
kssenii 2021-03-06 23:22:22 +00:00
parent f83a5d83a2
commit 25fc6a29e5
3 changed files with 263 additions and 123 deletions

View File

@ -0,0 +1,121 @@
#include "BridgeHelper.h"
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ReadHelpers.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <Poco/URI.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
bool BridgeHelper::checkBridgeIsRunning() const
{
try
{
ReadWriteBufferFromHTTP buf(
pingURL(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
return checkString(PING_OK_ANSWER, buf);
}
catch (...)
{
return false;
}
}
void BridgeHelper::startBridgeSync() const
{
if (!checkBridgeIsRunning())
{
LOG_TRACE(getLog(), "{} is not running, will try to start it", serviceAlias());
startBridge(startBridgeCommand());
bool started = false;
uint64_t milliseconds_to_wait = 10; /// Exponential backoff
uint64_t counter = 0;
while (milliseconds_to_wait < 10000)
{
++counter;
LOG_TRACE(getLog(), "Checking {} is running, try {}", serviceAlias(), counter);
if (checkBridgeIsRunning())
{
started = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds_to_wait));
milliseconds_to_wait *= 2;
}
if (!started)
throw Exception("BridgeHelper: " + serviceAlias() + " is not responding",
ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
}
}
std::unique_ptr<ShellCommand> BridgeHelper::startBridgeCommand() const
{
if (startBridgeManually())
throw Exception(serviceAlias() + " is not running. Please, start it manually", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
const auto & config = getConfig();
/// Path to executable folder
Poco::Path path{config.getString("application.dir", "/usr/bin")};
std::vector<std::string> cmd_args;
path.setFileName("clickhouse-odbc-bridge");
#if !CLICKHOUSE_SPLIT_BINARY
cmd_args.push_back("odbc-bridge");
#endif
cmd_args.push_back("--http-port");
cmd_args.push_back(std::to_string(config.getUInt(configPrefix() + ".port", getDefaultPort())));
cmd_args.push_back("--listen-host");
cmd_args.push_back(config.getString(configPrefix() + ".listen_host", getDefaultHost()));
cmd_args.push_back("--http-timeout");
cmd_args.push_back(std::to_string(getHTTPTimeout().totalMicroseconds()));
if (config.has("logger." + configPrefix() + "_log"))
{
cmd_args.push_back("--log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_log"));
}
if (config.has("logger." + configPrefix() + "_errlog"))
{
cmd_args.push_back("--err-log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_errlog"));
}
if (config.has("logger." + configPrefix() + "_stdout"))
{
cmd_args.push_back("--stdout-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stdout"));
}
if (config.has("logger." + configPrefix() + "_stderr"))
{
cmd_args.push_back("--stderr-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stderr"));
}
if (config.has("logger." + configPrefix() + "_level"))
{
cmd_args.push_back("--log-level");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_level"));
}
LOG_TRACE(getLog(), "Starting {}", serviceAlias());
return ShellCommand::executeDirect(path.toString(), cmd_args, true);
}
}

View File

@ -0,0 +1,55 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <common/logger_useful.h>
namespace DB
{
class BridgeHelper
{
public:
//static constexpr inline auto DEFAULT_FORMAT = "RowBinary";
static constexpr inline auto DEFAULT_FORMAT = "RowBinary";
static constexpr inline auto PING_OK_ANSWER = "Ok.";
virtual const String serviceAlias() const = 0;
virtual const String configPrefix() const = 0;
virtual const Poco::URI & pingURL() const = 0;
virtual const Context & getContext() const = 0;
virtual Poco::Logger * getLog() const = 0;
virtual const Poco::Util::AbstractConfiguration & getConfig() const = 0;
virtual const String getDefaultHost() const = 0;
virtual size_t getDefaultPort() const = 0;
virtual bool startBridgeManually() const = 0;
virtual void startBridge(std::unique_ptr<ShellCommand> cmd) const = 0;
virtual const Poco::Timespan getHTTPTimeout() const = 0;
void startBridgeSync() const;
virtual ~BridgeHelper() = default;
private:
bool checkBridgeIsRunning() const;
std::unique_ptr<ShellCommand> startBridgeCommand() const;
};
}

View File

@ -15,60 +15,59 @@
#include <IO/ConnectionTimeoutsContext.h> #include <IO/ConnectionTimeoutsContext.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <ext/range.h> #include <ext/range.h>
#include <Common/Bridge/BridgeHelper.h>
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
# include <Common/config.h> # include <Common/config.h>
#endif #endif
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ILLEGAL_TYPE_OF_ARGUMENT;
} }
/** /**
* Class for Helpers for Xdbc-bridges, provide utility methods, not main request * Class for Helpers for Xdbc-bridges, provide utility methods, not main request
*/ */
class IXDBCBridgeHelper class IXDBCBridgeHelper : public BridgeHelper
{ {
public: public:
static constexpr inline auto DEFAULT_FORMAT = "RowBinary";
virtual std::vector<std::pair<std::string, std::string>> getURLParams(const std::string & cols, UInt64 max_block_size) const = 0; virtual std::vector<std::pair<std::string, std::string>> getURLParams(const std::string & cols, UInt64 max_block_size) const = 0;
virtual void startBridgeSync() const = 0;
virtual Poco::URI getMainURI() const = 0;
virtual Poco::URI getColumnsInfoURI() const = 0;
virtual IdentifierQuotingStyle getIdentifierQuotingStyle() = 0;
virtual bool isSchemaAllowed() = 0;
virtual String getName() const = 0;
virtual ~IXDBCBridgeHelper() = default; virtual Poco::URI getMainURI() const = 0;
virtual Poco::URI getColumnsInfoURI() const = 0;
virtual IdentifierQuotingStyle getIdentifierQuotingStyle() = 0;
virtual bool isSchemaAllowed() = 0;
virtual const String getName() const = 0;
}; };
using BridgeHelperPtr = std::shared_ptr<IXDBCBridgeHelper>; using BridgeHelperPtr = std::shared_ptr<IXDBCBridgeHelper>;
template <typename BridgeHelperMixin> template <typename BridgeHelperMixin>
class XDBCBridgeHelper : public IXDBCBridgeHelper class XDBCBridgeHelper : public IXDBCBridgeHelper
{ {
private: private:
Poco::Timespan http_timeout; Poco::Timespan http_timeout;
std::string connection_string; std::string connection_string;
Poco::URI ping_url; Poco::URI ping_url;
Poco::Logger * log = &Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"); Poco::Logger * log = &Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper");
std::optional<IdentifierQuotingStyle> quote_style; std::optional<IdentifierQuotingStyle> quote_style;
std::optional<bool> is_schema_allowed; std::optional<bool> is_schema_allowed;
protected: protected:
auto getConnectionString() const auto getConnectionString() const { return connection_string; }
{
return connection_string;
}
public: public:
using Configuration = Poco::Util::AbstractConfiguration; using Configuration = Poco::Util::AbstractConfiguration;
@ -83,7 +82,7 @@ public:
static constexpr inline auto COL_INFO_HANDLER = "/columns_info"; static constexpr inline auto COL_INFO_HANDLER = "/columns_info";
static constexpr inline auto IDENTIFIER_QUOTE_HANDLER = "/identifier_quote"; static constexpr inline auto IDENTIFIER_QUOTE_HANDLER = "/identifier_quote";
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed"; static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
static constexpr inline auto PING_OK_ANSWER = "Ok.";
XDBCBridgeHelper(const Context & global_context_, const Poco::Timespan & http_timeout_, const std::string & connection_string_) XDBCBridgeHelper(const Context & global_context_, const Poco::Timespan & http_timeout_, const std::string & connection_string_)
: http_timeout(http_timeout_), connection_string(connection_string_), context(global_context_), config(context.getConfigRef()) : http_timeout(http_timeout_), connection_string(connection_string_), context(global_context_), config(context.getConfigRef())
@ -97,11 +96,67 @@ public:
ping_url.setPath(PING_HANDLER); ping_url.setPath(PING_HANDLER);
} }
String getName() const override
const String getDefaultHost() const override
{
return DEFAULT_HOST;
}
size_t getDefaultPort() const override
{
return DEFAULT_PORT;
}
const String getName() const override
{ {
return BridgeHelperMixin::getName(); return BridgeHelperMixin::getName();
} }
const String serviceAlias() const override
{
return BridgeHelperMixin::serviceAlias();
}
const String configPrefix() const override
{
return BridgeHelperMixin::configPrefix();
}
const Poco::URI & pingURL() const override
{
return ping_url;
}
const Context & getContext() const override
{
return context;
}
Poco::Logger * getLog() const override
{
return log;
}
const Poco::Util::AbstractConfiguration & getConfig() const override
{
return config;
}
const Poco::Timespan getHTTPTimeout() const override
{
return http_timeout;
}
bool startBridgeManually() const override
{
return BridgeHelperMixin::startBridgeManually();
}
void startBridge(std::unique_ptr<ShellCommand> cmd) const override
{
context.addXDBCBridgeCommand(std::move(cmd));
}
IdentifierQuotingStyle getIdentifierQuotingStyle() override IdentifierQuotingStyle getIdentifierQuotingStyle() override
{ {
if (!quote_style.has_value()) if (!quote_style.has_value())
@ -166,38 +221,6 @@ public:
return result; return result;
} }
/**
* Performs spawn of external daemon
*/
void startBridgeSync() const override
{
if (!checkBridgeIsRunning())
{
LOG_TRACE(log, "{} is not running, will try to start it", BridgeHelperMixin::serviceAlias());
startBridge();
bool started = false;
uint64_t milliseconds_to_wait = 10; /// Exponential backoff
uint64_t counter = 0;
while (milliseconds_to_wait < 10000)
{
++counter;
LOG_TRACE(log, "Checking {} is running, try {}", BridgeHelperMixin::serviceAlias(), counter);
if (checkBridgeIsRunning())
{
started = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds_to_wait));
milliseconds_to_wait *= 2;
}
if (!started)
throw Exception(BridgeHelperMixin::getName() + "BridgeHelper: " + BridgeHelperMixin::serviceAlias() + " is not responding",
ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
}
}
/** /**
* URI to fetch the data from external service * URI to fetch the data from external service
*/ */
@ -228,52 +251,35 @@ protected:
return uri; return uri;
} }
private:
bool checkBridgeIsRunning() const
{
try
{
ReadWriteBufferFromHTTP buf(
ping_url, Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(context));
return checkString(XDBCBridgeHelper::PING_OK_ANSWER, buf);
}
catch (...)
{
return false;
}
}
/* Contains logic for instantiation of the bridge instance */
void startBridge() const
{
auto cmd = BridgeHelperMixin::startBridge(config, log, http_timeout);
context.addXDBCBridgeCommand(std::move(cmd));
}
}; };
struct JDBCBridgeMixin struct JDBCBridgeMixin
{ {
static constexpr inline auto DEFAULT_PORT = 9019; static constexpr inline auto DEFAULT_PORT = 9019;
static const String configPrefix() static const String configPrefix()
{ {
return "jdbc_bridge"; return "jdbc_bridge";
} }
static const String serviceAlias() static const String serviceAlias()
{ {
return "clickhouse-jdbc-bridge"; return "clickhouse-jdbc-bridge";
} }
static const String getName() static const String getName()
{ {
return "JDBC"; return "JDBC";
} }
static AccessType getSourceAccessType() static AccessType getSourceAccessType()
{ {
return AccessType::JDBC; return AccessType::JDBC;
} }
static std::unique_ptr<ShellCommand> startBridge(const Poco::Util::AbstractConfiguration &, const Poco::Logger *, const Poco::Timespan &) static bool startBridgeManually()
{ {
throw Exception("jdbc-bridge is not running. Please, start it manually", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING); return true;
} }
}; };
@ -285,67 +291,25 @@ struct ODBCBridgeMixin
{ {
return "odbc_bridge"; return "odbc_bridge";
} }
static const String serviceAlias() static const String serviceAlias()
{ {
return "clickhouse-odbc-bridge"; return "clickhouse-odbc-bridge";
} }
static const String getName() static const String getName()
{ {
return "ODBC"; return "ODBC";
} }
static AccessType getSourceAccessType() static AccessType getSourceAccessType()
{ {
return AccessType::ODBC; return AccessType::ODBC;
} }
static std::unique_ptr<ShellCommand> startBridge( static bool startBridgeManually()
const Poco::Util::AbstractConfiguration & config, Poco::Logger * log, const Poco::Timespan & http_timeout)
{ {
/// Path to executable folder return false;
Poco::Path path{config.getString("application.dir", "/usr/bin")};
std::vector<std::string> cmd_args;
path.setFileName("clickhouse-odbc-bridge");
#if !CLICKHOUSE_SPLIT_BINARY
cmd_args.push_back("odbc-bridge");
#endif
cmd_args.push_back("--http-port");
cmd_args.push_back(std::to_string(config.getUInt(configPrefix() + ".port", DEFAULT_PORT)));
cmd_args.push_back("--listen-host");
cmd_args.push_back(config.getString(configPrefix() + ".listen_host", XDBCBridgeHelper<ODBCBridgeMixin>::DEFAULT_HOST));
cmd_args.push_back("--http-timeout");
cmd_args.push_back(std::to_string(http_timeout.totalMicroseconds()));
if (config.has("logger." + configPrefix() + "_log"))
{
cmd_args.push_back("--log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_log"));
}
if (config.has("logger." + configPrefix() + "_errlog"))
{
cmd_args.push_back("--err-log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_errlog"));
}
if (config.has("logger." + configPrefix() + "_stdout"))
{
cmd_args.push_back("--stdout-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stdout"));
}
if (config.has("logger." + configPrefix() + "_stderr"))
{
cmd_args.push_back("--stderr-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stderr"));
}
if (config.has("logger." + configPrefix() + "_level"))
{
cmd_args.push_back("--log-level");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_level"));
}
LOG_TRACE(log, "Starting {}", serviceAlias());
return ShellCommand::executeDirect(path.toString(), cmd_args, true);
} }
}; };
} }