mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
commit
cc4ddfd89e
@ -5,6 +5,7 @@ set (CLICKHOUSE_ODBC_BRIDGE_SOURCES
|
||||
IdentifierQuoteHandler.cpp
|
||||
MainHandler.cpp
|
||||
ODBCBlockInputStream.cpp
|
||||
ODBCBlockOutputStream.cpp
|
||||
ODBCBridge.cpp
|
||||
PingHandler.cpp
|
||||
validateODBCConnectionString.cpp
|
||||
|
@ -16,6 +16,7 @@
|
||||
# include <Poco/Net/HTTPServerResponse.h>
|
||||
# include <Poco/NumberParser.h>
|
||||
# include <common/logger_useful.h>
|
||||
# include <Common/quoteString.h>
|
||||
# include <ext/scope_guard.h>
|
||||
# include "getIdentifierQuote.h"
|
||||
# include "validateODBCConnectionString.h"
|
||||
@ -58,11 +59,6 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
}
|
||||
|
||||
void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
|
||||
{
|
||||
Poco::Net::HTMLForm params(request, request.stream());
|
||||
@ -116,7 +112,7 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
|
||||
const auto & context_settings = context.getSettingsRef();
|
||||
|
||||
/// TODO Why not do SQLColumns instead?
|
||||
std::string name = schema_name.empty() ? table_name : schema_name + "." + table_name;
|
||||
std::string name = schema_name.empty() ? backQuoteIfNeed(table_name) : backQuoteIfNeed(schema_name) + "." + backQuoteIfNeed(table_name);
|
||||
std::stringstream ss;
|
||||
std::string input = "SELECT * FROM " + name + " WHERE 1 = 0";
|
||||
ParserQueryWithOutput parser;
|
||||
@ -124,17 +120,7 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
|
||||
|
||||
IAST::FormatSettings settings(ss, true);
|
||||
settings.always_quote_identifiers = true;
|
||||
|
||||
auto identifier_quote = getIdentifierQuote(hdbc);
|
||||
if (identifier_quote.length() == 0)
|
||||
settings.identifier_quoting_style = IdentifierQuotingStyle::None;
|
||||
else if (identifier_quote[0] == '`')
|
||||
settings.identifier_quoting_style = IdentifierQuotingStyle::Backticks;
|
||||
else if (identifier_quote[0] == '"')
|
||||
settings.identifier_quoting_style = IdentifierQuotingStyle::DoubleQuotes;
|
||||
else
|
||||
throw Exception("Can not map quote identifier '" + identifier_quote + "' to IdentifierQuotingStyle value", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
settings.identifier_quoting_style = getQuotingStyle(hdbc);
|
||||
select->format(settings);
|
||||
std::string query = ss.str();
|
||||
|
||||
|
@ -30,8 +30,10 @@ Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco:
|
||||
#else
|
||||
return nullptr;
|
||||
#endif
|
||||
else if (uri.getPath() == "/write")
|
||||
return new ODBCHandler(pool_map, keep_alive_timeout, context, "write");
|
||||
else
|
||||
return new ODBCHandler(pool_map, keep_alive_timeout, context);
|
||||
return new ODBCHandler(pool_map, keep_alive_timeout, context, "read");
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
@ -5,17 +5,25 @@
|
||||
#include <DataStreams/copyData.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include "ODBCBlockInputStream.h"
|
||||
#include "ODBCBlockOutputStream.h"
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <IO/WriteBufferFromHTTPServerResponse.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Poco/Net/HTTPServerRequest.h>
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
#include <Poco/Net/HTMLForm.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <mutex>
|
||||
#include <Poco/ThreadPool.h>
|
||||
#include <IO/ReadBufferFromIStream.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include "getIdentifierQuote.h"
|
||||
|
||||
#if USE_ODBC
|
||||
#include <Poco/Data/ODBC/SessionImpl.h>
|
||||
#define POCO_SQL_ODBC_CLASS Poco::Data::ODBC
|
||||
#endif
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -63,34 +71,36 @@ ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str)
|
||||
return pool_map->at(connection_str);
|
||||
}
|
||||
|
||||
void ODBCHandler::processError(Poco::Net::HTTPServerResponse & response, const std::string & message)
|
||||
{
|
||||
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
|
||||
if (!response.sent())
|
||||
response.send() << message << std::endl;
|
||||
LOG_WARNING(log, message);
|
||||
}
|
||||
|
||||
void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
|
||||
{
|
||||
Poco::Net::HTMLForm params(request, request.stream());
|
||||
Poco::Net::HTMLForm params(request);
|
||||
if (mode == "read")
|
||||
params.read(request.stream());
|
||||
LOG_TRACE(log, "Request URI: " + request.getURI());
|
||||
|
||||
auto process_error = [&response, this](const std::string & message)
|
||||
if (mode == "read" && !params.has("query"))
|
||||
{
|
||||
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
|
||||
if (!response.sent())
|
||||
response.send() << message << std::endl;
|
||||
LOG_WARNING(log, message);
|
||||
};
|
||||
|
||||
if (!params.has("query"))
|
||||
{
|
||||
process_error("No 'query' in request body");
|
||||
processError(response, "No 'query' in request body");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!params.has("columns"))
|
||||
{
|
||||
process_error("No 'columns' in request URL");
|
||||
processError(response, "No 'columns' in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!params.has("connection_string"))
|
||||
{
|
||||
process_error("No 'connection_string' in request URL");
|
||||
processError(response, "No 'connection_string' in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
@ -100,7 +110,7 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
|
||||
std::string max_block_size_str = params.get("max_block_size", "");
|
||||
if (max_block_size_str.empty())
|
||||
{
|
||||
process_error("Empty max_block_size specified");
|
||||
processError(response, "Empty max_block_size specified");
|
||||
return;
|
||||
}
|
||||
max_block_size = parse<size_t>(max_block_size_str);
|
||||
@ -114,33 +124,70 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
|
||||
}
|
||||
catch (const Exception & ex)
|
||||
{
|
||||
process_error("Invalid 'columns' parameter in request body '" + ex.message() + "'");
|
||||
processError(response, "Invalid 'columns' parameter in request body '" + ex.message() + "'");
|
||||
LOG_WARNING(log, ex.getStackTraceString());
|
||||
return;
|
||||
}
|
||||
|
||||
std::string format = params.get("format", "RowBinary");
|
||||
std::string query = params.get("query");
|
||||
LOG_TRACE(log, "Query: " << query);
|
||||
|
||||
std::string connection_string = params.get("connection_string");
|
||||
LOG_TRACE(log, "Connection string: '" << connection_string << "'");
|
||||
|
||||
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
|
||||
|
||||
try
|
||||
{
|
||||
BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, context);
|
||||
auto pool = getPool(connection_string);
|
||||
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size);
|
||||
copyData(inp, *writer);
|
||||
if (mode == "write")
|
||||
{
|
||||
if (!params.has("db_name"))
|
||||
{
|
||||
processError(response, "No 'db_name' in request URL");
|
||||
return;
|
||||
}
|
||||
if (!params.has("table_name"))
|
||||
{
|
||||
processError(response, "No 'table_name' in request URL");
|
||||
return;
|
||||
}
|
||||
std::string db_name = params.get("db_name");
|
||||
std::string table_name = params.get("table_name");
|
||||
LOG_TRACE(log, "DB name: '" << db_name << "', table name: '" << table_name << "'");
|
||||
|
||||
auto quoting_style = IdentifierQuotingStyle::None;
|
||||
#if USE_ODBC
|
||||
POCO_SQL_ODBC_CLASS::SessionImpl session(validateODBCConnectionString(connection_string), DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
|
||||
quoting_style = getQuotingStyle(session.dbc().handle());
|
||||
#endif
|
||||
|
||||
auto pool = getPool(connection_string);
|
||||
ReadBufferFromIStream read_buf(request.stream());
|
||||
BlockInputStreamPtr input_stream = FormatFactory::instance().getInput(format, read_buf, *sample_block,
|
||||
context, max_block_size);
|
||||
ODBCBlockOutputStream output_stream(pool->get(), db_name, table_name, *sample_block, quoting_style);
|
||||
copyData(*input_stream, output_stream);
|
||||
writeStringBinary("Ok.", out);
|
||||
}
|
||||
else
|
||||
{
|
||||
std::string query = params.get("query");
|
||||
LOG_TRACE(log, "Query: " << query);
|
||||
|
||||
BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, context);
|
||||
auto pool = getPool(connection_string);
|
||||
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size);
|
||||
copyData(inp, *writer);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
auto message = getCurrentExceptionMessage(true);
|
||||
response.setStatusAndReason(
|
||||
Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, because of too soon response sending
|
||||
Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, because of too soon response sending
|
||||
writeStringBinary(message, out);
|
||||
tryLogCurrentException(log);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -24,11 +24,13 @@ public:
|
||||
|
||||
ODBCHandler(std::shared_ptr<PoolMap> pool_map_,
|
||||
size_t keep_alive_timeout_,
|
||||
Context & context_)
|
||||
Context & context_,
|
||||
const String & mode_)
|
||||
: log(&Poco::Logger::get("ODBCHandler"))
|
||||
, pool_map(pool_map_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
, context(context_)
|
||||
, mode(mode_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -40,10 +42,12 @@ private:
|
||||
std::shared_ptr<PoolMap> pool_map;
|
||||
size_t keep_alive_timeout;
|
||||
Context & context;
|
||||
String mode;
|
||||
|
||||
static inline std::mutex mutex;
|
||||
|
||||
PoolPtr getPool(const std::string & connection_str);
|
||||
void processError(Poco::Net::HTTPServerResponse & response, const std::string & message);
|
||||
};
|
||||
|
||||
}
|
||||
|
133
programs/odbc-bridge/ODBCBlockOutputStream.cpp
Normal file
133
programs/odbc-bridge/ODBCBlockOutputStream.cpp
Normal file
@ -0,0 +1,133 @@
|
||||
#include "ODBCBlockOutputStream.h"
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
#include <Core/Field.h>
|
||||
#include <common/LocalDate.h>
|
||||
#include <common/LocalDateTime.h>
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include "getIdentifierQuote.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
using ValueType = ExternalResultDescription::ValueType;
|
||||
|
||||
std::string getInsertQuery(const std::string & db_name, const std::string & table_name, const ColumnsWithTypeAndName & columns, IdentifierQuotingStyle quoting)
|
||||
{
|
||||
ASTInsertQuery query;
|
||||
query.table_id.database_name = db_name;
|
||||
query.table_id.table_name = table_name;
|
||||
query.columns = std::make_shared<ASTExpressionList>(',');
|
||||
query.children.push_back(query.columns);
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
query.columns->children.emplace_back(std::make_shared<ASTIdentifier>(columns[i].name));
|
||||
|
||||
std::stringstream ss;
|
||||
IAST::FormatSettings settings(ss, true);
|
||||
settings.always_quote_identifiers = true;
|
||||
settings.identifier_quoting_style = quoting;
|
||||
query.IAST::format(settings);
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
std::string getQuestionMarks(size_t n)
|
||||
{
|
||||
std::string result = "(";
|
||||
for (size_t i = 0; i < n; ++i)
|
||||
{
|
||||
if (i > 0)
|
||||
result += ",";
|
||||
result += "?";
|
||||
}
|
||||
return result + ")";
|
||||
}
|
||||
|
||||
Poco::Dynamic::Var getVarFromField(const Field & field, const ValueType type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case ValueType::vtUInt8:
|
||||
return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt64>())).convert<UInt64>();
|
||||
case ValueType::vtUInt16:
|
||||
return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt64>())).convert<UInt64>();
|
||||
case ValueType::vtUInt32:
|
||||
return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt64>())).convert<UInt64>();
|
||||
case ValueType::vtUInt64:
|
||||
return Poco::Dynamic::Var(field.get<UInt64>()).convert<UInt64>();
|
||||
case ValueType::vtInt8:
|
||||
return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int64>())).convert<Int64>();
|
||||
case ValueType::vtInt16:
|
||||
return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int64>())).convert<Int64>();
|
||||
case ValueType::vtInt32:
|
||||
return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int64>())).convert<Int64>();
|
||||
case ValueType::vtInt64:
|
||||
return Poco::Dynamic::Var(field.get<Int64>()).convert<Int64>();
|
||||
case ValueType::vtFloat32:
|
||||
return Poco::Dynamic::Var(field.get<Float64>()).convert<Float64>();
|
||||
case ValueType::vtFloat64:
|
||||
return Poco::Dynamic::Var(field.get<Float64>()).convert<Float64>();
|
||||
case ValueType::vtString:
|
||||
return Poco::Dynamic::Var(field.get<String>()).convert<String>();
|
||||
case ValueType::vtDate:
|
||||
return Poco::Dynamic::Var(LocalDate(DayNum(field.get<UInt64>())).toString()).convert<String>();
|
||||
case ValueType::vtDateTime:
|
||||
return Poco::Dynamic::Var(std::to_string(LocalDateTime(time_t(field.get<UInt64>())))).convert<String>();
|
||||
case ValueType::vtUUID:
|
||||
return Poco::Dynamic::Var(UUID(field.get<UInt128>()).toUnderType().toHexString()).convert<std::string>();
|
||||
}
|
||||
__builtin_unreachable();
|
||||
}
|
||||
}
|
||||
|
||||
ODBCBlockOutputStream::ODBCBlockOutputStream(Poco::Data::Session && session_,
|
||||
const std::string & remote_database_name_,
|
||||
const std::string & remote_table_name_,
|
||||
const Block & sample_block_,
|
||||
IdentifierQuotingStyle quoting_)
|
||||
: session(session_)
|
||||
, db_name(remote_database_name_)
|
||||
, table_name(remote_table_name_)
|
||||
, sample_block(sample_block_)
|
||||
, quoting(quoting_)
|
||||
, log(&Logger::get("ODBCBlockOutputStream"))
|
||||
{
|
||||
description.init(sample_block);
|
||||
}
|
||||
|
||||
Block ODBCBlockOutputStream::getHeader() const
|
||||
{
|
||||
return sample_block;
|
||||
}
|
||||
|
||||
void ODBCBlockOutputStream::write(const Block & block)
|
||||
{
|
||||
ColumnsWithTypeAndName columns;
|
||||
for (size_t i = 0; i < block.columns(); ++i)
|
||||
columns.push_back({block.getColumns()[i], sample_block.getDataTypes()[i], sample_block.getNames()[i]});
|
||||
|
||||
std::vector<Poco::Dynamic::Var> row_to_insert(block.columns());
|
||||
Poco::Data::Statement statement(session << getInsertQuery(db_name, table_name, columns, quoting) + getQuestionMarks(block.columns()));
|
||||
for (size_t i = 0; i < block.columns(); ++i)
|
||||
statement.addBind(Poco::Data::Keywords::use(row_to_insert[i]));
|
||||
|
||||
for (size_t i = 0; i < block.rows(); ++i)
|
||||
{
|
||||
for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx)
|
||||
{
|
||||
Field val;
|
||||
columns[col_idx].column->get(i, val);
|
||||
if (val.isNull())
|
||||
row_to_insert[col_idx] = Poco::Dynamic::Var();
|
||||
else
|
||||
row_to_insert[col_idx] = getVarFromField(val, description.types[col_idx].first);
|
||||
}
|
||||
statement.execute();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
31
programs/odbc-bridge/ODBCBlockOutputStream.h
Normal file
31
programs/odbc-bridge/ODBCBlockOutputStream.h
Normal file
@ -0,0 +1,31 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Block.h>
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
#include <Poco/Data/Session.h>
|
||||
#include <Core/ExternalResultDescription.h>
|
||||
#include <Parsers/IdentifierQuotingStyle.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class ODBCBlockOutputStream : public IBlockOutputStream
|
||||
{
|
||||
public:
|
||||
ODBCBlockOutputStream(Poco::Data::Session && session_, const std::string & remote_database_name_,
|
||||
const std::string & remote_table_name_, const Block & sample_block_, IdentifierQuotingStyle quoting);
|
||||
|
||||
Block getHeader() const override;
|
||||
void write(const Block & block) override;
|
||||
|
||||
private:
|
||||
Poco::Data::Session session;
|
||||
std::string db_name;
|
||||
std::string table_name;
|
||||
Block sample_block;
|
||||
IdentifierQuotingStyle quoting;
|
||||
|
||||
ExternalResultDescription description;
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
}
|
@ -12,6 +12,11 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
}
|
||||
|
||||
std::string getIdentifierQuote(SQLHDBC hdbc)
|
||||
{
|
||||
std::string identifier;
|
||||
@ -36,6 +41,19 @@ std::string getIdentifierQuote(SQLHDBC hdbc)
|
||||
return identifier;
|
||||
}
|
||||
|
||||
IdentifierQuotingStyle getQuotingStyle(SQLHDBC hdbc)
|
||||
{
|
||||
auto identifier_quote = getIdentifierQuote(hdbc);
|
||||
if (identifier_quote.length() == 0)
|
||||
return IdentifierQuotingStyle::None;
|
||||
else if (identifier_quote[0] == '`')
|
||||
return IdentifierQuotingStyle::Backticks;
|
||||
else if (identifier_quote[0] == '"')
|
||||
return IdentifierQuotingStyle::DoubleQuotes;
|
||||
else
|
||||
throw Exception("Can not map quote identifier '" + identifier_quote + "' to IdentifierQuotingStyle value", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -8,11 +8,15 @@
|
||||
|
||||
# include <Poco/Data/ODBC/Utility.h>
|
||||
|
||||
#include <Parsers/IdentifierQuotingStyle.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
std::string getIdentifierQuote(SQLHDBC hdbc);
|
||||
|
||||
IdentifierQuotingStyle getQuotingStyle(SQLHDBC hdbc);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -112,53 +112,21 @@ namespace
|
||||
BlockInputStreamPtr reader;
|
||||
bool initialized = false;
|
||||
};
|
||||
|
||||
class StorageURLBlockOutputStream : public IBlockOutputStream
|
||||
{
|
||||
public:
|
||||
StorageURLBlockOutputStream(const Poco::URI & uri,
|
||||
const String & format,
|
||||
const Block & sample_block_,
|
||||
const Context & context,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
const CompressionMethod compression_method)
|
||||
: sample_block(sample_block_)
|
||||
{
|
||||
write_buf = wrapWriteBufferWithCompressionMethod(
|
||||
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
|
||||
compression_method, 3);
|
||||
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
|
||||
}
|
||||
|
||||
Block getHeader() const override
|
||||
{
|
||||
return sample_block;
|
||||
}
|
||||
|
||||
void write(const Block & block) override
|
||||
{
|
||||
writer->write(block);
|
||||
}
|
||||
|
||||
void writePrefix() override
|
||||
{
|
||||
writer->writePrefix();
|
||||
}
|
||||
|
||||
void writeSuffix() override
|
||||
{
|
||||
writer->writeSuffix();
|
||||
writer->flush();
|
||||
write_buf->finalize();
|
||||
}
|
||||
|
||||
private:
|
||||
Block sample_block;
|
||||
std::unique_ptr<WriteBuffer> write_buf;
|
||||
BlockOutputStreamPtr writer;
|
||||
};
|
||||
}
|
||||
|
||||
StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
|
||||
const String & format,
|
||||
const Block & sample_block_,
|
||||
const Context & context,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
const CompressionMethod compression_method)
|
||||
: sample_block(sample_block_)
|
||||
{
|
||||
write_buf = wrapWriteBufferWithCompressionMethod(
|
||||
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
|
||||
compression_method, 3);
|
||||
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
|
||||
}
|
||||
|
||||
std::string IStorageURLBase::getReadMethod() const
|
||||
{
|
||||
|
@ -3,6 +3,9 @@
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Poco/URI.h>
|
||||
#include <ext/shared_ptr_helper.h>
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
#include <IO/CompressionMethod.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -39,10 +42,9 @@ protected:
|
||||
Poco::URI uri;
|
||||
const Context & context_global;
|
||||
String compression_method;
|
||||
|
||||
private:
|
||||
String format_name;
|
||||
|
||||
private:
|
||||
virtual std::string getReadMethod() const;
|
||||
|
||||
virtual std::vector<std::pair<std::string, std::string>> getReadURIParams(
|
||||
@ -62,6 +64,43 @@ private:
|
||||
virtual Block getHeaderBlock(const Names & column_names) const = 0;
|
||||
};
|
||||
|
||||
class StorageURLBlockOutputStream : public IBlockOutputStream
|
||||
{
|
||||
public:
|
||||
StorageURLBlockOutputStream(const Poco::URI & uri,
|
||||
const String & format,
|
||||
const Block & sample_block_,
|
||||
const Context & context,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
const CompressionMethod compression_method);
|
||||
|
||||
Block getHeader() const override
|
||||
{
|
||||
return sample_block;
|
||||
}
|
||||
|
||||
void write(const Block & block) override
|
||||
{
|
||||
writer->write(block);
|
||||
}
|
||||
|
||||
void writePrefix() override
|
||||
{
|
||||
writer->writePrefix();
|
||||
}
|
||||
|
||||
void writeSuffix() override
|
||||
{
|
||||
writer->writeSuffix();
|
||||
writer->flush();
|
||||
write_buf->finalize();
|
||||
}
|
||||
|
||||
private:
|
||||
Block sample_block;
|
||||
std::unique_ptr<WriteBuffer> write_buf;
|
||||
BlockOutputStreamPtr writer;
|
||||
};
|
||||
|
||||
class StorageURL final : public ext::shared_ptr_helper<StorageURL>, public IStorageURLBase
|
||||
{
|
||||
|
@ -3,17 +3,15 @@
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageURL.h>
|
||||
#include <Storages/transformQueryForExternalDatabase.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/ReadWriteBufferFromHTTP.h>
|
||||
#include <Poco/File.h>
|
||||
#include <Poco/Net/HTTPRequest.h>
|
||||
#include <Poco/Path.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
#include <ext/range.h>
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
|
||||
#include <Processors/Pipe.h>
|
||||
|
||||
@ -97,6 +95,30 @@ Pipes StorageXDBC::read(const Names & column_names,
|
||||
return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
|
||||
BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const Context & context)
|
||||
{
|
||||
bridge_helper->startBridgeSync();
|
||||
|
||||
NamesAndTypesList cols;
|
||||
Poco::URI request_uri = uri;
|
||||
request_uri.setPath("/write");
|
||||
for (const String & name : getSampleBlock().getNames())
|
||||
{
|
||||
auto column_data = getColumns().getPhysical(name);
|
||||
cols.emplace_back(column_data.name, column_data.type);
|
||||
}
|
||||
auto url_params = bridge_helper->getURLParams(cols.toString(), 65536);
|
||||
for (const auto & [param, value] : url_params)
|
||||
request_uri.addQueryParameter(param, value);
|
||||
request_uri.addQueryParameter("db_name", remote_database_name);
|
||||
request_uri.addQueryParameter("table_name", remote_table_name);
|
||||
request_uri.addQueryParameter("format_name", format_name);
|
||||
|
||||
return std::make_shared<StorageURLBlockOutputStream>(
|
||||
request_uri, format_name, getSampleBlock(), context,
|
||||
ConnectionTimeouts::getHTTPTimeouts(context),
|
||||
chooseCompressionMethod(uri.toString(), compression_method));
|
||||
}
|
||||
|
||||
Block StorageXDBC::getHeaderBlock(const Names & column_names) const
|
||||
{
|
||||
|
@ -29,6 +29,8 @@ public:
|
||||
const ColumnsDescription & columns_,
|
||||
const Context & context_, BridgeHelperPtr bridge_helper_);
|
||||
|
||||
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
|
||||
|
||||
private:
|
||||
|
||||
BridgeHelperPtr bridge_helper;
|
||||
|
@ -111,6 +111,21 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, column_x Nulla
|
||||
|
||||
conn.close()
|
||||
|
||||
def test_mysql_insert(started_cluster):
|
||||
mysql_setup = node1.odbc_drivers["MySQL"]
|
||||
table_name = 'test_insert'
|
||||
conn = get_mysql_conn()
|
||||
create_mysql_table(conn, table_name)
|
||||
odbc_args = "'DSN={}', '{}', '{}'".format(mysql_setup["DSN"], mysql_setup["Database"], table_name)
|
||||
|
||||
node1.query("create table mysql_insert (id Int64, name String, age UInt8, money Float, column_x Nullable(Int16)) Engine=ODBC({})".format(odbc_args))
|
||||
node1.query("insert into mysql_insert values (1, 'test', 11, 111, 1111), (2, 'odbc', 22, 222, NULL)")
|
||||
assert node1.query("select * from mysql_insert") == "1\ttest\t11\t111\t1111\n2\todbc\t22\t222\t\\N\n"
|
||||
|
||||
node1.query("insert into table function odbc({}) values (3, 'insert', 33, 333, 3333)".format(odbc_args))
|
||||
node1.query("insert into table function odbc({}) (id, name, age, money) select id*4, upper(name), age*4, money*4 from odbc({}) where id=1".format(odbc_args, odbc_args))
|
||||
assert node1.query("select * from mysql_insert where id in (3, 4)") == "3\tinsert\t33\t333\t3333\n4\tTEST\t44\t444\t\\N\n"
|
||||
|
||||
|
||||
def test_sqlite_simple_select_function_works(started_cluster):
|
||||
sqlite_setup = node1.odbc_drivers["SQLite3"]
|
||||
@ -170,7 +185,11 @@ def test_sqlite_odbc_cached_dictionary(started_cluster):
|
||||
|
||||
assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))") == "3\n"
|
||||
|
||||
node1.exec_in_container(["bash", "-c", "echo 'INSERT INTO t3 values(200, 2, 7);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
|
||||
# Allow insert
|
||||
node1.exec_in_container(["bash", "-c", "chmod a+rw /tmp"], privileged=True, user='root')
|
||||
node1.exec_in_container(["bash", "-c", "chmod a+rw {}".format(sqlite_db)], privileged=True, user='root')
|
||||
|
||||
node1.query("insert into table function odbc('DSN={};', '', 't3') values (200, 2, 7)".format(node1.odbc_drivers["SQLite3"]["DSN"]))
|
||||
|
||||
assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(200))") == "7\n" # new value
|
||||
|
||||
@ -200,6 +219,17 @@ def test_postgres_odbc_hached_dictionary_no_tty_pipe_overflow(started_cluster):
|
||||
|
||||
assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(3))") == "xxx\n"
|
||||
|
||||
def test_postgres_insert(started_cluster):
|
||||
conn = get_postgres_conn()
|
||||
conn.cursor().execute("truncate table clickhouse.test_table")
|
||||
node1.query("create table pg_insert (column1 UInt8, column2 String) engine=ODBC('DSN=postgresql_odbc;', 'clickhouse', 'test_table')")
|
||||
node1.query("insert into pg_insert values (1, 'hello'), (2, 'world')")
|
||||
assert node1.query("select * from pg_insert") == '1\thello\n2\tworld\n'
|
||||
node1.query("insert into table function odbc('DSN=postgresql_odbc;', 'clickhouse', 'test_table') format CSV 3,test")
|
||||
node1.query("insert into table function odbc('DSN=postgresql_odbc;', 'clickhouse', 'test_table') select number, 's' || toString(number) from numbers (4, 7)")
|
||||
assert node1.query("select sum(column1), count(column1) from pg_insert") == "55\t10\n"
|
||||
assert node1.query("select sum(n), count(n) from (select (*,).1 as n from (select * from odbc('DSN=postgresql_odbc;', 'clickhouse', 'test_table')))") == "55\t10\n"
|
||||
|
||||
def test_bridge_dies_with_parent(started_cluster):
|
||||
node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user