New bridge interaction, quote identifier via api

This commit is contained in:
Alexander Krasheninnikov 2018-09-28 05:46:33 +03:00
parent 4b431264db
commit ceea4b9b33
23 changed files with 288 additions and 767 deletions

View File

@ -2,8 +2,10 @@ add_library (clickhouse-odbc-bridge-lib ${LINK_MODE}
PingHandler.cpp
MainHandler.cpp
ColumnInfoHandler.cpp
IdentifierQuoteHandler.cpp
HandlerFactory.cpp
ODBCBridge.cpp
getIdentifierQuote.cpp
validateODBCConnectionString.cpp
)

View File

@ -1,4 +1,5 @@
#include "ColumnInfoHandler.h"
#include "getIdentifierQuote.h"
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
#if USE_POCO_SQLODBC
@ -113,10 +114,22 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
IAST::FormatSettings settings(ss, true);
settings.always_quote_identifiers = true;
settings.identifier_quoting_style = IdentifierQuotingStyle::DoubleQuotes;
auto identifier_quote = getIdentifierQuote(hdbc);
if (identifier_quote.length() == 0)
settings.identifier_quoting_style = IdentifierQuotingStyle::None;
else if(identifier_quote[0] == '`')
settings.identifier_quoting_style = IdentifierQuotingStyle::Backticks;
else if(identifier_quote[0] == '"')
settings.identifier_quoting_style = IdentifierQuotingStyle::DoubleQuotes;
else
throw Exception("Can not map quote identifier '" + identifier_quote + "' to IdentifierQuotingStyle value");
select->format(settings);
std::string query = ss.str();
std::cout << query << std::endl;
if (POCO_SQL_ODBC_CLASS::Utility::isError(POCO_SQL_ODBC_CLASS::SQLPrepare(hstmt, reinterpret_cast<SQLCHAR *>(query.data()), query.size())))
throw POCO_SQL_ODBC_CLASS::DescriptorException(session.dbc());

View File

