kssenii 2021-04-17 08:09:22 +00:00
parent 3f3f928c1f
commit 5dc2dfa437
6 changed files with 110 additions and 65 deletions

View File

@@ -54,9 +54,10 @@ void ODBCHandler::processError(HTTPServerResponse & response, const std::string
void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
HTMLForm params(request);
LOG_TRACE(log, "Request URI: {}", request.getURI());
if (mode == "read")
params.read(request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
if (mode == "read" && !params.has("query"))
{
@@ -64,11 +65,6 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
return;
}
if (!params.has("columns"))
{
processError(response, "No 'columns' in request URL");
return;
}
if (!params.has("connection_string"))
{
@@ -76,6 +72,16 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
return;
}
if (!params.has("sample_block"))
{
processError(response, "No 'sample_block' in request URL");
return;
}
std::string format = params.get("format", "RowBinary");
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Connection string: '{}'", connection_string);
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
if (params.has("max_block_size"))
{
@@ -88,24 +94,19 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
max_block_size = parse<size_t>(max_block_size_str);
}
std::string columns = params.get("columns");
std::string sample_block_string = params.get("sample_block");
std::unique_ptr<Block> sample_block;
try
{
sample_block = parseColumns(std::move(columns));
sample_block = parseColumns(std::move(sample_block_string));
}
catch (const Exception & ex)
{
processError(response, "Invalid 'columns' parameter in request body '" + ex.message() + "'");
LOG_WARNING(log, ex.getStackTraceString());
processError(response, "Invalid 'sample_block' parameter in request body '" + ex.message() + "'");
LOG_ERROR(log, ex.getStackTraceString());
return;
}
std::string format = params.get("format", "RowBinary");
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Connection string: '{}'", connection_string);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try
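Taken together, the handler now takes the block structure from a 'sample_block' field in the POST body (plus, for reads, the query itself) instead of a 'columns' URL parameter, and parse failures are reported against 'sample_block' and logged at ERROR level. A minimal client-side sketch of the body the handler parses via HTMLForm, assuming the escapeForFileName encoding used by the callers later in this commit:

/// Hedged sketch only: the field names ("sample_block", "query") come from this diff,
/// the exact escaping is an assumption carried over from XDBCDictionarySource below.
auto write_body = [&](std::ostream & os)
{
    os << "sample_block=" << escapeForFileName(sample_block.getNamesAndTypesList().toString());
    os << "&query=" << escapeForFileName(query);
};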

View File

@@ -37,7 +37,7 @@ class IXDBCBridgeHelper : public IBridgeHelper
public:
explicit IXDBCBridgeHelper(ContextPtr context_) : IBridgeHelper(context_) {}
virtual std::vector<std::pair<std::string, std::string>> getURLParams(const std::string & cols, UInt64 max_block_size) const = 0;
virtual std::vector<std::pair<std::string, std::string>> getURLParams(UInt64 max_block_size) const = 0;
virtual Poco::URI getColumnsInfoURI() const = 0;
@@ -138,12 +138,11 @@ protected:
return uri;
}
URLParams getURLParams(const std::string & cols, UInt64 max_block_size) const override
URLParams getURLParams(UInt64 max_block_size) const override
{
std::vector<std::pair<std::string, std::string>> result;
result.emplace_back("connection_string", connection_string); /// already validated
result.emplace_back("columns", cols);
result.emplace_back("max_block_size", std::to_string(max_block_size));
return result;
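With the column list gone, getURLParams() forwards only the already-validated connection string and the block size, so the bridge URL stays short regardless of how wide the table is. A hedged usage sketch mirroring the dictionary source below:

/// Sketch of how callers elsewhere in this commit consume the trimmed parameter list.
auto uri = bridge_helper->getMainURI();
for (const auto & [name, value] : bridge_helper->getURLParams(max_block_size))
    uri.addQueryParameter(name, value);   // connection_string, max_block_size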

View File

@@ -16,12 +16,9 @@
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
#include "readInvalidateQuery.h"
#include "registerDictionaries.h"
#include <Common/escapeForFileName.h>
#if USE_ODBC
# include <Poco/Data/ODBC/Connector.h> // Y_IGNORE
#endif
namespace DB
{
@@ -125,7 +122,7 @@ XDBCDictionarySource::XDBCDictionarySource(
{
bridge_url = bridge_helper->getMainURI();
auto url_params = bridge_helper->getURLParams(sample_block_.getNamesAndTypesList().toString(), max_block_size);
auto url_params = bridge_helper->getURLParams(max_block_size);
for (const auto & [name, value] : url_params)
bridge_url.addQueryParameter(name, value);
}
@@ -151,6 +148,7 @@ XDBCDictionarySource::XDBCDictionarySource(const XDBCDictionarySource & other)
{
}
std::string XDBCDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
@@ -167,52 +165,61 @@ std::string XDBCDictionarySource::getUpdateFieldAndDate()
}
}
BlockInputStreamPtr XDBCDictionarySource::loadAll()
{
LOG_TRACE(log, load_all_query);
return loadBase(load_all_query);
return loadFromQuery(bridge_url, sample_block, load_all_query);
}
BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll()
{
std::string load_query_update = getUpdateFieldAndDate();
LOG_TRACE(log, load_query_update);
return loadBase(load_query_update);
return loadFromQuery(bridge_url, sample_block, load_query_update);
}
BlockInputStreamPtr XDBCDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
const auto query = query_builder.composeLoadIdsQuery(ids);
return loadBase(query);
return loadFromQuery(bridge_url, sample_block, query);
}
BlockInputStreamPtr XDBCDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return loadBase(query);
return loadFromQuery(bridge_url, sample_block, query);
}
bool XDBCDictionarySource::supportsSelectiveLoad() const
{
return true;
}
bool XDBCDictionarySource::hasUpdateField() const
{
return !update_field.empty();
}
DictionarySourcePtr XDBCDictionarySource::clone() const
{
return std::make_unique<XDBCDictionarySource>(*this);
}
std::string XDBCDictionarySource::toString() const
{
return bridge_helper->getName() + ": " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
}
bool XDBCDictionarySource::isModified() const
{
if (!invalidate_query.empty())
@@ -235,41 +242,38 @@ std::string XDBCDictionarySource::doInvalidateQuery(const std::string & request)
bridge_helper->startBridgeSync();
auto invalidate_url = bridge_helper->getMainURI();
auto url_params = bridge_helper->getURLParams(invalidate_sample_block.getNamesAndTypesList().toString(), max_block_size);
auto url_params = bridge_helper->getURLParams(max_block_size);
for (const auto & [name, value] : url_params)
invalidate_url.addQueryParameter(name, value);
XDBCBridgeBlockInputStream stream(
invalidate_url,
[request](std::ostream & os) { os << "query=" << request; },
invalidate_sample_block,
getContext(),
max_block_size,
timeouts,
bridge_helper->getName() + "BlockInputStream");
return readInvalidateQuery(stream);
return readInvalidateQuery(*loadFromQuery(invalidate_url, invalidate_sample_block, request));
}
BlockInputStreamPtr XDBCDictionarySource::loadBase(const std::string & query) const
BlockInputStreamPtr XDBCDictionarySource::loadFromQuery(const Poco::URI url, const Block & required_sample_block, const std::string & query) const
{
bridge_helper->startBridgeSync();
auto write_body_callback = [required_sample_block, query](std::ostream & os)
{
os << "sample_block=" << escapeForFileName(required_sample_block.getNamesAndTypesList().toString());
os << "&";
os << "query=" << escapeForFileName(query);
};
return std::make_shared<XDBCBridgeBlockInputStream>(
bridge_url,
[query](std::ostream & os) { os << "query=" << query; },
sample_block,
url,
write_body_callback,
required_sample_block,
getContext(),
max_block_size,
timeouts,
bridge_helper->getName() + "BlockInputStream");
}
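loadAll, loadUpdatedAll, loadIds, loadKeys and doInvalidateQuery now all funnel through this single helper, differing only in the URL, the expected sample block and the query text. Both body fields pass through escapeForFileName before being joined with '&', presumably so that whitespace and special characters in column names, types and SQL survive HTMLForm parsing on the bridge side. A rough sketch under that assumption (percent-style escaping):

/// Equivalent body as a single string; the field layout matches write_body_callback above.
std::string body = "sample_block=" + escapeForFileName(required_sample_block.getNamesAndTypesList().toString())
    + "&query=" + escapeForFileName(query);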
void registerDictionarySourceXDBC(DictionarySourceFactory & factory)
{
#if USE_ODBC
Poco::Data::ODBC::Connector::registerConnector();
#endif
auto create_table_source = [=](const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
@@ -294,6 +298,7 @@ void registerDictionarySourceXDBC(DictionarySourceFactory & factory)
factory.registerSource("odbc", create_table_source);
}
void registerDictionarySourceJDBC(DictionarySourceFactory & factory)
{
auto create_table_source = [=](const DictionaryStructure & /* dict_struct */,

View File

@@ -62,7 +62,7 @@ private:
// execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const;
BlockInputStreamPtr loadBase(const std::string & query) const;
BlockInputStreamPtr loadFromQuery(const Poco::URI url, const Block & required_sample_block, const std::string & query) const;
Poco::Logger * log;

View File

@@ -14,6 +14,8 @@
#include <Storages/StorageURL.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <common/logger_useful.h>
#include <Common/escapeForFileName.h>
namespace DB
{
@@ -53,24 +55,18 @@ std::string StorageXDBC::getReadMethod() const
}
std::vector<std::pair<std::string, std::string>> StorageXDBC::getReadURIParams(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const Names & /* column_names */,
const StorageMetadataPtr & /* metadata_snapshot */,
const SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t max_block_size) const
{
NamesAndTypesList cols;
for (const String & name : column_names)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(name);
cols.emplace_back(column_data.name, column_data.type);
}
return bridge_helper->getURLParams(cols.toString(), max_block_size);
return bridge_helper->getURLParams(max_block_size);
}
std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(
const Names & /*column_names*/,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr local_context,
@@ -84,7 +80,21 @@ std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(
remote_table_name,
local_context);
return [query](std::ostream & os) { os << "query=" << query; };
NamesAndTypesList cols;
for (const String & name : column_names)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(name);
cols.emplace_back(column_data.name, column_data.type);
}
auto write_body_callback = [query, cols](std::ostream & os)
{
os << "sample_block=" << escapeForFileName(cols.toString());
os << "&";
os << "query=" << escapeForFileName(query);
};
return write_body_callback;
}
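Putting the read path together: the URL now carries only connection_string and max_block_size (from getReadURIParams above), while the physical column list and the transformed query travel in the POST body, so a very wide column set no longer inflates the URL. A condensed sketch, with the bridge endpoint as a hypothetical placeholder:

/// Sketch of the assembled read request; parameter names come from this diff,
/// the host and port are illustrative only.
Poco::URI request_uri("http://localhost:9018/");
for (const auto & [name, value] : bridge_helper->getURLParams(max_block_size))
    request_uri.addQueryParameter(name, value);
/// POST body, written by the callback returned above:
///   sample_block=<escaped cols.toString()>&query=<escaped transformed SELECT>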
Pipe StorageXDBC::read(
@@ -106,20 +116,17 @@ BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageM
{
bridge_helper->startBridgeSync();
NamesAndTypesList cols;
Poco::URI request_uri = uri;
request_uri.setPath("/write");
for (const String & name : metadata_snapshot->getSampleBlock().getNames())
{
auto column_data = metadata_snapshot->getColumns().getPhysical(name);
cols.emplace_back(column_data.name, column_data.type);
}
auto url_params = bridge_helper->getURLParams(cols.toString(), 65536);
auto url_params = bridge_helper->getURLParams(65536);
for (const auto & [param, value] : url_params)
request_uri.addQueryParameter(param, value);
request_uri.addQueryParameter("db_name", remote_database_name);
request_uri.addQueryParameter("table_name", remote_table_name);
request_uri.addQueryParameter("format_name", format_name);
request_uri.addQueryParameter("sample_block", metadata_snapshot->getSampleBlock().getNamesAndTypesList().toString());
return std::make_shared<StorageURLBlockOutputStream>(
request_uri,

View File

@@ -505,3 +505,36 @@ def test_concurrent_queries(started_cluster):
node1.query('DROP TABLE test_pg_table;')
cursor.execute('DROP TABLE clickhouse.test_pg_table;')
def test_odbc_long_column_names(started_cluster):
conn = get_postgres_conn();
cursor = conn.cursor()
column_name = "column" * 8
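# 1000 columns named "columncolumn...columnN" (~50 characters each) give a serialized column
# list of roughly 50 KB, far beyond what comfortably fits in a URL query string, which is
# presumably the case the new 'sample_block'-in-POST-body plumbing above is meant to cover.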
create_table = "CREATE TABLE clickhouse.test_long_column_names ("
for i in range(1000):
if i != 0:
create_table += ", "
create_table += "{} integer".format(column_name + str(i))
create_table += ")"
cursor.execute(create_table)
insert = "INSERT INTO clickhouse.test_long_column_names SELECT i" + ", i" * 999 + " FROM generate_series(0, 99) as t(i)"
cursor.execute(insert)
conn.commit()
create_table = "CREATE TABLE test_long_column_names ("
for i in range(1000):
if i != 0:
create_table += ", "
create_table += "{} UInt32".format(column_name + str(i))
create_table += ") ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_long_column_names')"
result = node1.query(create_table);
result = node1.query('SELECT * FROM test_long_column_names');
expected = node1.query("SELECT number" + ", number" * 999 + " FROM numbers(100)")
assert(result == expected)
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_long_column_names")
node1.query("DROP TABLE IF EXISTS test_long_column_names")