mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge pull request #12165 from vitlibar/fix-split-dictionary-source-table-name
Fix splitting table name of dictionary source
This commit is contained in:
commit
be85f9f98a
@ -8,6 +8,7 @@ set (CLICKHOUSE_ODBC_BRIDGE_SOURCES
|
||||
ODBCBlockOutputStream.cpp
|
||||
ODBCBridge.cpp
|
||||
PingHandler.cpp
|
||||
SchemaAllowedHandler.cpp
|
||||
validateODBCConnectionString.cpp
|
||||
)
|
||||
set (CLICKHOUSE_ODBC_BRIDGE_LINK
|
||||
|
@ -29,6 +29,12 @@ Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco:
|
||||
return new IdentifierQuoteHandler(keep_alive_timeout, context);
|
||||
#else
|
||||
return nullptr;
|
||||
#endif
|
||||
else if (uri.getPath() == "/schema_allowed")
|
||||
#if USE_ODBC
|
||||
return new SchemaAllowedHandler(keep_alive_timeout, context);
|
||||
#else
|
||||
return nullptr;
|
||||
#endif
|
||||
else if (uri.getPath() == "/write")
|
||||
return new ODBCHandler(pool_map, keep_alive_timeout, context, "write");
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include "MainHandler.h"
|
||||
#include "ColumnInfoHandler.h"
|
||||
#include "IdentifierQuoteHandler.h"
|
||||
#include "SchemaAllowedHandler.h"
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wunused-parameter"
|
||||
@ -15,7 +16,7 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
/** Factory for '/ping', '/', '/columns_info', '/identifier_quote' handlers.
|
||||
/** Factory for '/ping', '/', '/columns_info', '/identifier_quote', '/schema_allowed' handlers.
|
||||
* Also stores Session pools for ODBC connections
|
||||
*/
|
||||
class HandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
|
||||
|
76
programs/odbc-bridge/SchemaAllowedHandler.cpp
Normal file
76
programs/odbc-bridge/SchemaAllowedHandler.cpp
Normal file
@ -0,0 +1,76 @@
|
||||
#include "SchemaAllowedHandler.h"
|
||||
|
||||
#if USE_ODBC
|
||||
|
||||
# include <IO/WriteBufferFromHTTPServerResponse.h>
|
||||
# include <IO/WriteHelpers.h>
|
||||
# include <Poco/Data/ODBC/ODBCException.h>
|
||||
# include <Poco/Data/ODBC/SessionImpl.h>
|
||||
# include <Poco/Data/ODBC/Utility.h>
|
||||
# include <Poco/Net/HTMLForm.h>
|
||||
# include <Poco/Net/HTTPServerRequest.h>
|
||||
# include <Poco/Net/HTTPServerResponse.h>
|
||||
# include <common/logger_useful.h>
|
||||
# include "validateODBCConnectionString.h"
|
||||
|
||||
# define POCO_SQL_ODBC_CLASS Poco::Data::ODBC
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace
|
||||
{
|
||||
bool isSchemaAllowed(SQLHDBC hdbc)
|
||||
{
|
||||
std::string identifier;
|
||||
|
||||
SQLSMALLINT t;
|
||||
SQLRETURN r = POCO_SQL_ODBC_CLASS::SQLGetInfo(hdbc, SQL_SCHEMA_USAGE, nullptr, 0, &t);
|
||||
|
||||
if (POCO_SQL_ODBC_CLASS::Utility::isError(r))
|
||||
throw POCO_SQL_ODBC_CLASS::ConnectionException(hdbc);
|
||||
|
||||
return t != 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void SchemaAllowedHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
|
||||
{
|
||||
Poco::Net::HTMLForm params(request, request.stream());
|
||||
LOG_TRACE(log, "Request URI: {}", request.getURI());
|
||||
|
||||
auto process_error = [&response, this](const std::string & message)
|
||||
{
|
||||
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
|
||||
if (!response.sent())
|
||||
response.send() << message << std::endl;
|
||||
LOG_WARNING(log, message);
|
||||
};
|
||||
|
||||
if (!params.has("connection_string"))
|
||||
{
|
||||
process_error("No 'connection_string' in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
std::string connection_string = params.get("connection_string");
|
||||
POCO_SQL_ODBC_CLASS::SessionImpl session(validateODBCConnectionString(connection_string), DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
|
||||
SQLHDBC hdbc = session.dbc().handle();
|
||||
|
||||
bool result = isSchemaAllowed(hdbc);
|
||||
|
||||
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
|
||||
writeBoolText(result, out);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
process_error("Error getting schema usage from ODBC '" + getCurrentExceptionMessage(false) + "'");
|
||||
tryLogCurrentException(log);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
31
programs/odbc-bridge/SchemaAllowedHandler.h
Normal file
31
programs/odbc-bridge/SchemaAllowedHandler.h
Normal file
@ -0,0 +1,31 @@
|
||||
#pragma once
|
||||
|
||||
#include <Poco/Logger.h>
|
||||
#include <Poco/Net/HTTPRequestHandler.h>
|
||||
|
||||
#if USE_ODBC
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class Context;
|
||||
|
||||
|
||||
/// This handler establishes connection to database, and retrieve whether schema is allowed.
|
||||
class SchemaAllowedHandler : public Poco::Net::HTTPRequestHandler
|
||||
{
|
||||
public:
|
||||
SchemaAllowedHandler(size_t keep_alive_timeout_, Context &)
|
||||
: log(&Poco::Logger::get("SchemaAllowedHandler")), keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
|
||||
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
|
||||
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
size_t keep_alive_timeout;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -41,6 +41,7 @@ public:
|
||||
virtual Poco::URI getMainURI() const = 0;
|
||||
virtual Poco::URI getColumnsInfoURI() const = 0;
|
||||
virtual IdentifierQuotingStyle getIdentifierQuotingStyle() = 0;
|
||||
virtual bool isSchemaAllowed() = 0;
|
||||
virtual String getName() const = 0;
|
||||
|
||||
virtual ~IXDBCBridgeHelper() = default;
|
||||
@ -61,6 +62,7 @@ private:
|
||||
Poco::Logger * log = &Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper");
|
||||
|
||||
std::optional<IdentifierQuotingStyle> quote_style;
|
||||
std::optional<bool> is_schema_allowed;
|
||||
|
||||
protected:
|
||||
auto getConnectionString() const
|
||||
@ -80,6 +82,7 @@ public:
|
||||
static constexpr inline auto MAIN_HANDLER = "/";
|
||||
static constexpr inline auto COL_INFO_HANDLER = "/columns_info";
|
||||
static constexpr inline auto IDENTIFIER_QUOTE_HANDLER = "/identifier_quote";
|
||||
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
|
||||
static constexpr inline auto PING_OK_ANSWER = "Ok.";
|
||||
|
||||
XDBCBridgeHelper(const Context & global_context_, const Poco::Timespan & http_timeout_, const std::string & connection_string_)
|
||||
@ -128,6 +131,27 @@ public:
|
||||
return *quote_style;
|
||||
}
|
||||
|
||||
bool isSchemaAllowed() override
|
||||
{
|
||||
if (!is_schema_allowed.has_value())
|
||||
{
|
||||
startBridgeSync();
|
||||
|
||||
auto uri = createBaseURI();
|
||||
uri.setPath(SCHEMA_ALLOWED_HANDLER);
|
||||
uri.addQueryParameter("connection_string", getConnectionString());
|
||||
|
||||
ReadWriteBufferFromHTTP buf(
|
||||
uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
|
||||
|
||||
bool res;
|
||||
readBoolText(res, buf);
|
||||
is_schema_allowed = res;
|
||||
}
|
||||
|
||||
return *is_schema_allowed;
|
||||
}
|
||||
|
||||
/**
|
||||
* @todo leaky abstraction - used by external API's
|
||||
*/
|
||||
|
@ -102,7 +102,7 @@ CassandraDictionarySource::CassandraDictionarySource(
|
||||
, dict_struct(dict_struct_)
|
||||
, settings(settings_)
|
||||
, sample_block(sample_block_)
|
||||
, query_builder(dict_struct, settings.db, settings.table, settings.where, IdentifierQuotingStyle::DoubleQuotes)
|
||||
, query_builder(dict_struct, settings.db, "", settings.table, settings.where, IdentifierQuotingStyle::DoubleQuotes)
|
||||
{
|
||||
cassandraCheck(cass_cluster_set_contact_points(cluster, settings.host.c_str()));
|
||||
if (settings.port)
|
||||
|
@ -66,7 +66,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
|
||||
, where{config.getString(config_prefix + ".where", "")}
|
||||
, update_field{config.getString(config_prefix + ".update_field", "")}
|
||||
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
|
||||
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
|
||||
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
|
||||
, sample_block{sample_block_}
|
||||
, context(context_)
|
||||
, is_local{isLocalAddress({host, port}, secure ? context.getTCPPortSecure().value_or(0) : context.getTCPPort())}
|
||||
@ -97,7 +97,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
|
||||
, update_field{other.update_field}
|
||||
, invalidate_query{other.invalidate_query}
|
||||
, invalidate_query_response{other.invalidate_query_response}
|
||||
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
|
||||
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
|
||||
, sample_block{other.sample_block}
|
||||
, context(other.context)
|
||||
, is_local{other.is_local}
|
||||
|
@ -19,22 +19,12 @@ namespace ErrorCodes
|
||||
ExternalQueryBuilder::ExternalQueryBuilder(
|
||||
const DictionaryStructure & dict_struct_,
|
||||
const std::string & db_,
|
||||
const std::string & schema_,
|
||||
const std::string & table_,
|
||||
const std::string & where_,
|
||||
IdentifierQuotingStyle quoting_style_)
|
||||
: dict_struct(dict_struct_), db(db_), where(where_), quoting_style(quoting_style_)
|
||||
{
|
||||
if (auto pos = table_.find('.'); pos != std::string::npos)
|
||||
{
|
||||
schema = table_.substr(0, pos);
|
||||
table = table_.substr(pos + 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
schema = "";
|
||||
table = table_;
|
||||
}
|
||||
}
|
||||
: dict_struct(dict_struct_), db(db_), schema(schema_), table(table_), where(where_), quoting_style(quoting_style_)
|
||||
{}
|
||||
|
||||
|
||||
void ExternalQueryBuilder::writeQuoted(const std::string & s, WriteBuffer & out) const
|
||||
|
@ -18,8 +18,8 @@ struct ExternalQueryBuilder
|
||||
{
|
||||
const DictionaryStructure & dict_struct;
|
||||
std::string db;
|
||||
std::string table;
|
||||
std::string schema;
|
||||
std::string table;
|
||||
const std::string & where;
|
||||
|
||||
IdentifierQuotingStyle quoting_style;
|
||||
@ -28,6 +28,7 @@ struct ExternalQueryBuilder
|
||||
ExternalQueryBuilder(
|
||||
const DictionaryStructure & dict_struct_,
|
||||
const std::string & db_,
|
||||
const std::string & schema_,
|
||||
const std::string & table_,
|
||||
const std::string & where_,
|
||||
IdentifierQuotingStyle quoting_style_);
|
||||
|
@ -68,7 +68,7 @@ MySQLDictionarySource::MySQLDictionarySource(
|
||||
, dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)}
|
||||
, sample_block{sample_block_}
|
||||
, pool{mysqlxx::PoolFactory::instance().get(config, config_prefix)}
|
||||
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
|
||||
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
|
||||
, load_all_query{query_builder.composeLoadAllQuery()}
|
||||
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
|
||||
, close_connection{config.getBool(config_prefix + ".close_connection", false) || config.getBool(config_prefix + ".share_connection", false)}
|
||||
@ -87,7 +87,7 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other
|
||||
, dont_check_update_time{other.dont_check_update_time}
|
||||
, sample_block{other.sample_block}
|
||||
, pool{other.pool}
|
||||
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
|
||||
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
|
||||
, load_all_query{other.load_all_query}
|
||||
, last_modification{other.last_modification}
|
||||
, invalidate_query{other.invalidate_query}
|
||||
|
@ -27,6 +27,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int SUPPORT_IS_DISABLED;
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -60,6 +61,39 @@ namespace
|
||||
std::unique_ptr<ReadWriteBufferFromHTTP> read_buf;
|
||||
BlockInputStreamPtr reader;
|
||||
};
|
||||
|
||||
|
||||
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct_,
|
||||
const std::string & db_,
|
||||
const std::string & schema_,
|
||||
const std::string & table_,
|
||||
const std::string & where_,
|
||||
IXDBCBridgeHelper & bridge_)
|
||||
{
|
||||
std::string schema = schema_;
|
||||
std::string table = table_;
|
||||
|
||||
if (bridge_.isSchemaAllowed())
|
||||
{
|
||||
if (schema.empty())
|
||||
{
|
||||
if (auto pos = table.find('.'); pos != std::string::npos)
|
||||
{
|
||||
schema = table.substr(0, pos);
|
||||
table = table.substr(pos + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!schema.empty())
|
||||
throw Exception{"Dictionary source of type " + bridge_.getName() + " specifies a schema but schema is not supported by "
|
||||
+ bridge_.getName() + "-driver",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
|
||||
}
|
||||
|
||||
return {dict_struct_, db_, schema, table, where_, bridge_.getIdentifierQuotingStyle()};
|
||||
}
|
||||
}
|
||||
|
||||
static const UInt64 max_block_size = 8192;
|
||||
@ -76,11 +110,12 @@ XDBCDictionarySource::XDBCDictionarySource(
|
||||
, update_time{std::chrono::system_clock::from_time_t(0)}
|
||||
, dict_struct{dict_struct_}
|
||||
, db{config_.getString(config_prefix_ + ".db", "")}
|
||||
, schema{config_.getString(config_prefix_ + ".schema", "")}
|
||||
, table{config_.getString(config_prefix_ + ".table")}
|
||||
, where{config_.getString(config_prefix_ + ".where", "")}
|
||||
, update_field{config_.getString(config_prefix_ + ".update_field", "")}
|
||||
, sample_block{sample_block_}
|
||||
, query_builder{dict_struct, db, table, where, bridge_->getIdentifierQuotingStyle()}
|
||||
, query_builder{makeExternalQueryBuilder(dict_struct, db, schema, table, where, *bridge_)}
|
||||
, load_all_query{query_builder.composeLoadAllQuery()}
|
||||
, invalidate_query{config_.getString(config_prefix_ + ".invalidate_query", "")}
|
||||
, bridge_helper{bridge_}
|
||||
@ -104,7 +139,7 @@ XDBCDictionarySource::XDBCDictionarySource(const XDBCDictionarySource & other)
|
||||
, where{other.where}
|
||||
, update_field{other.update_field}
|
||||
, sample_block{other.sample_block}
|
||||
, query_builder{dict_struct, db, table, where, other.bridge_helper->getIdentifierQuotingStyle()}
|
||||
, query_builder{other.query_builder}
|
||||
, load_all_query{other.load_all_query}
|
||||
, invalidate_query{other.invalidate_query}
|
||||
, invalidate_query_response{other.invalidate_query_response}
|
||||
|
@ -69,6 +69,7 @@ private:
|
||||
std::chrono::time_point<std::chrono::system_clock> update_time;
|
||||
const DictionaryStructure dict_struct;
|
||||
const std::string db;
|
||||
const std::string schema;
|
||||
const std::string table;
|
||||
const std::string where;
|
||||
const std::string update_field;
|
||||
|
@ -87,7 +87,6 @@ def test_dependency_via_explicit_table(node):
|
||||
check()
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="TODO: should be fixed")
|
||||
@pytest.mark.parametrize("node", nodes)
|
||||
def test_dependency_via_dictionary_database(node):
|
||||
node.query("CREATE DATABASE dict_db ENGINE=Dictionary")
|
||||
|
Loading…
Reference in New Issue
Block a user