@ -25,6 +25,7 @@
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include "validateODBCConnectionString.h"
#include "getIdentifierQuote.h"
namespace DB
{
@ -42,7 +43,7 @@ void IdentifierQuoteHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
LOG_WARNING(log, message);
};
if (!params.has("connection_string"))
if (!params.has("connection_string"))
{
process_error("No 'connection_string' in request URL");
return;
@ -54,18 +55,7 @@ void IdentifierQuoteHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
POCO_SQL_ODBC_CLASS::SessionImpl session(validateODBCConnectionString(connection_string), DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
SQLHDBC hdbc = session.dbc().handle();
std::string identifier;
SQLSMALLINT t;
SQLRETURN r = Poco::Data::ODBC::SQLGetInfo(hdbc, SQL_IDENTIFIER_QUOTE_CHAR, NULL, 0, &t);
if (!POCO_SQL_ODBC_CLASS::Utility::isError(r) && t > 0)
{
identifier.resize(static_cast<std::size_t>(t) + 2);
r = Poco::Data::ODBC::SQLGetInfo(hdbc, SQL_IDENTIFIER_QUOTE_CHAR, &identifier[0], SQLSMALLINT((identifier.length() - 1) * sizeof(identifier[0])), &t);
LOG_TRACE(log, "GOT IDENTIFIER: '" + identifier + "'");
}
LOG_TRACE(log, "FINAL IDENTIFIER: '" + identifier + "'");
auto identifier = getIdentifierQuote(hdbc);
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
writeStringBinary(identifier, out);

View File

@ -0,0 +1,45 @@
#include "getIdentifierQuote.h"
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
#if USE_POCO_SQLODBC
#include <Poco/SQL/ODBC/ODBCException.h>
#include <Poco/SQL/ODBC/SessionImpl.h>
#include <Poco/SQL/ODBC/Utility.h>
#define POCO_SQL_ODBC_CLASS Poco::SQL::ODBC
#endif
#if USE_POCO_DATAODBC
#include <Poco/Data/ODBC/ODBCException.h>
#include <Poco/Data/ODBC/SessionImpl.h>
#include <Poco/Data/ODBC/Utility.h>
#define POCO_SQL_ODBC_CLASS Poco::Data::ODBC
#endif
namespace DB
{
std::string getIdentifierQuote(SQLHDBC hdbc)
{
std::string identifier = "";
SQLSMALLINT t;
SQLRETURN r = POCO_SQL_ODBC_CLASS::SQLGetInfo(hdbc, SQL_IDENTIFIER_QUOTE_CHAR, NULL, 0, &t);
if (POCO_SQL_ODBC_CLASS::Utility::isError(r))
throw POCO_SQL_ODBC_CLASS::ConnectionException(hdbc);
if (t > 0)
{
// I have not idea, why to add '2' here, got from: contrib/poco/Data/ODBC/src/ODBCStatementImpl.cpp:60 (SQL_DRIVER_NAME)
identifier.resize(static_cast<std::size_t>(t) + 2);
if (POCO_SQL_ODBC_CLASS::Utility::isError(POCO_SQL_ODBC_CLASS::SQLGetInfo(hdbc, SQL_IDENTIFIER_QUOTE_CHAR, &identifier[0], SQLSMALLINT((identifier.length() - 1) * sizeof(identifier[0])), &t)))
throw POCO_SQL_ODBC_CLASS::ConnectionException(hdbc);
identifier.resize(static_cast<std::size_t>(t));
}
return identifier;
}
}
#endif

View File

@ -0,0 +1,16 @@
#pragma once
#include <Common/config.h>
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#include <sqltypes.h>
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
namespace DB
{
std::string getIdentifierQuote(SQLHDBC hdbc);
}
#endif

View File

@ -1,151 +0,0 @@
#include <Common/ODBCBridgeHelper.h>
#include <sstream>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/File.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
#include <Common/config.h>
#include <common/logger_useful.h>
#include <ext/range.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
}
ODBCBridgeHelper::ODBCBridgeHelper(
const Configuration & config_, const Poco::Timespan & http_timeout_, const std::string & connection_string_)
: config(config_), http_timeout(http_timeout_), connection_string(connection_string_)
{
size_t bridge_port = config.getUInt("odbc_bridge.port", DEFAULT_PORT);
std::string bridge_host = config.getString("odbc_bridge.host", DEFAULT_HOST);
ping_url.setHost(bridge_host);
ping_url.setPort(bridge_port);
ping_url.setScheme("http");
ping_url.setPath(PING_HANDLER);
}
void ODBCBridgeHelper::startODBCBridge() const
{
Poco::Path path{config.getString("application.dir", "")};
path.setFileName(
#if CLICKHOUSE_SPLIT_BINARY
"clickhouse-odbc-bridge"
#else
"clickhouse"
#endif
);
if (!Poco::File(path).exists())
throw Exception("clickhouse binary (" + path.toString() + ") is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND);
std::stringstream command;
command << path.toString() <<
#if CLICKHOUSE_SPLIT_BINARY
" "
#else
" odbc-bridge "
#endif
;
command << "--http-port " << config.getUInt("odbc_bridge.port", DEFAULT_PORT) << ' ';
command << "--listen-host " << config.getString("odbc_bridge.listen_host", DEFAULT_HOST) << ' ';
command << "--http-timeout " << http_timeout.totalMicroseconds() << ' ';
if (config.has("logger.odbc_bridge_log"))
command << "--log-path " << config.getString("logger.odbc_bridge_log") << ' ';
if (config.has("logger.odbc_bridge_errlog"))
command << "--err-log-path " << config.getString("logger.odbc_bridge_errlog") << ' ';
if (config.has("logger.odbc_bridge_level"))
command << "--log-level " << config.getString("logger.odbc_bridge_level") << ' ';
command << "&"; /// we don't want to wait this process
auto command_str = command.str();
LOG_TRACE(log, "Starting clickhouse-odbc-bridge with command: " << command_str);
auto cmd = ShellCommand::execute(command_str);
cmd->wait();
}
std::vector<std::pair<std::string, std::string>> ODBCBridgeHelper::getURLParams(const std::string & cols, size_t max_block_size) const
{
std::vector<std::pair<std::string, std::string>> result;
result.emplace_back("connection_string", connection_string); /// already validated
result.emplace_back("columns", cols);
result.emplace_back("max_block_size", std::to_string(max_block_size));
return result;
}
bool ODBCBridgeHelper::checkODBCBridgeIsRunning() const
{
try
{
ReadWriteBufferFromHTTP buf(ping_url, Poco::Net::HTTPRequest::HTTP_GET, nullptr);
return checkString(ODBCBridgeHelper::PING_OK_ANSWER, buf);
}
catch (...)
{
return false;
}
}
void ODBCBridgeHelper::startODBCBridgeSync() const
{
if (!checkODBCBridgeIsRunning())
{
LOG_TRACE(log, "clickhouse-odbc-bridge is not running, will try to start it");
startODBCBridge();
bool started = false;
for (size_t counter : ext::range(1, 20))
{
LOG_TRACE(log, "Checking clickhouse-odbc-bridge is running, try " << counter);
if (checkODBCBridgeIsRunning())
{
started = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
if (!started)
throw Exception("ODBCBridgeHelper: clickhouse-odbc-bridge is not responding", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
}
}
Poco::URI ODBCBridgeHelper::getMainURI() const
{
size_t bridge_port = config.getUInt("odbc_bridge.port", DEFAULT_PORT);
std::string bridge_host = config.getString("odbc_bridge.host", DEFAULT_HOST);
Poco::URI main_uri;
main_uri.setHost(bridge_host);
main_uri.setPort(bridge_port);
main_uri.setScheme("http");
main_uri.setPath(MAIN_HANDLER);
return main_uri;
}
Poco::URI ODBCBridgeHelper::getColumnsInfoURI() const
{
size_t bridge_port = config.getUInt("odbc_bridge.port", DEFAULT_PORT);
std::string bridge_host = config.getString("odbc_bridge.host", DEFAULT_HOST);
Poco::URI columns_info_uri;
columns_info_uri.setHost(bridge_host);
columns_info_uri.setPort(bridge_port);
columns_info_uri.setScheme("http");
columns_info_uri.setPath(COL_INFO_HANDLER);
return columns_info_uri;
}
}

View File

@ -1,52 +0,0 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_EXECUTABLE_NOT_FOUND;
}
/** Helper for odbc-bridge, provide utility methods, not main request
*/
class ODBCBridgeHelper
{
private:
using Configuration = Poco::Util::AbstractConfiguration;
const Configuration & config;
Poco::Timespan http_timeout;
std::string connection_string;
Poco::URI ping_url;
Poco::Logger * log = &Poco::Logger::get("ODBCBridgeHelper");
public:
static constexpr inline size_t DEFAULT_PORT = 9018;
static constexpr inline auto DEFAULT_HOST = "localhost";
static constexpr inline auto DEFAULT_FORMAT = "RowBinary";
static constexpr inline auto PING_HANDLER = "/ping";
static constexpr inline auto MAIN_HANDLER = "/";
static constexpr inline auto COL_INFO_HANDLER = "/columns_info";
static constexpr inline auto PING_OK_ANSWER = "Ok.";
ODBCBridgeHelper(const Configuration & config_, const Poco::Timespan & http_timeout_, const std::string & connection_string_);
std::vector<std::pair<std::string, std::string>> getURLParams(const std::string & cols, size_t max_block_size) const;
bool checkODBCBridgeIsRunning() const;
void startODBCBridge() const;
void startODBCBridgeSync() const;
Poco::URI getMainURI() const;
Poco::URI getColumnsInfoURI() const;
};
}

View File

@ -38,6 +38,7 @@ public:
virtual Poco::URI getMainURI() const = 0;
virtual Poco::URI getColumnsInfoURI() const = 0;
virtual IdentifierQuotingStyle getIdentifierQuotingStyle() = 0;
virtual String getName() const = 0;
virtual ~IXDBCBridgeHelper() {}
};
@ -54,7 +55,7 @@ private:
Poco::URI ping_url;
Poco::Logger * log = &Poco::Logger::get(BridgeHelperMixin::NAME);
Poco::Logger * log = &Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper");
std::optional<IdentifierQuotingStyle> quote_style;
@ -93,9 +94,13 @@ public:
virtual ~XDBCBridgeHelper() {}
String getName() const override
{
return BridgeHelperMixin::getName();
}
IdentifierQuotingStyle getIdentifierQuotingStyle() override
{
std::cerr << "GETTING QUOTE STYLE " << std::endl;
if (!quote_style.has_value())
{
auto uri = createBaseURI();
@ -108,14 +113,14 @@ public:
if (character.length() > 1)
throw Exception("Failed to get quoting style from " + BridgeHelperMixin::serviceAlias());
if(character.length() == 0)
if (character.length() == 0)
quote_style = IdentifierQuotingStyle::None;
else if(character[0] == '`')
quote_style = IdentifierQuotingStyle::Backticks;
else if(character[0] == '"')
quote_style = IdentifierQuotingStyle::DoubleQuotes;
else
throw Exception("Failed to determine quoting style from " + BridgeHelperMixin::serviceAlias() + " response: " + character);
throw Exception("Can not map quote identifier '" + character + "' to enum value");
}
return *quote_style;
@ -156,7 +161,7 @@ public:
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
if (!started)
throw Exception("XDBCBridgeHelper: " + BridgeHelperMixin::serviceAlias() + " is not responding", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
throw Exception(BridgeHelperMixin::getName() + "BridgeHelper: " + BridgeHelperMixin::serviceAlias() + " is not responding", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
}
}
@ -219,13 +224,13 @@ private:
struct JDBCBridgeMixin
{
static constexpr inline auto DEFAULT_PORT = 9019;
static constexpr inline auto NAME = "JDBCBridgeHelper";
static const String configPrefix() { return "jdbc_bridge"; }
static const String serviceAlias() { return "clickhouse-jdbc-bridge"; }
static const String getName() { return "JDBC"; }
static void startBridge(const Poco::Util::AbstractConfiguration & , const Poco::Logger * , const Poco::Timespan & )
{
throw Exception("jdbc-bridge does not support external auto-start");
throw Exception("jdbc-bridge is not running. Please, start it manually");
}
};
@ -233,12 +238,12 @@ struct JDBCBridgeMixin
struct ODBCBridgeMixin {
static constexpr inline auto DEFAULT_PORT = 9018;
static constexpr inline auto NAME = "ODBCBridgeHelper";
static const String configPrefix() { return "odbc_bridge"; }
static const String serviceAlias() { return "clickhouse-odbc-bridge"; }
static const String getName() { return "ODBC"; }
static void startBridge(const Poco::Util::AbstractConfiguration & config, const Poco::Logger * , const Poco::Timespan & http_timeout)
static void startBridge(const Poco::Util::AbstractConfiguration & config, Poco::Logger * log, const Poco::Timespan & http_timeout)
{
Poco::Path path{config.getString("application.dir", "")};
@ -266,16 +271,19 @@ struct ODBCBridgeMixin {
command << "--http-port " << config.getUInt(configPrefix() + ".port", DEFAULT_PORT) << ' ';
command << "--listen-host " << config.getString(configPrefix() + ".listen_host", XDBCBridgeHelper<ODBCBridgeMixin>::DEFAULT_HOST) << ' ';
command << "--http-timeout " << http_timeout.totalMicroseconds() << ' ';
if (config.has("logger.odbc_bridge_log"))
if (config.has("logger." + configPrefix() +"_log"))
command << "--log-path " << config.getString("logger."+configPrefix()+"_log") << ' ';
if (config.has("logger.odbc_bridge_errlog"))
if (config.has("logger." + configPrefix() + "_errlog"))
command << "--err-log-path " << config.getString("logger." + configPrefix() + "_errlog") << ' ';
if (config.has("logger.odbc_bridge_level"))
if (config.has("logger." + configPrefix() + "_level"))
command << "--log-level " << config.getString("logger." + configPrefix() + "_level") << ' ';
command << "&"; /// we don't want to wait this process
auto command_str = command.str();
// LOG_TRACE(log, "Starting " + serviceAlias() +" with command: " << command_str);
std::cerr << command_str << std::endl;
LOG_TRACE(log, "Starting " + serviceAlias() + " with command: " << command_str);
auto cmd = ShellCommand::execute(command_str);
cmd->wait();

View File

@ -7,10 +7,12 @@
#include <Dictionaries/ExecutableDictionarySource.h>
#include <Dictionaries/HTTPDictionarySource.h>
#include <Dictionaries/LibraryDictionarySource.h>
#include <Dictionaries/XDBCDictionarySource.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeNullable.h>
#include <Common/FieldVisitors.h>
#include <Common/XDBCBridgeHelper.h>
#include <Columns/ColumnsNumber.h>
#include <IO/HTTPCommon.h>
#include <memory>
@ -22,7 +24,6 @@
#endif
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
#include <Poco/Data/ODBC/Connector.h>
#include <Dictionaries/ODBCDictionarySource.h>
#endif
#if USE_MYSQL
#include <Dictionaries/MySQLDictionarySource.h>
@ -154,11 +155,19 @@ DictionarySourcePtr DictionarySourceFactory::create(
else if ("odbc" == source_type)
{
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
return std::make_unique<ODBCDictionarySource>(dict_struct, config, config_prefix + ".odbc", sample_block, context);
BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(config, context.getSettings().http_connection_timeout, config.getString(config_prefix + ".connection_string"));
return std::make_unique<XDBCDictionarySource>(dict_struct, config, config_prefix + ".odbc", sample_block, context, bridge);
#else
throw Exception{"Dictionary source of type `odbc` is disabled because poco library was built without ODBC support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}
else if ("jdbc" == source_type)
{
throw Exception{"Dictionary source of type `jdbc` is disabled until consistend dealing with nullable fields.",
ErrorCodes::SUPPORT_IS_DISABLED};
// BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<JDBCBridgeMixin>>(config, context.getSettings().http_connection_timeout, config.getString(config_prefix + ".connection_string"));
// return std::make_unique<XDBCDictionarySource>(dict_struct, config, config_prefix + ".jdbc", sample_block, context, bridge);
}
else if ("executable" == source_type)
{

View File

@ -1,90 +0,0 @@
#pragma once
#include <Poco/Data/SessionPool.h>
#include <Poco/URI.h>
#include <Common/ODBCBridgeHelper.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/ExternalQueryBuilder.h>
#include <Dictionaries/DictionaryStructure.h>
#include <IO/ConnectionTimeouts.h>
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
class Logger;
}
namespace DB
{
/// Allows loading dictionaries from a ODBC source
class ODBCDictionarySource final : public IDictionarySource
{
public:
ODBCDictionarySource(const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const Block & sample_block, const Context & context);
/// copy-constructor is provided in order to support cloneability
ODBCDictionarySource(const ODBCDictionarySource & other);
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(
const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override;
bool supportsSelectiveLoad() const override;
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override;
std::string toString() const override;
private:
std::string getUpdateFieldAndDate();
// execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const;
BlockInputStreamPtr loadBase(const std::string & query) const;
Poco::Logger * log;
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string db;
const std::string table;
const std::string where;
const std::string update_field;
Block sample_block;
ExternalQueryBuilder query_builder;
const std::string load_all_query;
std::string invalidate_query;
mutable std::string invalidate_query_response;
ODBCBridgeHelper odbc_bridge_helper;
Poco::URI bridge_url;
ConnectionTimeouts timeouts;
const Context & global_context;
};
}

View File

@ -1,4 +1,4 @@
#include <Dictionaries/ODBCDictionarySource.h>
#include <Dictionaries/XDBCDictionarySource.h>
#include <common/logger_useful.h>
#include <common/LocalDateTime.h>
#include <Poco/Ext/SessionPoolHelpers.h>
@ -12,6 +12,7 @@
#include <IO/WriteHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Formats/FormatFactory.h>
#include <Common/XDBCBridgeHelper.h>
namespace DB
@ -19,18 +20,18 @@ namespace DB
namespace
{
class ODBCBridgeBlockInputStream : public IProfilingBlockInputStream
class XDBCBridgeBlockInputStream : public IProfilingBlockInputStream
{
public:
ODBCBridgeBlockInputStream(const Poco::URI & uri,
XDBCBridgeBlockInputStream(const Poco::URI & uri,
std::function<void(std::ostream &)> callback,
const Block & sample_block,
const Context & context,
size_t max_block_size,
const ConnectionTimeouts & timeouts)
const ConnectionTimeouts & timeouts, const String name) : name(name)
{
read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, callback, timeouts);
reader = FormatFactory::instance().getInput(ODBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, context, max_block_size);
reader = FormatFactory::instance().getInput(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, context, max_block_size);
}
Block getHeader() const override
@ -40,7 +41,7 @@ namespace
String getName() const override
{
return "ODBCBridgeBlockInputStream";
return name;
}
private:
@ -49,6 +50,7 @@ namespace
return reader->read();
}
String name;
std::unique_ptr<ReadWriteBufferFromHTTP> read_buf;
BlockInputStreamPtr reader;
};
@ -57,10 +59,10 @@ namespace
static const size_t max_block_size = 8192;
ODBCDictionarySource::ODBCDictionarySource(const DictionaryStructure & dict_struct_,
XDBCDictionarySource::XDBCDictionarySource(const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const Block & sample_block, const Context & context)
: log(&Logger::get("ODBCDictionarySource")),
const Block & sample_block, const Context & context, const BridgeHelperPtr bridge)
: log(&Logger::get(bridge->getName() + "DictionarySource")),
update_time{std::chrono::system_clock::from_time_t(0)},
dict_struct{dict_struct_},
db{config.getString(config_prefix + ".db", "")},
@ -68,23 +70,23 @@ ODBCDictionarySource::ODBCDictionarySource(const DictionaryStructure & dict_stru
where{config.getString(config_prefix + ".where", "")},
update_field{config.getString(config_prefix + ".update_field", "")},
sample_block{sample_block},
query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::None}, /// NOTE Better to obtain quoting style via ODBC interface.
query_builder{dict_struct, db, table, where, bridge->getIdentifierQuotingStyle()},
load_all_query{query_builder.composeLoadAllQuery()},
invalidate_query{config.getString(config_prefix + ".invalidate_query", "")},
odbc_bridge_helper{context.getConfigRef(), context.getSettingsRef().http_receive_timeout.value, config.getString(config_prefix + ".connection_string")},
bridge_helper{bridge},
timeouts{ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef())},
global_context(context)
{
bridge_url = odbc_bridge_helper.getMainURI();
bridge_url = bridge_helper->getMainURI();
auto url_params = odbc_bridge_helper.getURLParams(sample_block.getNamesAndTypesList().toString(), max_block_size);
auto url_params = bridge_helper->getURLParams(sample_block.getNamesAndTypesList().toString(), max_block_size);
for (const auto & [name, value] : url_params)
bridge_url.addQueryParameter(name, value);
}
/// copy-constructor is provided in order to support cloneability
ODBCDictionarySource::ODBCDictionarySource(const ODBCDictionarySource & other)
: log(&Logger::get("ODBCDictionarySource")),
XDBCDictionarySource::XDBCDictionarySource(const XDBCDictionarySource & other)
: log(&Logger::get(other.bridge_helper->getName() + "DictionarySource")),
update_time{other.update_time},
dict_struct{other.dict_struct},
db{other.db},
@ -92,11 +94,11 @@ ODBCDictionarySource::ODBCDictionarySource(const ODBCDictionarySource & other)
where{other.where},
update_field{other.update_field},
sample_block{other.sample_block},
query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::None},
query_builder{dict_struct, db, table, where, other.bridge_helper->getIdentifierQuotingStyle()},
load_all_query{other.load_all_query},
invalidate_query{other.invalidate_query},
invalidate_query_response{other.invalidate_query_response},
odbc_bridge_helper{other.odbc_bridge_helper},
bridge_helper{other.bridge_helper},
bridge_url{other.bridge_url},
timeouts{other.timeouts},
global_context{other.global_context}
@ -104,7 +106,7 @@ ODBCDictionarySource::ODBCDictionarySource(const ODBCDictionarySource & other)
}
std::string ODBCDictionarySource::getUpdateFieldAndDate()
std::string XDBCDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
{
@ -122,13 +124,13 @@ std::string ODBCDictionarySource::getUpdateFieldAndDate()
}
}
BlockInputStreamPtr ODBCDictionarySource::loadAll()
BlockInputStreamPtr XDBCDictionarySource::loadAll()
{
LOG_TRACE(log, load_all_query);
return loadBase(load_all_query);
}
BlockInputStreamPtr ODBCDictionarySource::loadUpdatedAll()
BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll()
{
std::string load_query_update = getUpdateFieldAndDate();
@ -136,40 +138,40 @@ BlockInputStreamPtr ODBCDictionarySource::loadUpdatedAll()
return loadBase(load_query_update);
}
BlockInputStreamPtr ODBCDictionarySource::loadIds(const std::vector<UInt64> & ids)
BlockInputStreamPtr XDBCDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
const auto query = query_builder.composeLoadIdsQuery(ids);
return loadBase(query);
}
BlockInputStreamPtr ODBCDictionarySource::loadKeys(
BlockInputStreamPtr XDBCDictionarySource::loadKeys(
const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return loadBase(query);
}
bool ODBCDictionarySource::supportsSelectiveLoad() const
bool XDBCDictionarySource::supportsSelectiveLoad() const
{
return true;
}
bool ODBCDictionarySource::hasUpdateField() const
bool XDBCDictionarySource::hasUpdateField() const
{
return !update_field.empty();
}
DictionarySourcePtr ODBCDictionarySource::clone() const
DictionarySourcePtr XDBCDictionarySource::clone() const
{
return std::make_unique<ODBCDictionarySource>(*this);
return std::make_unique<XDBCDictionarySource>(*this);
}
std::string ODBCDictionarySource::toString() const
std::string XDBCDictionarySource::toString() const
{
return "ODBC: " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
return bridge_helper->getName() + ": " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
}
bool ODBCDictionarySource::isModified() const
bool XDBCDictionarySource::isModified() const
{
if (!invalidate_query.empty())
{
@ -182,39 +184,39 @@ bool ODBCDictionarySource::isModified() const
}
std::string ODBCDictionarySource::doInvalidateQuery(const std::string & request) const
std::string XDBCDictionarySource::doInvalidateQuery(const std::string & request) const
{
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
odbc_bridge_helper.startODBCBridgeSync();
bridge_helper->startBridgeSync();
auto invalidate_url = odbc_bridge_helper.getMainURI();
auto url_params = odbc_bridge_helper.getURLParams(invalidate_sample_block.getNamesAndTypesList().toString(), max_block_size);
auto invalidate_url = bridge_helper->getMainURI();
auto url_params = bridge_helper->getURLParams(invalidate_sample_block.getNamesAndTypesList().toString(), max_block_size);
for (const auto & [name, value] : url_params)
invalidate_url.addQueryParameter(name, value);
ODBCBridgeBlockInputStream stream(
XDBCBridgeBlockInputStream stream(
invalidate_url,
[request](std::ostream & os) { os << "query=" << request; },
invalidate_sample_block,
global_context,
max_block_size,
timeouts);
timeouts, bridge_helper->getName() + "BlockInputStream");
return readInvalidateQuery(stream);
}
BlockInputStreamPtr ODBCDictionarySource::loadBase(const std::string & query) const
BlockInputStreamPtr XDBCDictionarySource::loadBase(const std::string & query) const
{
odbc_bridge_helper.startODBCBridgeSync();
return std::make_shared<ODBCBridgeBlockInputStream>(bridge_url,
bridge_helper->startBridgeSync();
return std::make_shared<XDBCBridgeBlockInputStream>(bridge_url,
[query](std::ostream & os) { os << "query=" << query; },
sample_block,
global_context,
max_block_size,
timeouts);
timeouts, bridge_helper->getName() + "BlockInputStream");
}
}

View File

@ -0,0 +1,88 @@
#pragma once
#include <Poco/Data/SessionPool.h>
#include <Poco/URI.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/ExternalQueryBuilder.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Common/XDBCBridgeHelper.h>
#include <IO/ConnectionTimeouts.h>
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
class Logger;
}
namespace DB
{
/// Allows loading dictionaries from a XDBC source via bridges
class XDBCDictionarySource final : public IDictionarySource
{
public:
XDBCDictionarySource(const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const Block & sample_block, const Context & context, BridgeHelperPtr bridge);
/// copy-constructor is provided in order to support cloneability
XDBCDictionarySource(const XDBCDictionarySource & other);
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(
const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override;
bool supportsSelectiveLoad() const override;
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override;
std::string toString() const override;
private:
std::string getUpdateFieldAndDate();
// execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const;
BlockInputStreamPtr loadBase(const std::string & query) const;
Poco::Logger * log;
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string db;
const std::string table;
const std::string where;
const std::string update_field;
Block sample_block;
ExternalQueryBuilder query_builder;
const std::string load_all_query;
std::string invalidate_query;
mutable std::string invalidate_query_response;
BridgeHelperPtr bridge_helper;
Poco::URI bridge_url;
ConnectionTimeouts timeouts;
const Context & global_context;
};
}

View File

@ -6,7 +6,7 @@ class IProfilingBlockInputStream;
namespace DB
{
// Using in MySQLDictionarySource and ODBCDictionarySource after processing invalidate_query
// Using in MySQLDictionarySource and XDBCDictionarySource after processing invalidate_query
std::string readInvalidateQuery(IProfilingBlockInputStream & block_input_stream);

View File

@ -1,115 +0,0 @@
#include <Dictionaries/ODBCBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageODBC.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/File.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <Common/ShellCommand.h>
#include <ext/range.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int EXTERNAL_EXECUTABLE_NOT_FOUND;
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
}
StorageODBC::StorageODBC(const std::string & table_name_,
const std::string & connection_string,
const std::string & remote_database_name_,
const std::string & remote_table_name_,
const ColumnsDescription & columns_,
const Context & context_)
: IStorageURLBase(Poco::URI(), context_, table_name_, ODBCBridgeHelper::DEFAULT_FORMAT, columns_)
, odbc_bridge_helper(context_global.getConfigRef(), context_global.getSettingsRef().http_receive_timeout.value, connection_string)
, remote_database_name(remote_database_name_)
, remote_table_name(remote_table_name_)
, log(&Poco::Logger::get("StorageODBC"))
{
uri = odbc_bridge_helper.getMainURI();
}
std::string StorageODBC::getReadMethod() const
{
return Poco::Net::HTTPRequest::HTTP_POST;
}
std::vector<std::pair<std::string, std::string>> StorageODBC::getReadURIParams(const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t max_block_size) const
{
NamesAndTypesList cols;
for (const String & name : column_names)
{
auto column_data = getColumn(name);
cols.emplace_back(column_data.name, column_data.type);
}
return odbc_bridge_helper.getURLParams(cols.toString(), max_block_size);
}
std::function<void(std::ostream &)> StorageODBC::getReadPOSTDataCallback(const Names & /*column_names*/,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
String query = transformQueryForExternalDatabase(
*query_info.query, getColumns().ordinary, IdentifierQuotingStyle::DoubleQuotes, remote_database_name, remote_table_name, context);
return [query](std::ostream & os) { os << "query=" << query; };
}
BlockInputStreams StorageODBC::read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
check(column_names);
odbc_bridge_helper.startODBCBridgeSync();
return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
}
Block StorageODBC::getHeaderBlock(const Names & column_names) const
{
return getSampleBlockForColumns(column_names);
}
void registerStorageODBC(StorageFactory & factory)
{
factory.registerStorage("ODBC", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 3)
throw Exception(
"Storage ODBC requires exactly 3 parameters: ODBC('DSN', database or schema, table)", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < 3; ++i)
engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);
return StorageODBC::create(args.table_name,
static_cast<const ASTLiteral &>(*engine_args[0]).value.safeGet<String>(),
static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>(),
static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>(),
args.columns,
args.context);
});
}
}

