Support write for XDBCStorage

This commit is contained in:
bobrovskij artemij 2020-04-28 03:56:44 +03:00
parent 33dde39936
commit 1bcd077fbf
10 changed files with 324 additions and 97 deletions

View File

@ -5,6 +5,7 @@ set(CLICKHOUSE_ODBC_BRIDGE_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/IdentifierQuoteHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/MainHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ODBCBlockInputStream.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ODBCBlockOutputStream.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ODBCBridge.cpp
${CMAKE_CURRENT_SOURCE_DIR}/PingHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/validateODBCConnectionString.cpp

View File

@ -30,8 +30,10 @@ Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco:
#else
return nullptr;
#endif
else if (uri.getPath() == "/write")
return new ODBCHandler(pool_map, keep_alive_timeout, context, "write");
else
return new ODBCHandler(pool_map, keep_alive_timeout, context);
return new ODBCHandler(pool_map, keep_alive_timeout, context, "read");
}
return nullptr;
}

View File

@ -5,17 +5,20 @@
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeFactory.h>
#include "ODBCBlockInputStream.h"
#include "ODBCBlockOutputStream.h"
#include <Formats/FormatFactory.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
#include <common/logger_useful.h>
#include <mutex>
#include <Poco/ThreadPool.h>
#include <IO/ReadBufferFromIStream.h>
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>
namespace DB
{
@ -63,49 +66,33 @@ ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str)
return pool_map->at(connection_str);
}
/// Reports a request failure both to the HTTP client and to the bridge log.
/// The response status is set to 500 Internal Server Error; if nothing has been
/// sent yet, `message` becomes the response body. `message` is always logged
/// as a warning.
void ODBCHandler::processError(Poco::Net::HTTPServerResponse & response, const std::string & message)
{
    response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);

    const bool already_sent = response.sent();
    if (!already_sent)
    {
        auto & body = response.send();
        body << message << std::endl;
    }

    LOG_WARNING(log, message);
}
void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
Poco::Net::HTMLForm params(request, request.stream());
Poco::Net::HTMLForm params(request);
if (mode == "read")
params.read(request.stream());
LOG_TRACE(log, "Request URI: " + request.getURI());
auto process_error = [&response, this](const std::string & message)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
LOG_WARNING(log, message);
};
if (!params.has("query"))
{
process_error("No 'query' in request body");
return;
}
if (!params.has("columns"))
{
process_error("No 'columns' in request URL");
processError(response, "No 'columns' in request URL");
return;
}
if (!params.has("connection_string"))
{
process_error("No 'connection_string' in request URL");
processError(response, "No 'connection_string' in request URL");
return;
}
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
if (params.has("max_block_size"))
{
std::string max_block_size_str = params.get("max_block_size", "");
if (max_block_size_str.empty())
{
process_error("Empty max_block_size specified");
return;
}
max_block_size = parse<size_t>(max_block_size_str);
}
std::string columns = params.get("columns");
std::unique_ptr<Block> sample_block;
try
@ -114,33 +101,91 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
}
catch (const Exception & ex)
{
process_error("Invalid 'columns' parameter in request body '" + ex.message() + "'");
processError(response, "Invalid 'columns' parameter in request body '" + ex.message() + "'");
LOG_WARNING(log, ex.getStackTraceString());
return;
}
std::string format = params.get("format", "RowBinary");
std::string query = params.get("query");
LOG_TRACE(log, "Query: " << query);
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Connection string: '" << connection_string << "'");
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
if (params.has("max_block_size"))
{
std::string max_block_size_str = params.get("max_block_size", "");
if (max_block_size_str.empty())
{
processError(response, "Empty max_block_size specified");
return;
}
max_block_size = parse<size_t>(max_block_size_str);
}
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
try
if (mode == "write")
{
BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, *context);
auto pool = getPool(connection_string);
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size);
copyData(inp, *writer);
try
{
if (!params.has("db_name"))
{
processError(response, "No 'db_name' in request URL");
return;
}
if (!params.has("table_name"))
{
processError(response, "No 'table_name' in request URL");
return;
}
std::string db_name = params.get("db_name");
std::string table_name = params.get("table_name");
LOG_TRACE(log, "DB name: '" << db_name << "', table name: '" << table_name << "'");
auto pool = getPool(connection_string);
ReadBufferFromIStream read_buf(request.stream());
BlockInputStreamPtr input_stream = FormatFactory::instance().getInput(format, read_buf, *sample_block, *context, max_block_size);
ODBCBlockOutputStream output_stream(pool->get(), db_name, table_name, *sample_block);
copyData(*input_stream, output_stream);
writeStringBinary("Ok.", out);
}
catch (...)
{
auto message = getCurrentExceptionMessage(true);
response.setStatusAndReason(
Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
writeStringBinary(message, out);
tryLogCurrentException(log);
}
}
catch (...)
else
{
auto message = getCurrentExceptionMessage(true);
response.setStatusAndReason(
Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, because of too soon response sending
writeStringBinary(message, out);
tryLogCurrentException(log);
if (!params.has("query"))
{
processError(response, "No 'query' in request body");
return;
}
std::string query = params.get("query");
LOG_TRACE(log, "Query: " << query);
try
{
BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, *context);
auto pool = getPool(connection_string);
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size);
copyData(inp, *writer);
}
catch (...)
{
auto message = getCurrentExceptionMessage(true);
response.setStatusAndReason(
Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, because of too soon response sending
writeStringBinary(message, out);
tryLogCurrentException(log);
}
}
}
}