View File

@ -1,61 +0,0 @@
#pragma once
#include <Storages/StorageURL.h>
#include <Common/ODBCBridgeHelper.h>
#include <ext/shared_ptr_helper.h>
namespace DB
{
/** Implements storage in the ODBC database.
* Use ENGINE = odbc(connection_string, table_name)
* Example ENGINE = odbc('dsn=test', table)
* Read only.
*/
class StorageODBC : public ext::shared_ptr_helper<StorageODBC>, public IStorageURLBase
{
public:
std::string getName() const override
{
return "ODBC";
}
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
protected:
StorageODBC(const std::string & table_name_,
const std::string & connection_string,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_,
const Context & context_);
private:
ODBCBridgeHelper odbc_bridge_helper;
std::string remote_database_name;
std::string remote_table_name;
Poco::Logger * log;
std::string getReadMethod() const override;
std::vector<std::pair<std::string, std::string>> getReadURIParams(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
std::function<void(std::ostream &)> getReadPOSTDataCallback(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
Block getHeaderBlock(const Names & column_names) const override;
};
}

View File

@ -1,4 +1,3 @@
#include <Dictionaries/ODBCBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
@ -20,8 +19,6 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int EXTERNAL_EXECUTABLE_NOT_FOUND;
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
}
@ -36,8 +33,7 @@ namespace DB
, remote_database_name(remote_database_name_)
, remote_table_name(remote_table_name_)
{
// @todo give it appropriate name
log = &Poco::Logger::get("StorageXDBC");
log = &Poco::Logger::get("Storage" + bridge_helper->getName());
uri = bridge_helper->getMainURI();
}
@ -92,62 +88,53 @@ namespace DB
return getSampleBlockForColumns(column_names);
}
void registerStorageJDBC(StorageFactory & factory)
std::string StorageXDBC::getName() const
{
factory.registerStorage("JDBC", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 3)
throw Exception(
"Storage JDBC requires exactly 3 parameters: JDBC('DSN', database or schema, table)", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < 3; ++i)
engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);
BridgeHelperPtr bridge_helper = std::make_shared<XDBCBridgeHelper<JDBCBridgeMixin>>(
args.context.getConfigRef(),
args.context.getSettingsRef().http_receive_timeout.value,
static_cast<const ASTLiteral &>(*engine_args[0]).value.safeGet<String>()
);
return std::make_shared<StorageJDBC>(args.table_name,
static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>(),
static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>(),
args.columns,
args.context,
bridge_helper
);
});
return bridge_helper->getName();
}
void registerStorageIDBC(StorageFactory & factory)
namespace
{
factory.registerStorage("IDBC", [](const StorageFactory::Arguments & args)
template <typename BridgeHelperMixin>
void registerXDBCStorage(StorageFactory & factory, const std::string & name)
{
ASTs & engine_args = args.engine_args;
factory.registerStorage(name, [&name](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 3)
throw Exception(
"Storage IDBC requires exactly 3 parameters: IDBC('DSN', database or schema, table)", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (engine_args.size() != 3)
throw Exception(
"Storage " + name + " requires exactly 3 parameters: " + name + "('DSN', database or schema, table)", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < 3; ++i)
engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);
for (size_t i = 0; i < 3; ++i)
engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);
BridgeHelperPtr bridge_helper = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(
args.context.getConfigRef(),
args.context.getSettingsRef().http_receive_timeout.value,
static_cast<const ASTLiteral &>(*engine_args[0]).value.safeGet<String>()
);
return std::make_shared<StorageIDBC>(args.table_name,
static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>(),
static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>(),
args.columns,
args.context,
bridge_helper
);
BridgeHelperPtr bridge_helper = std::make_shared<XDBCBridgeHelper<BridgeHelperMixin>>(
args.context.getConfigRef(),
args.context.getSettingsRef().http_receive_timeout.value,
static_cast<const ASTLiteral &>(*engine_args[0]).value.safeGet<String>()
);
return std::make_shared<StorageXDBC>(args.table_name,
static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>(),
static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>(),
args.columns,
args.context,
bridge_helper
);
});
});
}
}
void registerStorageJDBC(StorageFactory & factory)
{
registerXDBCStorage<JDBCBridgeMixin>(factory, "JDBC");
}
void registerStorageODBC(StorageFactory & factory)
{
registerXDBCStorage<ODBCBridgeMixin>(factory, "ODBC");
}
}

View File

@ -1,7 +1,6 @@
#pragma once
#include <Storages/StorageURL.h>
#include <Common/ODBCBridgeHelper.h>
#include <ext/shared_ptr_helper.h>
#include <Common/XDBCBridgeHelper.h>
@ -53,38 +52,7 @@ namespace DB
size_t max_block_size) const override;
Block getHeaderBlock(const Names & column_names) const override;
};
// Implementations for StorageXDBC
class StorageJDBC : public StorageXDBC
{
public:
StorageJDBC(const std::string & table_name_,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_,
const Context & context_, BridgeHelperPtr bridge_helper_)
: StorageXDBC(table_name_, remote_database_name, remote_table_name, columns_, context_, bridge_helper_) {}
std::string getName() const override
{
return "JDBC";
}
};
class StorageIDBC : public StorageXDBC
{
public:
StorageIDBC(const std::string & table_name_,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_,
const Context & context_, BridgeHelperPtr bridge_helper_)
: StorageXDBC(table_name_, remote_database_name, remote_table_name, columns_, context_, bridge_helper_) {}
std::string getName() const override
{
return "IDBC";
}
std::string getName() const override;
};
}

View File

@ -26,7 +26,6 @@ void registerStorageMaterializedView(StorageFactory & factory);
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
void registerStorageODBC(StorageFactory & factory);
void registerStorageIDBC(StorageFactory & factory);
#endif
void registerStorageJDBC(StorageFactory & factory);
@ -63,7 +62,6 @@ void registerStorages()
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
registerStorageODBC(factory);
registerStorageIDBC(factory);
#endif
registerStorageJDBC(factory);