View File

@ -24,11 +24,13 @@ public:
ODBCHandler(std::shared_ptr<PoolMap> pool_map_,
size_t keep_alive_timeout_,
std::shared_ptr<Context> context_)
std::shared_ptr<Context> context_,
const String & mode_)
: log(&Poco::Logger::get("ODBCHandler"))
, pool_map(pool_map_)
, keep_alive_timeout(keep_alive_timeout_)
, context(context_)
, mode(mode_)
{
}
@ -40,10 +42,12 @@ private:
std::shared_ptr<PoolMap> pool_map;
size_t keep_alive_timeout;
std::shared_ptr<Context> context;
String mode;
static inline std::mutex mutex;
PoolPtr getPool(const std::string & connection_str);
void processError(Poco::Net::HTTPServerResponse & response, const std::string & message);
};
}

View File

@ -0,0 +1,114 @@
#include "ODBCBlockOutputStream.h"
#include <common/logger_useful.h>
#include <Core/Field.h>
#include <common/LocalDate.h>
#include <common/LocalDateTime.h>
namespace DB
{
namespace {
using ValueType = ExternalResultDescription::ValueType;
std::string commaSeparateColumnNames(const ColumnsWithTypeAndName & columns)
{
std::string result = "(";
for (size_t i = 0; i < columns.size(); ++i) {
if (i > 0)
result += ",";
result += columns[i].name;
}
return result + ")";
}
/// Builds the parenthesised placeholder list for a prepared statement with `n`
/// parameters, e.g. n = 3 gives "(?,?,?)" and n = 0 gives "()".
std::string getQuestionMarks(size_t n)
{
    std::string placeholders;
    placeholders.push_back('(');
    for (size_t remaining = n; remaining > 0; --remaining)
    {
        placeholders.push_back('?');
        /// A separator follows every placeholder except the last one.
        if (remaining > 1)
            placeholders.push_back(',');
    }
    placeholders.push_back(')');
    return placeholders;
}
/// Converts one ClickHouse Field into a Poco::Dynamic::Var suitable for binding
/// as a parameter of the INSERT statement.
///
/// @param field  value taken from a column of the block being written.
/// @param type   external value type describing how `field` must be interpreted.
/// @return       unsigned integers widened to UInt64, signed integers widened to Int64,
///               floats as Float64, strings as String; Date, DateTime and UUID are
///               passed in their textual form. An empty Var is returned for a type
///               not handled by the switch.
Poco::Dynamic::Var getVarFromField(const Field & field, const ValueType type)
{
    switch (type)
    {
        case ValueType::vtUInt8:
            return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt8>())).convert<UInt64>();
        case ValueType::vtUInt16:
            return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt16>())).convert<UInt64>();
        case ValueType::vtUInt32:
            return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt32>())).convert<UInt64>();
        case ValueType::vtUInt64:
            return Poco::Dynamic::Var(field.get<UInt64>()).convert<UInt64>();
        case ValueType::vtInt8:
            return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int8>())).convert<Int64>();
        case ValueType::vtInt16:
            return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int16>())).convert<Int64>();
        case ValueType::vtInt32:
            return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int32>())).convert<Int64>();
        case ValueType::vtInt64:
            return Poco::Dynamic::Var(field.get<Int64>()).convert<Int64>();
        case ValueType::vtFloat32:
            /// Read as Float64 on purpose, exactly like vtFloat64.
            /// NOTE(review): presumably Field keeps every float as Float64 — confirm.
            return Poco::Dynamic::Var(field.get<Float64>()).convert<Float64>();
        case ValueType::vtFloat64:
            return Poco::Dynamic::Var(field.get<Float64>()).convert<Float64>();
        case ValueType::vtString:
            return Poco::Dynamic::Var(field.get<String>()).convert<String>();
        case ValueType::vtDate:
            return Poco::Dynamic::Var(LocalDate(DayNum(field.get<UInt16>())).toString()).convert<String>();
        case ValueType::vtDateTime:
            /// Must go through LocalDateTime: formatting the timestamp with LocalDate
            /// kept only the calendar date and silently dropped the time of day.
            return Poco::Dynamic::Var(std::to_string(LocalDateTime(static_cast<time_t>(field.get<UInt32>())))).convert<String>();
        case ValueType::vtUUID:
            return Poco::Dynamic::Var(UUID(field.get<UInt128>()).toUnderType().toHexString()).convert<std::string>();
    }

    /// TODO: an unhandled value type currently yields an empty Var instead of an error.
    return Poco::Dynamic::Var();
}
}
/// Creates an output stream that inserts blocks into the remote table
/// `remote_database_name_`.`remote_table_name_` through the given ODBC session.
/// `sample_block_` is stored as the stream header and is also used to initialise
/// `description`, from which write() takes the per-column value types.
ODBCBlockOutputStream::ODBCBlockOutputStream(Poco::Data::Session && session_,
const std::string & remote_database_name_,
const std::string & remote_table_name_,
const Block & sample_block_)
/// NOTE(review): `session_` is a named rvalue reference, i.e. an lvalue here,
/// so the member is copy-constructed rather than moved — confirm this is intended.
: session(session_)
, db_name(remote_database_name_)
, table_name(remote_table_name_)
, sample_block(sample_block_)
, log(&Logger::get("ODBCBlockOutputStream"))
{
/// Must run after sample_block has been initialised: it reads the member, not the argument.
description.init(sample_block);
}
/// Returns the header of this stream: a copy of the sample block passed to the constructor.
Block ODBCBlockOutputStream::getHeader() const
{
return sample_block;
}
void ODBCBlockOutputStream::write(const Block & block)
{
ColumnsWithTypeAndName columns;
for (size_t i = 0; i < block.columns(); ++i)
columns.push_back({block.getColumns()[i], sample_block.getDataTypes()[i], sample_block.getNames()[i]});
std::vector<Poco::Dynamic::Var> row_to_insert(block.columns());
Poco::Data::Statement statement(session << "INSERT INTO " + db_name + "." + table_name + " " +
commaSeparateColumnNames(columns) +
" VALUES " + getQuestionMarks(block.columns()));
for (size_t i = 0; i < block.columns(); ++i)
statement, Poco::Data::Keywords::use(row_to_insert[i]);
for (size_t i = 0; i < block.rows(); ++i)
{
for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx)
{
Field val;
columns[col_idx].column->get(i, val);
row_to_insert[col_idx] = getVarFromField(val, description.types[col_idx].first);
}
statement.execute();
}
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Poco/Data/Session.h>
#include <Core/ExternalResultDescription.h>
namespace DB{
/// Block output stream that writes blocks into a table of an external database
/// through a Poco::Data (ODBC) session, one INSERT execution per row.
class ODBCBlockOutputStream : public IBlockOutputStream {
public:
/// @param session_               session used to run the INSERT statements.
/// @param remote_database_name_  database part of the target table name.
/// @param remote_table_name_     table part of the target table name.
/// @param sample_block_          header of the stream; provides column names and types.
ODBCBlockOutputStream(Poco::Data::Session && session_, const std::string & remote_database_name_,
const std::string & remote_table_name_, const Block & sample_block_);
/// Returns the sample block given at construction.
Block getHeader() const override;
/// Inserts all rows of `block` into the remote table.
void write(const Block & block) override;
private:
Poco::Data::Session session;
std::string db_name;
std::string table_name;
Block sample_block;
/// Initialised from sample_block in the constructor; supplies per-column value types.
ExternalResultDescription description;
/// Logger named "ODBCBlockOutputStream".
Poco::Logger * log;
};
}