View File

@ -1,4 +1,3 @@
#include <TableFunctions/TableFunctionODBC.h>
#include <type_traits>
#include <ext/scope_guard.h>
@ -11,13 +10,12 @@
#include <Parsers/ParserQueryWithOutput.h>
#include <Parsers/parseQuery.h>
#include <Poco/Net/HTTPRequest.h>
#include <Storages/StorageODBC.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/ITableFunctionXDBC.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/StorageXDBC.h>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <Common/ODBCBridgeHelper.h>
#include <Core/Defines.h>
@ -75,7 +73,7 @@ namespace DB
readStringBinary(columns_info, buf);
NamesAndTypesList columns = NamesAndTypesList::parse(columns_info);
auto result = createStorage(table_name, schema_name, table_name, ColumnsDescription{columns}, context, helper);
auto result = std::make_shared<StorageXDBC>(table_name, schema_name, table_name, ColumnsDescription{columns}, context, helper);
if(!result)
throw Exception("Failed to instantiate storage from table function " + getName());
@ -89,8 +87,8 @@ namespace DB
factory.registerFunction<TableFunctionJDBC>();
}
void registerTableFunctionIDBC(TableFunctionFactory & factory)
void registerTableFunctionODBC(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionIDBC>();
factory.registerFunction<TableFunctionODBC>();
}
}