View File

@ -112,53 +112,21 @@ namespace
BlockInputStreamPtr reader;
bool initialized = false;
};
class StorageURLBlockOutputStream : public IBlockOutputStream
{
public:
StorageURLBlockOutputStream(const Poco::URI & uri,
const String & format,
const Block & sample_block_,
const Context & context,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts),
compression_method, 3);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
Block getHeader() const override
{
return sample_block;
}
void write(const Block & block) override
{
writer->write(block);
}
void writePrefix() override
{
writer->writePrefix();
}
void writeSuffix() override
{
writer->writeSuffix();
writer->flush();
write_buf->finalize();
}
private:
Block sample_block;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
};
}
/// Opens an HTTP POST write buffer to `uri`, wraps it with the requested compression
/// (level 3) and creates an output format `format` on top of it for `sample_block_`.
StorageURLBlockOutputStream::StorageURLBlockOutputStream(const Poco::URI & uri,
    const String & format,
    const Block & sample_block_,
    const Context & context,
    const ConnectionTimeouts & timeouts,
    const CompressionMethod compression_method)
    : sample_block(sample_block_)
{
    /// Raw HTTP sink first, then the (possibly compressing) wrapper that takes ownership of it.
    auto http_buf = std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts);
    write_buf = wrapWriteBufferWithCompressionMethod(std::move(http_buf), compression_method, 3);

    /// The formatter serialises blocks into write_buf.
    writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