View File

@ -17,14 +17,6 @@ class ITableFunctionXDBC : public ITableFunction
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
/* A factory method for creating the storage implementation */
virtual StoragePtr createStorage(const std::string & table_name_,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_,
const Context & context_,
BridgeHelperPtr bridge_helper) const = 0;
/* A factory method to create bridge helper, that will assist in remote interaction */
virtual BridgeHelperPtr createBridgeHelper(const Poco::Util::AbstractConfiguration & config_,
const Poco::Timespan & http_timeout_,
@ -40,14 +32,6 @@ public:
return name;
}
private:
StoragePtr createStorage(const std::string &table_name_, const std::string &remote_database_name,
const std::string &remote_table_name,
const ColumnsDescription &columns_,
const Context &context_, BridgeHelperPtr bridge_helper) const override {
return std::make_shared<StorageJDBC>(table_name_, remote_database_name, remote_table_name, columns_, context_, bridge_helper);
}
BridgeHelperPtr createBridgeHelper(const Poco::Util::AbstractConfiguration & config_,
const Poco::Timespan & http_timeout_,
@ -56,23 +40,15 @@ private:
}
};
class TableFunctionIDBC : public ITableFunctionXDBC
class TableFunctionODBC : public ITableFunctionXDBC
{
public:
static constexpr auto name = "idbc";
static constexpr auto name = "odbc";
std::string getName() const override
{
return name;
}
private:
StoragePtr createStorage(const std::string &table_name_, const std::string &remote_database_name,
const std::string &remote_table_name,
const ColumnsDescription &columns_,
const Context &context_, BridgeHelperPtr bridge_helper) const override {
return std::make_shared<StorageIDBC>(table_name_, remote_database_name, remote_table_name, columns_, context_, bridge_helper);
}
BridgeHelperPtr createBridgeHelper(const Poco::Util::AbstractConfiguration & config_,
const Poco::Timespan & http_timeout_,

View File

@ -1,86 +0,0 @@
#include <TableFunctions/TableFunctionODBC.h>
#include <type_traits>
#include <ext/scope_guard.h>
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ParserQueryWithOutput.h>
#include <Parsers/parseQuery.h>
#include <Poco/Net/HTTPRequest.h>
#include <Storages/StorageODBC.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <Common/ODBCBridgeHelper.h>
#include <Core/Defines.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StoragePtr TableFunctionODBC::executeImpl(const ASTPtr & ast_function, const Context & context) const
{
const ASTFunction & args_func = typeid_cast<const ASTFunction &>(*ast_function);
if (!args_func.arguments)
throw Exception("Table function 'odbc' must have arguments.", ErrorCodes::LOGICAL_ERROR);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.arguments).children;
if (args.size() != 2 && args.size() != 3)
throw Exception("Table function 'odbc' requires 2 or 3 arguments: ODBC('DSN', table) or ODBC('DSN', schema, table)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto i = 0u; i < args.size(); ++i)
args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context);
std::string connection_string = "";
std::string schema_name = "";
std::string table_name = "";
if (args.size() == 3)
{
connection_string = static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>();
schema_name = static_cast<const ASTLiteral &>(*args[1]).value.safeGet<String>();
table_name = static_cast<const ASTLiteral &>(*args[2]).value.safeGet<String>();
} else if (args.size() == 2)
{
connection_string = static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>();
table_name = static_cast<const ASTLiteral &>(*args[1]).value.safeGet<String>();
}
const auto & config = context.getConfigRef();
ODBCBridgeHelper helper(config, context.getSettingsRef().http_receive_timeout.value, connection_string);
helper.startODBCBridgeSync();
Poco::URI columns_info_uri = helper.getColumnsInfoURI();
columns_info_uri.addQueryParameter("connection_string", connection_string);
if (!schema_name.empty())
columns_info_uri.addQueryParameter("schema", schema_name);
columns_info_uri.addQueryParameter("table", table_name);
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, nullptr);
std::string columns_info;
readStringBinary(columns_info, buf);
NamesAndTypesList columns = NamesAndTypesList::parse(columns_info);
auto result = StorageODBC::create(table_name, connection_string, schema_name, table_name, ColumnsDescription{columns}, context);
result->startup();
return result;
}
void registerTableFunctionODBC(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionODBC>();
}
}

View File

@ -1,22 +0,0 @@
#pragma once
#include <Common/config.h>
#include <TableFunctions/ITableFunction.h>
namespace DB
{
/* odbc (odbc connect string, table) - creates a temporary StorageODBC.
*/
class TableFunctionODBC : public ITableFunction
{
public:
static constexpr auto name = "odbc";
std::string getName() const override
{
return name;
}
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
};
}

View File

@ -16,7 +16,6 @@ void registerTableFunctionURL(TableFunctionFactory & factory);
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
void registerTableFunctionODBC(TableFunctionFactory & factory);
void registerTableFunctionIDBC(TableFunctionFactory & factory);
#endif
void registerTableFunctionJDBC(TableFunctionFactory & factory);
@ -40,7 +39,6 @@ void registerTableFunctions()
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
registerTableFunctionODBC(factory);
registerTableFunctionIDBC(factory);
#endif
registerTableFunctionJDBC(factory);