std::string IStorageURLBase::getReadMethod() const
{

View File

@ -3,6 +3,9 @@
#include <Storages/IStorage.h>
#include <Poco/URI.h>
#include <ext/shared_ptr_helper.h>
#include <DataStreams/IBlockOutputStream.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/CompressionMethod.h>
namespace DB
@ -39,10 +42,9 @@ protected:
Poco::URI uri;
const Context & context_global;
String compression_method;
private:
String format_name;
private:
virtual std::string getReadMethod() const;
virtual std::vector<std::pair<std::string, std::string>> getReadURIParams(
@ -62,6 +64,43 @@ private:
virtual Block getHeaderBlock(const Names & column_names) const = 0;
};
/// Output stream that serialises blocks in a given format and sends them to a URL.
/// All block-level calls are forwarded to the underlying format writer; the write
/// buffer is finalized when the suffix is written.
class StorageURLBlockOutputStream : public IBlockOutputStream
{
public:
    /// @param uri                 destination of the request.
    /// @param format              name of the output format used to serialise blocks.
    /// @param sample_block_       header of the stream.
    /// @param context             context passed to the format factory.
    /// @param timeouts            connection timeouts for the HTTP write buffer.
    /// @param compression_method  compression applied on top of the HTTP write buffer.
    StorageURLBlockOutputStream(const Poco::URI & uri,
        const String & format,
        const Block & sample_block_,
        const Context & context,
        const ConnectionTimeouts & timeouts,
        const CompressionMethod compression_method);

    /// Header of the stream: the sample block given at construction.
    Block getHeader() const override { return sample_block; }

    /// Forwards the format prefix to the writer.
    void writePrefix() override { writer->writePrefix(); }

    /// Forwards one block to the writer.
    void write(const Block & block) override { writer->write(block); }

    /// Writes the format suffix, flushes the writer, then finalizes the buffer (in that order).
    void writeSuffix() override
    {
        writer->writeSuffix();
        writer->flush();
        write_buf->finalize();
    }

private:
    Block sample_block;
    std::unique_ptr<WriteBuffer> write_buf;
    BlockOutputStreamPtr writer;
};
class StorageURL final : public ext::shared_ptr_helper<StorageURL>, public IStorageURLBase
{

View File

@ -3,17 +3,15 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageURL.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/File.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <Common/ShellCommand.h>
#include <ext/range.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Pipe.h>
@ -97,6 +95,33 @@ Pipes StorageXDBC::read(const Names & column_names,
return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
}
/// Creates an output stream that sends inserted blocks to the bridge's "/write" endpoint.
///
/// The bridge is started first. The request URI is the storage URI with the path
/// replaced by "/write", extended with the bridge helper's URL parameters (built
/// from the table's columns) and with the remote database and table names.
///
/// @param context  query context; used for HTTP timeouts and passed to the output stream.
/// @return         a StorageURLBlockOutputStream posting data in `format_name`.
BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const Context & context)
{
    /// Block size passed to the bridge helper when building URL parameters.
    constexpr size_t bridge_max_block_size = 65536;

    bridge_helper->startBridgeSync();

    LOG_TRACE(log, "Writing to remote table '" + remote_database_name + "'.'" + remote_table_name + "'");

    const auto sample_block = getSampleBlock();

    NamesAndTypesList cols;
    for (const String & name : sample_block.getNames())
    {
        auto column_data = getColumn(name);
        cols.emplace_back(column_data.name, column_data.type);
    }

    Poco::URI request_uri = uri;
    request_uri.setPath("/write");

    auto url_params = bridge_helper->getURLParams(cols.toString(), bridge_max_block_size);
    for (const auto & [param, value] : url_params)
        request_uri.addQueryParameter(param, value);
    request_uri.addQueryParameter("db_name", remote_database_name);
    request_uri.addQueryParameter("table_name", remote_table_name);

    return std::make_shared<StorageURLBlockOutputStream>(
        request_uri, format_name, sample_block, context,
        ConnectionTimeouts::getHTTPTimeouts(context),
        chooseCompressionMethod(uri.toString(), compression_method));
}
Block StorageXDBC::getHeaderBlock(const Names & column_names) const
{

View File

@ -29,6 +29,8 @@ public:
const ColumnsDescription & columns_,
const Context & context_, BridgeHelperPtr bridge_helper_);
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
private:
BridgeHelperPtr bridge_helper;