mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
commit
1945d01719
3
.gitmodules
vendored
3
.gitmodules
vendored
@ -221,6 +221,9 @@
|
||||
[submodule "contrib/NuRaft"]
|
||||
path = contrib/NuRaft
|
||||
url = https://github.com/ClickHouse-Extras/NuRaft.git
|
||||
[submodule "contrib/nanodbc"]
|
||||
path = contrib/nanodbc
|
||||
url = https://github.com/ClickHouse-Extras/nanodbc.git
|
||||
[submodule "contrib/datasketches-cpp"]
|
||||
path = contrib/datasketches-cpp
|
||||
url = https://github.com/ClickHouse-Extras/datasketches-cpp.git
|
||||
|
@ -512,6 +512,7 @@ include (cmake/find/fastops.cmake)
|
||||
include (cmake/find/odbc.cmake)
|
||||
include (cmake/find/rocksdb.cmake)
|
||||
include (cmake/find/libpqxx.cmake)
|
||||
include (cmake/find/nanodbc.cmake)
|
||||
include (cmake/find/nuraft.cmake)
|
||||
|
||||
|
||||
|
35
cmake/find/nanodbc.cmake
Normal file
35
cmake/find/nanodbc.cmake
Normal file
@ -0,0 +1,35 @@
|
||||
option(ENABLE_NANODBC "Enalbe nanodbc" ${ENABLE_LIBRARIES})
|
||||
|
||||
if (NOT ENABLE_NANODBC)
|
||||
set (USE_ODBC 0)
|
||||
return()
|
||||
endif()
|
||||
|
||||
if (NOT ENABLE_ODBC)
|
||||
set (USE_NANODBC 0)
|
||||
message (STATUS "Using nanodbc=${USE_NANODBC}")
|
||||
return()
|
||||
endif()
|
||||
|
||||
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/nanodbc/CMakeLists.txt")
|
||||
message (WARNING "submodule contrib/nanodbc is missing. to fix try run: \n git submodule update --init --recursive")
|
||||
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find internal nanodbc library")
|
||||
set (USE_NANODBC 0)
|
||||
return()
|
||||
endif()
|
||||
|
||||
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/unixodbc/include")
|
||||
message (ERROR "submodule contrib/unixodbc is missing. to fix try run: \n git submodule update --init --recursive")
|
||||
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find internal unixodbc needed for nanodbc")
|
||||
set (USE_NANODBC 0)
|
||||
return()
|
||||
endif()
|
||||
|
||||
set (USE_NANODBC 1)
|
||||
|
||||
set (NANODBC_LIBRARY nanodbc)
|
||||
|
||||
set (NANODBC_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/nanodbc/nanodbce")
|
||||
|
||||
message (STATUS "Using nanodbc=${USE_NANODBC}: ${NANODBC_INCLUDE_DIR} : ${NANODBC_LIBRARY}")
|
||||
message (STATUS "Using unixodbc")
|
4
contrib/CMakeLists.txt
vendored
4
contrib/CMakeLists.txt
vendored
@ -326,6 +326,10 @@ if (USE_LIBPQXX)
|
||||
add_subdirectory (libpqxx-cmake)
|
||||
endif()
|
||||
|
||||
if (USE_NANODBC)
|
||||
add_subdirectory (nanodbc-cmake)
|
||||
endif()
|
||||
|
||||
if (USE_NURAFT)
|
||||
add_subdirectory(nuraft-cmake)
|
||||
endif()
|
||||
|
1
contrib/nanodbc
vendored
Submodule
1
contrib/nanodbc
vendored
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit 9fc459675515d491401727ec67fca38db721f28c
|
14
contrib/nanodbc-cmake/CMakeLists.txt
Normal file
14
contrib/nanodbc-cmake/CMakeLists.txt
Normal file
@ -0,0 +1,14 @@
|
||||
set (LIBRARY_DIR ${ClickHouse_SOURCE_DIR}/contrib/nanodbc)
|
||||
|
||||
if (NOT TARGET unixodbc)
|
||||
message(FATAL_ERROR "Configuration error: unixodbc is not a target")
|
||||
endif()
|
||||
|
||||
set (SRCS
|
||||
${LIBRARY_DIR}/nanodbc/nanodbc.cpp
|
||||
)
|
||||
|
||||
add_library(nanodbc ${SRCS})
|
||||
|
||||
target_link_libraries (nanodbc PUBLIC unixodbc)
|
||||
target_include_directories (nanodbc SYSTEM PUBLIC ${LIBRARY_DIR}/)
|
@ -17,7 +17,6 @@ add_executable(clickhouse-library-bridge ${CLICKHOUSE_LIBRARY_BRIDGE_SOURCES})
|
||||
target_link_libraries(clickhouse-library-bridge PRIVATE
|
||||
daemon
|
||||
dbms
|
||||
clickhouse_parsers
|
||||
bridge
|
||||
)
|
||||
|
||||
|
@ -26,11 +26,12 @@ target_link_libraries(clickhouse-odbc-bridge PRIVATE
|
||||
dbms
|
||||
bridge
|
||||
clickhouse_parsers
|
||||
Poco::Data
|
||||
Poco::Data::ODBC
|
||||
nanodbc
|
||||
unixodbc
|
||||
)
|
||||
|
||||
set_target_properties(clickhouse-odbc-bridge PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..)
|
||||
target_compile_options (clickhouse-odbc-bridge PRIVATE -Wno-reserved-id-macro -Wno-keyword-macro)
|
||||
|
||||
if (USE_GDB_ADD_INDEX)
|
||||
add_custom_command(TARGET clickhouse-odbc-bridge POST_BUILD COMMAND ${GDB_ADD_INDEX_EXE} ../clickhouse-odbc-bridge COMMENT "Adding .gdb-index to clickhouse-odbc-bridge" VERBATIM)
|
||||
|
@ -2,29 +2,36 @@
|
||||
|
||||
#if USE_ODBC
|
||||
|
||||
# include <DataTypes/DataTypeFactory.h>
|
||||
# include <DataTypes/DataTypeNullable.h>
|
||||
# include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
|
||||
# include <IO/WriteHelpers.h>
|
||||
# include <Parsers/ParserQueryWithOutput.h>
|
||||
# include <Parsers/parseQuery.h>
|
||||
# include <Poco/Data/ODBC/ODBCException.h>
|
||||
# include <Poco/Data/ODBC/SessionImpl.h>
|
||||
# include <Poco/Data/ODBC/Utility.h>
|
||||
# include <Server/HTTP/HTMLForm.h>
|
||||
# include <Poco/Net/HTTPServerRequest.h>
|
||||
# include <Poco/Net/HTTPServerResponse.h>
|
||||
# include <Poco/NumberParser.h>
|
||||
# include <common/logger_useful.h>
|
||||
# include <Common/quoteString.h>
|
||||
# include <ext/scope_guard.h>
|
||||
# include "getIdentifierQuote.h"
|
||||
# include "validateODBCConnectionString.h"
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Parsers/ParserQueryWithOutput.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Server/HTTP/HTMLForm.h>
|
||||
#include <Poco/Net/HTTPServerRequest.h>
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
#include <Poco/NumberParser.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <ext/scope_guard.h>
|
||||
#include "getIdentifierQuote.h"
|
||||
#include "validateODBCConnectionString.h"
|
||||
#include "ODBCConnectionFactory.h"
|
||||
|
||||
#include <sql.h>
|
||||
#include <sqlext.h>
|
||||
|
||||
# define POCO_SQL_ODBC_CLASS Poco::Data::ODBC
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
DataTypePtr getDataType(SQLSMALLINT type)
|
||||
@ -59,6 +66,7 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
|
||||
{
|
||||
HTMLForm params(request, request.getStream());
|
||||
@ -77,87 +85,78 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
|
||||
process_error("No 'table' param in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!params.has("connection_string"))
|
||||
{
|
||||
process_error("No 'connection_string' in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
std::string schema_name;
|
||||
std::string table_name = params.get("table");
|
||||
std::string connection_string = params.get("connection_string");
|
||||
|
||||
if (params.has("schema"))
|
||||
{
|
||||
schema_name = params.get("schema");
|
||||
LOG_TRACE(log, "Will fetch info for table '{}'", schema_name + "." + table_name);
|
||||
}
|
||||
else
|
||||
LOG_TRACE(log, "Will fetch info for table '{}'", table_name);
|
||||
|
||||
LOG_TRACE(log, "Got connection str '{}'", connection_string);
|
||||
|
||||
try
|
||||
{
|
||||
const bool external_table_functions_use_nulls = Poco::NumberParser::parseBool(params.get("external_table_functions_use_nulls", "false"));
|
||||
|
||||
POCO_SQL_ODBC_CLASS::SessionImpl session(validateODBCConnectionString(connection_string), DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
|
||||
SQLHDBC hdbc = session.dbc().handle();
|
||||
auto connection = ODBCConnectionFactory::instance().get(
|
||||
validateODBCConnectionString(connection_string),
|
||||
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
|
||||
SQLHSTMT hstmt = nullptr;
|
||||
nanodbc::catalog catalog(*connection);
|
||||
std::string catalog_name;
|
||||
|
||||
if (POCO_SQL_ODBC_CLASS::Utility::isError(SQLAllocStmt(hdbc, &hstmt)))
|
||||
throw POCO_SQL_ODBC_CLASS::ODBCException("Could not allocate connection handle.");
|
||||
/// In XDBC tables it is allowed to pass either database_name or schema_name in table definion, but not both of them.
|
||||
/// They both are passed as 'schema' parameter in request URL, so it is not clear whether it is database_name or schema_name passed.
|
||||
/// If it is schema_name then we know that database is added in odbc.ini. But if we have database_name as 'schema',
|
||||
/// it is not guaranteed. For nanodbc database_name must be either in odbc.ini or passed as catalog_name.
|
||||
auto get_columns = [&]()
|
||||
{
|
||||
nanodbc::catalog::tables tables = catalog.find_tables(table_name, /* type = */ "", /* schema = */ "", /* catalog = */ schema_name);
|
||||
if (tables.next())
|
||||
{
|
||||
catalog_name = tables.table_catalog();
|
||||
LOG_TRACE(log, "Will fetch info for table '{}.{}'", catalog_name, table_name);
|
||||
return catalog.find_columns(/* column = */ "", table_name, /* schema = */ "", catalog_name);
|
||||
}
|
||||
|
||||
SCOPE_EXIT(SQLFreeStmt(hstmt, SQL_DROP));
|
||||
tables = catalog.find_tables(table_name, /* type = */ "", /* schema = */ schema_name);
|
||||
if (tables.next())
|
||||
{
|
||||
catalog_name = tables.table_catalog();
|
||||
LOG_TRACE(log, "Will fetch info for table '{}.{}.{}'", catalog_name, schema_name, table_name);
|
||||
return catalog.find_columns(/* column = */ "", table_name, schema_name, catalog_name);
|
||||
}
|
||||
|
||||
const auto & context_settings = getContext()->getSettingsRef();
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table {} not found", schema_name.empty() ? table_name : schema_name + '.' + table_name);
|
||||
};
|
||||
|
||||
/// TODO Why not do SQLColumns instead?
|
||||
std::string name = schema_name.empty() ? backQuoteIfNeed(table_name) : backQuoteIfNeed(schema_name) + "." + backQuoteIfNeed(table_name);
|
||||
WriteBufferFromOwnString buf;
|
||||
std::string input = "SELECT * FROM " + name + " WHERE 1 = 0";
|
||||
ParserQueryWithOutput parser(input.data() + input.size());
|
||||
ASTPtr select = parseQuery(parser, input.data(), input.data() + input.size(), "", context_settings.max_query_size, context_settings.max_parser_depth);
|
||||
|
||||
IAST::FormatSettings settings(buf, true);
|
||||
settings.always_quote_identifiers = true;
|
||||
settings.identifier_quoting_style = getQuotingStyle(hdbc);
|
||||
select->format(settings);
|
||||
std::string query = buf.str();
|
||||
|
||||
LOG_TRACE(log, "Inferring structure with query '{}'", query);
|
||||
|
||||
if (POCO_SQL_ODBC_CLASS::Utility::isError(POCO_SQL_ODBC_CLASS::SQLPrepare(hstmt, reinterpret_cast<SQLCHAR *>(query.data()), query.size())))
|
||||
throw POCO_SQL_ODBC_CLASS::DescriptorException(session.dbc());
|
||||
|
||||
if (POCO_SQL_ODBC_CLASS::Utility::isError(SQLExecute(hstmt)))
|
||||
throw POCO_SQL_ODBC_CLASS::StatementException(hstmt);
|
||||
|
||||
SQLSMALLINT cols = 0;
|
||||
if (POCO_SQL_ODBC_CLASS::Utility::isError(SQLNumResultCols(hstmt, &cols)))
|
||||
throw POCO_SQL_ODBC_CLASS::StatementException(hstmt);
|
||||
|
||||
/// TODO cols not checked
|
||||
nanodbc::catalog::columns columns_definition = get_columns();
|
||||
|
||||
NamesAndTypesList columns;
|
||||
for (SQLSMALLINT ncol = 1; ncol <= cols; ++ncol)
|
||||
while (columns_definition.next())
|
||||
{
|
||||
SQLSMALLINT type = 0;
|
||||
/// TODO Why 301?
|
||||
SQLCHAR column_name[301];
|
||||
SQLSMALLINT type = columns_definition.sql_data_type();
|
||||
std::string column_name = columns_definition.column_name();
|
||||
|
||||
SQLSMALLINT is_nullable;
|
||||
const auto result = POCO_SQL_ODBC_CLASS::SQLDescribeCol(hstmt, ncol, column_name, sizeof(column_name), nullptr, &type, nullptr, nullptr, &is_nullable);
|
||||
if (POCO_SQL_ODBC_CLASS::Utility::isError(result))
|
||||
throw POCO_SQL_ODBC_CLASS::StatementException(hstmt);
|
||||
bool is_nullable = columns_definition.nullable() == SQL_NULLABLE;
|
||||
|
||||
auto column_type = getDataType(type);
|
||||
|
||||
if (external_table_functions_use_nulls && is_nullable == SQL_NULLABLE)
|
||||
{
|
||||
column_type = std::make_shared<DataTypeNullable>(column_type);
|
||||
|
||||
columns.emplace_back(column_name, std::move(column_type));
|
||||
}
|
||||
|
||||
columns.emplace_back(reinterpret_cast<char *>(column_name), std::move(column_type));
|
||||
}
|
||||
if (columns.empty())
|
||||
throw Exception("Columns definition was not returned", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
|
||||
try
|
||||
|
@ -2,16 +2,13 @@
|
||||
|
||||
#if USE_ODBC
|
||||
|
||||
# include <Interpreters/Context_fwd.h>
|
||||
# include <Server/HTTP/HTTPRequestHandler.h>
|
||||
# include <Common/config.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Server/HTTP/HTTPRequestHandler.h>
|
||||
#include <Common/config.h>
|
||||
#include <Poco/Logger.h>
|
||||
|
||||
# include <Poco/Logger.h>
|
||||
|
||||
/** The structure of the table is taken from the query "SELECT * FROM table WHERE 1=0".
|
||||
* TODO: It would be much better to utilize ODBC methods dedicated for columns description.
|
||||
* If there is no such table, an exception is thrown.
|
||||
*/
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -19,7 +16,9 @@ class ODBCColumnsInfoHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
ODBCColumnsInfoHandler(size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_), log(&Poco::Logger::get("ODBCColumnsInfoHandler")), keep_alive_timeout(keep_alive_timeout_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("ODBCColumnsInfoHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -38,9 +38,9 @@ std::unique_ptr<HTTPRequestHandler> ODBCBridgeHandlerFactory::createRequestHandl
|
||||
return nullptr;
|
||||
#endif
|
||||
else if (uri.getPath() == "/write")
|
||||
return std::make_unique<ODBCHandler>(pool_map, keep_alive_timeout, getContext(), "write");
|
||||
return std::make_unique<ODBCHandler>(keep_alive_timeout, getContext(), "write");
|
||||
else
|
||||
return std::make_unique<ODBCHandler>(pool_map, keep_alive_timeout, getContext(), "read");
|
||||
return std::make_unique<ODBCHandler>(keep_alive_timeout, getContext(), "read");
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
@ -6,14 +6,8 @@
|
||||
#include "IdentifierQuoteHandler.h"
|
||||
#include "MainHandler.h"
|
||||
#include "SchemaAllowedHandler.h"
|
||||
|
||||
#include <Poco/Logger.h>
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wunused-parameter"
|
||||
#include <Poco/Data/SessionPool.h>
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -24,9 +18,11 @@ class ODBCBridgeHandlerFactory : public HTTPRequestHandlerFactory, WithContext
|
||||
{
|
||||
public:
|
||||
ODBCBridgeHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_), log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get(name_))
|
||||
, name(name_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
pool_map = std::make_shared<ODBCHandler::PoolMap>();
|
||||
}
|
||||
|
||||
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
|
||||
@ -35,7 +31,6 @@ private:
|
||||
Poco::Logger * log;
|
||||
std::string name;
|
||||
size_t keep_alive_timeout;
|
||||
std::shared_ptr<ODBCHandler::PoolMap> pool_map;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -2,23 +2,20 @@
|
||||
|
||||
#if USE_ODBC
|
||||
|
||||
# include <DataTypes/DataTypeFactory.h>
|
||||
# include <Server/HTTP/HTMLForm.h>
|
||||
# include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
|
||||
# include <IO/WriteHelpers.h>
|
||||
# include <Parsers/ParserQueryWithOutput.h>
|
||||
# include <Parsers/parseQuery.h>
|
||||
# include <Poco/Data/ODBC/ODBCException.h>
|
||||
# include <Poco/Data/ODBC/SessionImpl.h>
|
||||
# include <Poco/Data/ODBC/Utility.h>
|
||||
# include <Poco/Net/HTTPServerRequest.h>
|
||||
# include <Poco/Net/HTTPServerResponse.h>
|
||||
# include <common/logger_useful.h>
|
||||
# include <ext/scope_guard.h>
|
||||
# include "getIdentifierQuote.h"
|
||||
# include "validateODBCConnectionString.h"
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <Server/HTTP/HTMLForm.h>
|
||||
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Parsers/ParserQueryWithOutput.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Poco/Net/HTTPServerRequest.h>
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <ext/scope_guard.h>
|
||||
#include "getIdentifierQuote.h"
|
||||
#include "validateODBCConnectionString.h"
|
||||
#include "ODBCConnectionFactory.h"
|
||||
|
||||
# define POCO_SQL_ODBC_CLASS Poco::Data::ODBC
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -44,10 +41,12 @@ void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServ
|
||||
try
|
||||
{
|
||||
std::string connection_string = params.get("connection_string");
|
||||
POCO_SQL_ODBC_CLASS::SessionImpl session(validateODBCConnectionString(connection_string), DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
|
||||
SQLHDBC hdbc = session.dbc().handle();
|
||||
|
||||
auto identifier = getIdentifierQuote(hdbc);
|
||||
auto connection = ODBCConnectionFactory::instance().get(
|
||||
validateODBCConnectionString(connection_string),
|
||||
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
|
||||
auto identifier = getIdentifierQuote(*connection);
|
||||
|
||||
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
|
||||
try
|
||||
|
@ -11,11 +11,13 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IdentifierQuoteHandler : public HTTPRequestHandler
|
||||
class IdentifierQuoteHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
IdentifierQuoteHandler(size_t keep_alive_timeout_, ContextPtr)
|
||||
: log(&Poco::Logger::get("IdentifierQuoteHandler")), keep_alive_timeout(keep_alive_timeout_)
|
||||
IdentifierQuoteHandler(size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("IdentifierQuoteHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -18,18 +18,17 @@
|
||||
#include <Processors/Formats/InputStreamFromInputFormat.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Server/HTTP/HTMLForm.h>
|
||||
#include "ODBCConnectionFactory.h"
|
||||
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
|
||||
#include <nanodbc/nanodbc.h>
|
||||
|
||||
#if USE_ODBC
|
||||
#include <Poco/Data/ODBC/SessionImpl.h>
|
||||
#define POCO_SQL_ODBC_CLASS Poco::Data::ODBC
|
||||
#endif
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
std::unique_ptr<Block> parseColumns(std::string && column_string)
|
||||
@ -42,37 +41,6 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
using PocoSessionPoolConstructor = std::function<std::shared_ptr<Poco::Data::SessionPool>()>;
|
||||
/** Is used to adjust max size of default Poco thread pool. See issue #750
|
||||
* Acquire the lock, resize pool and construct new Session.
|
||||
*/
|
||||
static std::shared_ptr<Poco::Data::SessionPool> createAndCheckResizePocoSessionPool(PocoSessionPoolConstructor pool_constr)
|
||||
{
|
||||
static std::mutex mutex;
|
||||
|
||||
Poco::ThreadPool & pool = Poco::ThreadPool::defaultPool();
|
||||
|
||||
/// NOTE: The lock don't guarantee that external users of the pool don't change its capacity
|
||||
std::unique_lock lock(mutex);
|
||||
|
||||
if (pool.available() == 0)
|
||||
pool.addCapacity(2 * std::max(pool.capacity(), 1));
|
||||
|
||||
return pool_constr();
|
||||
}
|
||||
|
||||
ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (!pool_map->count(connection_str))
|
||||
{
|
||||
pool_map->emplace(connection_str, createAndCheckResizePocoSessionPool([connection_str]
|
||||
{
|
||||
return std::make_shared<Poco::Data::SessionPool>("ODBC", validateODBCConnectionString(connection_str));
|
||||
}));
|
||||
}
|
||||
return pool_map->at(connection_str);
|
||||
}
|
||||
|
||||
void ODBCHandler::processError(HTTPServerResponse & response, const std::string & message)
|
||||
{
|
||||
@ -82,6 +50,7 @@ void ODBCHandler::processError(HTTPServerResponse & response, const std::string
|
||||
LOG_WARNING(log, message);
|
||||
}
|
||||
|
||||
|
||||
void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
|
||||
{
|
||||
HTMLForm params(request);
|
||||
@ -141,6 +110,10 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
|
||||
|
||||
try
|
||||
{
|
||||
auto connection = ODBCConnectionFactory::instance().get(
|
||||
validateODBCConnectionString(connection_string),
|
||||
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
|
||||
if (mode == "write")
|
||||
{
|
||||
if (!params.has("db_name"))
|
||||
@ -159,15 +132,12 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
|
||||
|
||||
auto quoting_style = IdentifierQuotingStyle::None;
|
||||
#if USE_ODBC
|
||||
POCO_SQL_ODBC_CLASS::SessionImpl session(validateODBCConnectionString(connection_string), DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
|
||||
quoting_style = getQuotingStyle(session.dbc().handle());
|
||||
quoting_style = getQuotingStyle(*connection);
|
||||
#endif
|
||||
|
||||
auto pool = getPool(connection_string);
|
||||
auto & read_buf = request.getStream();
|
||||
auto input_format = FormatFactory::instance().getInput(format, read_buf, *sample_block, getContext(), max_block_size);
|
||||
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
|
||||
ODBCBlockOutputStream output_stream(pool->get(), db_name, table_name, *sample_block, quoting_style);
|
||||
ODBCBlockOutputStream output_stream(*connection, db_name, table_name, *sample_block, getContext(), quoting_style);
|
||||
copyData(*input_stream, output_stream);
|
||||
writeStringBinary("Ok.", out);
|
||||
}
|
||||
@ -176,10 +146,8 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
|
||||
std::string query = params.get("query");
|
||||
LOG_TRACE(log, "Query: {}", query);
|
||||
|
||||
BlockOutputStreamPtr writer
|
||||
= FormatFactory::instance().getOutputStreamParallelIfPossible(format, out, *sample_block, getContext());
|
||||
auto pool = getPool(connection_string);
|
||||
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size);
|
||||
BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, out, *sample_block, getContext());
|
||||
ODBCBlockInputStream inp(*connection, query, *sample_block, max_block_size);
|
||||
copyData(inp, *writer);
|
||||
}
|
||||
}
|
||||
|
@ -2,13 +2,8 @@
|
||||
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Server/HTTP/HTTPRequestHandler.h>
|
||||
|
||||
#include <Poco/Logger.h>
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wunused-parameter"
|
||||
#include <Poco/Data/SessionPool.h>
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
@ -24,16 +19,12 @@ namespace DB
|
||||
class ODBCHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
using PoolPtr = std::shared_ptr<Poco::Data::SessionPool>;
|
||||
using PoolMap = std::unordered_map<std::string, PoolPtr>;
|
||||
|
||||
ODBCHandler(std::shared_ptr<PoolMap> pool_map_,
|
||||
ODBCHandler(
|
||||
size_t keep_alive_timeout_,
|
||||
ContextPtr context_,
|
||||
const String & mode_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("ODBCHandler"))
|
||||
, pool_map(pool_map_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
, mode(mode_)
|
||||
{
|
||||
@ -44,13 +35,11 @@ public:
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
|
||||
std::shared_ptr<PoolMap> pool_map;
|
||||
size_t keep_alive_timeout;
|
||||
String mode;
|
||||
|
||||
static inline std::mutex mutex;
|
||||
|
||||
PoolPtr getPool(const std::string & connection_str);
|
||||
void processError(HTTPServerResponse & response, const std::string & message);
|
||||
};
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
#include "ODBCBlockInputStream.h"
|
||||
#include <vector>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
@ -14,137 +16,143 @@ namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
|
||||
extern const int UNKNOWN_TYPE;
|
||||
}
|
||||
|
||||
|
||||
ODBCBlockInputStream::ODBCBlockInputStream(
|
||||
Poco::Data::Session && session_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_)
|
||||
: session{session_}
|
||||
, statement{(this->session << query_str, Poco::Data::Keywords::now)}
|
||||
, result{statement}
|
||||
, iterator{result.begin()}
|
||||
nanodbc::connection & connection_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_)
|
||||
: log(&Poco::Logger::get("ODBCBlockInputStream"))
|
||||
, max_block_size{max_block_size_}
|
||||
, log(&Poco::Logger::get("ODBCBlockInputStream"))
|
||||
, connection(connection_)
|
||||
, query(query_str)
|
||||
{
|
||||
if (sample_block.columns() != result.columnCount())
|
||||
throw Exception{"RecordSet contains " + toString(result.columnCount()) + " columns while " + toString(sample_block.columns())
|
||||
+ " expected",
|
||||
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
|
||||
|
||||
description.init(sample_block);
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
using ValueType = ExternalResultDescription::ValueType;
|
||||
|
||||
void insertValue(IColumn & column, const ValueType type, const Poco::Dynamic::Var & value)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case ValueType::vtUInt8:
|
||||
assert_cast<ColumnUInt8 &>(column).insertValue(value.convert<UInt64>());
|
||||
break;
|
||||
case ValueType::vtUInt16:
|
||||
assert_cast<ColumnUInt16 &>(column).insertValue(value.convert<UInt64>());
|
||||
break;
|
||||
case ValueType::vtUInt32:
|
||||
assert_cast<ColumnUInt32 &>(column).insertValue(value.convert<UInt64>());
|
||||
break;
|
||||
case ValueType::vtUInt64:
|
||||
assert_cast<ColumnUInt64 &>(column).insertValue(value.convert<UInt64>());
|
||||
break;
|
||||
case ValueType::vtInt8:
|
||||
assert_cast<ColumnInt8 &>(column).insertValue(value.convert<Int64>());
|
||||
break;
|
||||
case ValueType::vtInt16:
|
||||
assert_cast<ColumnInt16 &>(column).insertValue(value.convert<Int64>());
|
||||
break;
|
||||
case ValueType::vtInt32:
|
||||
assert_cast<ColumnInt32 &>(column).insertValue(value.convert<Int64>());
|
||||
break;
|
||||
case ValueType::vtInt64:
|
||||
assert_cast<ColumnInt64 &>(column).insertValue(value.convert<Int64>());
|
||||
break;
|
||||
case ValueType::vtFloat32:
|
||||
assert_cast<ColumnFloat32 &>(column).insertValue(value.convert<Float64>());
|
||||
break;
|
||||
case ValueType::vtFloat64:
|
||||
assert_cast<ColumnFloat64 &>(column).insertValue(value.convert<Float64>());
|
||||
break;
|
||||
case ValueType::vtString:
|
||||
assert_cast<ColumnString &>(column).insert(value.convert<String>());
|
||||
break;
|
||||
case ValueType::vtDate:
|
||||
{
|
||||
Poco::DateTime date = value.convert<Poco::DateTime>();
|
||||
assert_cast<ColumnUInt16 &>(column).insertValue(UInt16{LocalDate(date.year(), date.month(), date.day()).getDayNum()});
|
||||
break;
|
||||
}
|
||||
case ValueType::vtDateTime:
|
||||
{
|
||||
Poco::DateTime datetime = value.convert<Poco::DateTime>();
|
||||
assert_cast<ColumnUInt32 &>(column).insertValue(DateLUT::instance().makeDateTime(
|
||||
datetime.year(), datetime.month(), datetime.day(), datetime.hour(), datetime.minute(), datetime.second()));
|
||||
break;
|
||||
}
|
||||
case ValueType::vtUUID:
|
||||
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.convert<std::string>()));
|
||||
break;
|
||||
default:
|
||||
throw Exception("Unsupported value type", ErrorCodes::UNKNOWN_TYPE);
|
||||
}
|
||||
}
|
||||
|
||||
void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); }
|
||||
result = execute(connection, NANODBC_TEXT(query));
|
||||
}
|
||||
|
||||
|
||||
Block ODBCBlockInputStream::readImpl()
|
||||
{
|
||||
if (iterator == result.end())
|
||||
return {};
|
||||
|
||||
MutableColumns columns(description.sample_block.columns());
|
||||
for (const auto i : ext::range(0, columns.size()))
|
||||
columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();
|
||||
if (finished)
|
||||
return Block();
|
||||
|
||||
MutableColumns columns(description.sample_block.cloneEmptyColumns());
|
||||
size_t num_rows = 0;
|
||||
while (iterator != result.end())
|
||||
{
|
||||
Poco::Data::Row & row = *iterator;
|
||||
|
||||
for (const auto idx : ext::range(0, row.fieldCount()))
|
||||
while (true)
|
||||
{
|
||||
/// TODO This is extremely slow.
|
||||
const Poco::Dynamic::Var & value = row[idx];
|
||||
if (!result.next())
|
||||
{
|
||||
finished = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!value.isEmpty())
|
||||
for (int idx = 0; idx < result.columns(); ++idx)
|
||||
{
|
||||
if (description.types[idx].second)
|
||||
const auto & sample = description.sample_block.getByPosition(idx);
|
||||
|
||||
if (!result.is_null(idx))
|
||||
{
|
||||
bool is_nullable = description.types[idx].second;
|
||||
|
||||
if (is_nullable)
|
||||
{
|
||||
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
|
||||
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value);
|
||||
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
|
||||
insertValue(column_nullable.getNestedColumn(), data_type.getNestedType(), description.types[idx].first, result, idx);
|
||||
column_nullable.getNullMapData().emplace_back(0);
|
||||
}
|
||||
else
|
||||
insertValue(*columns[idx], description.types[idx].first, value);
|
||||
{
|
||||
insertValue(*columns[idx], sample.type, description.types[idx].first, result, idx);
|
||||
}
|
||||
}
|
||||
else
|
||||
insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);
|
||||
insertDefaultValue(*columns[idx], *sample.column);
|
||||
}
|
||||
|
||||
++iterator;
|
||||
|
||||
++num_rows;
|
||||
if (num_rows == max_block_size)
|
||||
if (++num_rows == max_block_size)
|
||||
break;
|
||||
}
|
||||
|
||||
return description.sample_block.cloneWithColumns(std::move(columns));
|
||||
}
|
||||
|
||||
|
||||
void ODBCBlockInputStream::insertValue(
|
||||
IColumn & column, const DataTypePtr data_type, const ValueType type, nanodbc::result & row, size_t idx)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case ValueType::vtUInt8:
|
||||
assert_cast<ColumnUInt8 &>(column).insertValue(row.get<uint16_t>(idx));
|
||||
break;
|
||||
case ValueType::vtUInt16:
|
||||
assert_cast<ColumnUInt16 &>(column).insertValue(row.get<uint16_t>(idx));
|
||||
break;
|
||||
case ValueType::vtUInt32:
|
||||
assert_cast<ColumnUInt32 &>(column).insertValue(row.get<uint32_t>(idx));
|
||||
break;
|
||||
case ValueType::vtUInt64:
|
||||
assert_cast<ColumnUInt64 &>(column).insertValue(row.get<uint64_t>(idx));
|
||||
break;
|
||||
case ValueType::vtInt8:
|
||||
assert_cast<ColumnInt8 &>(column).insertValue(row.get<int16_t>(idx));
|
||||
break;
|
||||
case ValueType::vtInt16:
|
||||
assert_cast<ColumnInt16 &>(column).insertValue(row.get<int16_t>(idx));
|
||||
break;
|
||||
case ValueType::vtInt32:
|
||||
assert_cast<ColumnInt32 &>(column).insertValue(row.get<int32_t>(idx));
|
||||
break;
|
||||
case ValueType::vtInt64:
|
||||
assert_cast<ColumnInt64 &>(column).insertValue(row.get<int64_t>(idx));
|
||||
break;
|
||||
case ValueType::vtFloat32:
|
||||
assert_cast<ColumnFloat32 &>(column).insertValue(row.get<float>(idx));
|
||||
break;
|
||||
case ValueType::vtFloat64:
|
||||
assert_cast<ColumnFloat64 &>(column).insertValue(row.get<double>(idx));
|
||||
break;
|
||||
case ValueType::vtFixedString:[[fallthrough]];
|
||||
case ValueType::vtString:
|
||||
assert_cast<ColumnString &>(column).insert(row.get<std::string>(idx));
|
||||
break;
|
||||
case ValueType::vtUUID:
|
||||
{
|
||||
auto value = row.get<std::string>(idx);
|
||||
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
|
||||
break;
|
||||
}
|
||||
case ValueType::vtDate:
|
||||
assert_cast<ColumnUInt16 &>(column).insertValue(UInt16{LocalDate{row.get<std::string>(idx)}.getDayNum()});
|
||||
break;
|
||||
case ValueType::vtDateTime:
|
||||
{
|
||||
auto value = row.get<std::string>(idx);
|
||||
ReadBufferFromString in(value);
|
||||
time_t time = 0;
|
||||
readDateTimeText(time, in);
|
||||
if (time < 0)
|
||||
time = 0;
|
||||
assert_cast<ColumnUInt32 &>(column).insertValue(time);
|
||||
break;
|
||||
}
|
||||
case ValueType::vtDateTime64:[[fallthrough]];
|
||||
case ValueType::vtDecimal32: [[fallthrough]];
|
||||
case ValueType::vtDecimal64: [[fallthrough]];
|
||||
case ValueType::vtDecimal128: [[fallthrough]];
|
||||
case ValueType::vtDecimal256:
|
||||
{
|
||||
auto value = row.get<std::string>(idx);
|
||||
ReadBufferFromString istr(value);
|
||||
data_type->getDefaultSerialization()->deserializeWholeText(column, istr, FormatSettings{});
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw Exception("Unsupported value type", ErrorCodes::UNKNOWN_TYPE);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -3,10 +3,8 @@
|
||||
#include <string>
|
||||
#include <Core/Block.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Poco/Data/RecordSet.h>
|
||||
#include <Poco/Data/Session.h>
|
||||
#include <Poco/Data/Statement.h>
|
||||
#include <Core/ExternalResultDescription.h>
|
||||
#include <nanodbc/nanodbc.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -15,25 +13,33 @@ namespace DB
|
||||
class ODBCBlockInputStream final : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
ODBCBlockInputStream(
|
||||
Poco::Data::Session && session_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_);
|
||||
ODBCBlockInputStream(nanodbc::connection & connection_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_);
|
||||
|
||||
String getName() const override { return "ODBC"; }
|
||||
|
||||
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
|
||||
|
||||
private:
|
||||
using QueryResult = std::shared_ptr<nanodbc::result>;
|
||||
using ValueType = ExternalResultDescription::ValueType;
|
||||
|
||||
Block readImpl() override;
|
||||
|
||||
Poco::Data::Session session;
|
||||
Poco::Data::Statement statement;
|
||||
Poco::Data::RecordSet result;
|
||||
Poco::Data::RecordSet::Iterator iterator;
|
||||
static void insertValue(IColumn & column, const DataTypePtr data_type, const ValueType type, nanodbc::result & row, size_t idx);
|
||||
|
||||
static void insertDefaultValue(IColumn & column, const IColumn & sample_column)
|
||||
{
|
||||
column.insertFrom(sample_column, 0);
|
||||
}
|
||||
|
||||
Poco::Logger * log;
|
||||
const UInt64 max_block_size;
|
||||
ExternalResultDescription description;
|
||||
|
||||
Poco::Logger * log;
|
||||
nanodbc::connection & connection;
|
||||
nanodbc::result result;
|
||||
String query;
|
||||
bool finished = false;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -8,16 +8,14 @@
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include "getIdentifierQuote.h"
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_TYPE;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
using ValueType = ExternalResultDescription::ValueType;
|
||||
@ -40,69 +38,21 @@ namespace
|
||||
return buf.str();
|
||||
}
|
||||
|
||||
std::string getQuestionMarks(size_t n)
|
||||
{
|
||||
std::string result = "(";
|
||||
for (size_t i = 0; i < n; ++i)
|
||||
{
|
||||
if (i > 0)
|
||||
result += ",";
|
||||
result += "?";
|
||||
}
|
||||
return result + ")";
|
||||
}
|
||||
|
||||
Poco::Dynamic::Var getVarFromField(const Field & field, const ValueType type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case ValueType::vtUInt8:
|
||||
return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt64>())).convert<UInt64>();
|
||||
case ValueType::vtUInt16:
|
||||
return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt64>())).convert<UInt64>();
|
||||
case ValueType::vtUInt32:
|
||||
return Poco::Dynamic::Var(static_cast<UInt64>(field.get<UInt64>())).convert<UInt64>();
|
||||
case ValueType::vtUInt64:
|
||||
return Poco::Dynamic::Var(field.get<UInt64>()).convert<UInt64>();
|
||||
case ValueType::vtInt8:
|
||||
return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int64>())).convert<Int64>();
|
||||
case ValueType::vtInt16:
|
||||
return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int64>())).convert<Int64>();
|
||||
case ValueType::vtInt32:
|
||||
return Poco::Dynamic::Var(static_cast<Int64>(field.get<Int64>())).convert<Int64>();
|
||||
case ValueType::vtInt64:
|
||||
return Poco::Dynamic::Var(field.get<Int64>()).convert<Int64>();
|
||||
case ValueType::vtFloat32:
|
||||
return Poco::Dynamic::Var(field.get<Float64>()).convert<Float64>();
|
||||
case ValueType::vtFloat64:
|
||||
return Poco::Dynamic::Var(field.get<Float64>()).convert<Float64>();
|
||||
case ValueType::vtString:
|
||||
return Poco::Dynamic::Var(field.get<String>()).convert<String>();
|
||||
case ValueType::vtDate:
|
||||
return Poco::Dynamic::Var(LocalDate(DayNum(field.get<UInt64>())).toString()).convert<String>();
|
||||
case ValueType::vtDateTime:
|
||||
return Poco::Dynamic::Var(DateLUT::instance().timeToString(time_t(field.get<UInt64>()))).convert<String>();
|
||||
case ValueType::vtUUID:
|
||||
return Poco::Dynamic::Var(UUID(field.get<UInt128>()).toUnderType().toHexString()).convert<std::string>();
|
||||
default:
|
||||
throw Exception("Unsupported value type", ErrorCodes::UNKNOWN_TYPE);
|
||||
|
||||
}
|
||||
__builtin_unreachable();
|
||||
}
|
||||
}
|
||||
|
||||
ODBCBlockOutputStream::ODBCBlockOutputStream(Poco::Data::Session && session_,
|
||||
ODBCBlockOutputStream::ODBCBlockOutputStream(nanodbc::connection & connection_,
|
||||
const std::string & remote_database_name_,
|
||||
const std::string & remote_table_name_,
|
||||
const Block & sample_block_,
|
||||
ContextPtr local_context_,
|
||||
IdentifierQuotingStyle quoting_)
|
||||
: session(session_)
|
||||
: log(&Poco::Logger::get("ODBCBlockOutputStream"))
|
||||
, connection(connection_)
|
||||
, db_name(remote_database_name_)
|
||||
, table_name(remote_table_name_)
|
||||
, sample_block(sample_block_)
|
||||
, local_context(local_context_)
|
||||
, quoting(quoting_)
|
||||
, log(&Poco::Logger::get("ODBCBlockOutputStream"))
|
||||
{
|
||||
description.init(sample_block);
|
||||
}
|
||||
@ -114,28 +64,12 @@ Block ODBCBlockOutputStream::getHeader() const
|
||||
|
||||
void ODBCBlockOutputStream::write(const Block & block)
|
||||
{
|
||||
ColumnsWithTypeAndName columns;
|
||||
for (size_t i = 0; i < block.columns(); ++i)
|
||||
columns.push_back({block.getColumns()[i], sample_block.getDataTypes()[i], sample_block.getNames()[i]});
|
||||
WriteBufferFromOwnString values_buf;
|
||||
auto writer = FormatFactory::instance().getOutputStream("Values", values_buf, sample_block, local_context);
|
||||
writer->write(block);
|
||||
|
||||
std::vector<Poco::Dynamic::Var> row_to_insert(block.columns());
|
||||
Poco::Data::Statement statement(session << getInsertQuery(db_name, table_name, columns, quoting) + getQuestionMarks(block.columns()));
|
||||
for (size_t i = 0; i < block.columns(); ++i)
|
||||
statement.addBind(Poco::Data::Keywords::use(row_to_insert[i]));
|
||||
|
||||
for (size_t i = 0; i < block.rows(); ++i)
|
||||
{
|
||||
for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx)
|
||||
{
|
||||
Field val;
|
||||
columns[col_idx].column->get(i, val);
|
||||
if (val.isNull())
|
||||
row_to_insert[col_idx] = Poco::Dynamic::Var();
|
||||
else
|
||||
row_to_insert[col_idx] = getVarFromField(val, description.types[col_idx].first);
|
||||
}
|
||||
statement.execute();
|
||||
}
|
||||
std::string query = getInsertQuery(db_name, table_name, block.getColumnsWithTypeAndName(), quoting) + values_buf.str();
|
||||
execute(connection, query);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2,30 +2,41 @@
|
||||
|
||||
#include <Core/Block.h>
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
#include <Poco/Data/Session.h>
|
||||
#include <Core/ExternalResultDescription.h>
|
||||
#include <Parsers/IdentifierQuotingStyle.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <nanodbc/nanodbc.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ODBCBlockOutputStream : public IBlockOutputStream
|
||||
{
|
||||
|
||||
public:
|
||||
ODBCBlockOutputStream(Poco::Data::Session && session_, const std::string & remote_database_name_,
|
||||
const std::string & remote_table_name_, const Block & sample_block_, IdentifierQuotingStyle quoting);
|
||||
ODBCBlockOutputStream(
|
||||
nanodbc::connection & connection_,
|
||||
const std::string & remote_database_name_,
|
||||
const std::string & remote_table_name_,
|
||||
const Block & sample_block_,
|
||||
ContextPtr local_context_,
|
||||
IdentifierQuotingStyle quoting);
|
||||
|
||||
Block getHeader() const override;
|
||||
void write(const Block & block) override;
|
||||
|
||||
private:
|
||||
Poco::Data::Session session;
|
||||
Poco::Logger * log;
|
||||
|
||||
nanodbc::connection & connection;
|
||||
std::string db_name;
|
||||
std::string table_name;
|
||||
Block sample_block;
|
||||
ContextPtr local_context;
|
||||
IdentifierQuotingStyle quoting;
|
||||
|
||||
ExternalResultDescription description;
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
}
|
||||
|
82
programs/odbc-bridge/ODBCConnectionFactory.h
Normal file
82
programs/odbc-bridge/ODBCConnectionFactory.h
Normal file
@ -0,0 +1,82 @@
|
||||
#pragma once
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
#include <nanodbc/nanodbc.h>
|
||||
#include <mutex>
|
||||
#include <Common/BorrowedObjectPool.h>
|
||||
#include <unordered_map>
|
||||
|
||||
|
||||
namespace nanodbc
|
||||
{
|
||||
|
||||
static constexpr inline auto ODBC_CONNECT_TIMEOUT = 100;
|
||||
|
||||
using ConnectionPtr = std::shared_ptr<nanodbc::connection>;
|
||||
using Pool = BorrowedObjectPool<ConnectionPtr>;
|
||||
using PoolPtr = std::shared_ptr<Pool>;
|
||||
|
||||
class ConnectionHolder
|
||||
{
|
||||
|
||||
public:
|
||||
ConnectionHolder(const std::string & connection_string_, PoolPtr pool_) : connection_string(connection_string_), pool(pool_) {}
|
||||
|
||||
~ConnectionHolder()
|
||||
{
|
||||
if (connection)
|
||||
pool->returnObject(std::move(connection));
|
||||
}
|
||||
|
||||
nanodbc::connection & operator*()
|
||||
{
|
||||
if (!connection)
|
||||
{
|
||||
pool->borrowObject(connection, [&]()
|
||||
{
|
||||
return std::make_shared<nanodbc::connection>(connection_string, ODBC_CONNECT_TIMEOUT);
|
||||
});
|
||||
}
|
||||
|
||||
return *connection;
|
||||
}
|
||||
|
||||
private:
|
||||
std::string connection_string;
|
||||
PoolPtr pool;
|
||||
ConnectionPtr connection;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ODBCConnectionFactory final : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
static ODBCConnectionFactory & instance()
|
||||
{
|
||||
static ODBCConnectionFactory ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
nanodbc::ConnectionHolder get(const std::string & connection_string, size_t pool_size)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
if (!factory.count(connection_string))
|
||||
factory.emplace(std::make_pair(connection_string, std::make_shared<nanodbc::Pool>(pool_size)));
|
||||
|
||||
return nanodbc::ConnectionHolder(connection_string, factory[connection_string]);
|
||||
}
|
||||
|
||||
private:
|
||||
/// [connection_settings_string] -> [connection_pool]
|
||||
using PoolFactory = std::unordered_map<std::string, nanodbc::PoolPtr>;
|
||||
PoolFactory factory;
|
||||
std::mutex mutex;
|
||||
};
|
||||
|
||||
}
|
@ -2,33 +2,26 @@
|
||||
|
||||
#if USE_ODBC
|
||||
|
||||
# include <Server/HTTP/HTMLForm.h>
|
||||
# include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
|
||||
# include <IO/WriteHelpers.h>
|
||||
# include <Poco/Data/ODBC/ODBCException.h>
|
||||
# include <Poco/Data/ODBC/SessionImpl.h>
|
||||
# include <Poco/Data/ODBC/Utility.h>
|
||||
# include <Poco/Net/HTTPServerRequest.h>
|
||||
# include <Poco/Net/HTTPServerResponse.h>
|
||||
# include <common/logger_useful.h>
|
||||
# include "validateODBCConnectionString.h"
|
||||
#include <Server/HTTP/HTMLForm.h>
|
||||
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Poco/Net/HTTPServerRequest.h>
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include "validateODBCConnectionString.h"
|
||||
#include "ODBCConnectionFactory.h"
|
||||
#include <sql.h>
|
||||
#include <sqlext.h>
|
||||
|
||||
# define POCO_SQL_ODBC_CLASS Poco::Data::ODBC
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace
|
||||
{
|
||||
bool isSchemaAllowed(SQLHDBC hdbc)
|
||||
bool isSchemaAllowed(nanodbc::connection & connection)
|
||||
{
|
||||
SQLUINTEGER value;
|
||||
SQLSMALLINT value_length = sizeof(value);
|
||||
SQLRETURN r = POCO_SQL_ODBC_CLASS::SQLGetInfo(hdbc, SQL_SCHEMA_USAGE, &value, sizeof(value), &value_length);
|
||||
|
||||
if (POCO_SQL_ODBC_CLASS::Utility::isError(r))
|
||||
throw POCO_SQL_ODBC_CLASS::ConnectionException(hdbc);
|
||||
|
||||
return value != 0;
|
||||
uint32_t result = connection.get_info<uint32_t>(SQL_SCHEMA_USAGE);
|
||||
return result != 0;
|
||||
}
|
||||
}
|
||||
|
||||
@ -55,10 +48,12 @@ void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServer
|
||||
try
|
||||
{
|
||||
std::string connection_string = params.get("connection_string");
|
||||
POCO_SQL_ODBC_CLASS::SessionImpl session(validateODBCConnectionString(connection_string), DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
|
||||
SQLHDBC hdbc = session.dbc().handle();
|
||||
|
||||
bool result = isSchemaAllowed(hdbc);
|
||||
auto connection = ODBCConnectionFactory::instance().get(
|
||||
validateODBCConnectionString(connection_string),
|
||||
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
|
||||
bool result = isSchemaAllowed(*connection);
|
||||
|
||||
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
|
||||
try
|
||||
|
@ -1,22 +1,25 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Server/HTTP/HTTPRequestHandler.h>
|
||||
|
||||
#include <Poco/Logger.h>
|
||||
|
||||
#if USE_ODBC
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
|
||||
/// This handler establishes connection to database, and retrieves whether schema is allowed.
|
||||
class SchemaAllowedHandler : public HTTPRequestHandler
|
||||
class SchemaAllowedHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
SchemaAllowedHandler(size_t keep_alive_timeout_, ContextPtr)
|
||||
: log(&Poco::Logger::get("SchemaAllowedHandler")), keep_alive_timeout(keep_alive_timeout_)
|
||||
SchemaAllowedHandler(size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("SchemaAllowedHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -2,11 +2,10 @@
|
||||
|
||||
#if USE_ODBC
|
||||
|
||||
# include <Poco/Data/ODBC/ODBCException.h>
|
||||
# include <Poco/Data/ODBC/SessionImpl.h>
|
||||
# include <Poco/Data/ODBC/Utility.h>
|
||||
|
||||
# define POCO_SQL_ODBC_CLASS Poco::Data::ODBC
|
||||
#include <common/logger_useful.h>
|
||||
#include <nanodbc/nanodbc.h>
|
||||
#include <sql.h>
|
||||
#include <sqlext.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -17,33 +16,16 @@ namespace ErrorCodes
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
}
|
||||
|
||||
std::string getIdentifierQuote(SQLHDBC hdbc)
|
||||
|
||||
std::string getIdentifierQuote(nanodbc::connection & connection)
|
||||
{
|
||||
std::string identifier;
|
||||
|
||||
SQLSMALLINT t;
|
||||
SQLRETURN r = POCO_SQL_ODBC_CLASS::SQLGetInfo(hdbc, SQL_IDENTIFIER_QUOTE_CHAR, nullptr, 0, &t);
|
||||
|
||||
if (POCO_SQL_ODBC_CLASS::Utility::isError(r))
|
||||
throw POCO_SQL_ODBC_CLASS::ConnectionException(hdbc);
|
||||
|
||||
if (t > 0)
|
||||
{
|
||||
// I have no idea, why to add '2' here, got from: contrib/poco/Data/ODBC/src/ODBCStatementImpl.cpp:60 (SQL_DRIVER_NAME)
|
||||
identifier.resize(static_cast<std::size_t>(t) + 2);
|
||||
|
||||
if (POCO_SQL_ODBC_CLASS::Utility::isError(POCO_SQL_ODBC_CLASS::SQLGetInfo(
|
||||
hdbc, SQL_IDENTIFIER_QUOTE_CHAR, &identifier[0], SQLSMALLINT((identifier.length() - 1) * sizeof(identifier[0])), &t)))
|
||||
throw POCO_SQL_ODBC_CLASS::ConnectionException(hdbc);
|
||||
|
||||
identifier.resize(static_cast<std::size_t>(t));
|
||||
}
|
||||
return identifier;
|
||||
return connection.get_info<std::string>(SQL_IDENTIFIER_QUOTE_CHAR);
|
||||
}
|
||||
|
||||
IdentifierQuotingStyle getQuotingStyle(SQLHDBC hdbc)
|
||||
|
||||
IdentifierQuotingStyle getQuotingStyle(nanodbc::connection & connection)
|
||||
{
|
||||
auto identifier_quote = getIdentifierQuote(hdbc);
|
||||
auto identifier_quote = getIdentifierQuote(connection);
|
||||
if (identifier_quote.length() == 0)
|
||||
return IdentifierQuotingStyle::None;
|
||||
else if (identifier_quote[0] == '`')
|
||||
|
@ -2,20 +2,19 @@
|
||||
|
||||
#if USE_ODBC
|
||||
|
||||
# include <Interpreters/Context.h>
|
||||
# include <Poco/Logger.h>
|
||||
# include <Poco/Net/HTTPRequestHandler.h>
|
||||
|
||||
# include <Poco/Data/ODBC/Utility.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Poco/Logger.h>
|
||||
#include <Poco/Net/HTTPRequestHandler.h>
|
||||
#include <Parsers/IdentifierQuotingStyle.h>
|
||||
#include <nanodbc/nanodbc.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
std::string getIdentifierQuote(SQLHDBC hdbc);
|
||||
std::string getIdentifierQuote(nanodbc::connection & connection);
|
||||
|
||||
IdentifierQuotingStyle getQuotingStyle(SQLHDBC hdbc);
|
||||
IdentifierQuotingStyle getQuotingStyle(nanodbc::connection & connection);
|
||||
|
||||
}
|
||||
|
||||
|
@ -373,6 +373,7 @@ class IColumn;
|
||||
M(UInt64, postgresql_connection_pool_size, 16, "Connection pool size for PostgreSQL table engine and database engine.", 0) \
|
||||
M(Int64, postgresql_connection_pool_wait_timeout, -1, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \
|
||||
M(UInt64, glob_expansion_max_elements, 1000, "Maximum number of allowed addresses (For external storages, table functions, etc).", 0) \
|
||||
M(UInt64, odbc_bridge_connection_pool_size, 16, "Connection pool size for each connection settings string in ODBC bridge.", 0) \
|
||||
\
|
||||
M(Seconds, distributed_replica_error_half_life, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD, "Time period reduces replica error counter by 2 times.", 0) \
|
||||
M(UInt64, distributed_replica_error_cap, DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT, "Max number of errors per replica, prevents piling up an incredible amount of errors if replica was offline for some time and allows it to be reconsidered in a shorter amount of time.", 0) \
|
||||
|
@ -6,6 +6,7 @@ import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import assert_eq_with_retry
|
||||
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
||||
from multiprocessing.dummy import Pool
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node1 = cluster.add_instance('node1', with_odbc_drivers=True, with_mysql=True,
|
||||
@ -269,7 +270,7 @@ def test_sqlite_odbc_cached_dictionary(started_cluster):
|
||||
node1.exec_in_container(["bash", "-c", "chmod a+rw /tmp"], privileged=True, user='root')
|
||||
node1.exec_in_container(["bash", "-c", "chmod a+rw {}".format(sqlite_db)], privileged=True, user='root')
|
||||
|
||||
node1.query("insert into table function odbc('DSN={};', '', 't3') values (200, 2, 7)".format(
|
||||
node1.query("insert into table function odbc('DSN={};ReadOnly=0', '', 't3') values (200, 2, 7)".format(
|
||||
node1.odbc_drivers["SQLite3"]["DSN"]))
|
||||
|
||||
assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(200))") == "7\n" # new value
|
||||
@ -381,5 +382,126 @@ def test_odbc_postgres_date_data_type(started_cluster):
|
||||
expected = '1\t2020-12-01\n2\t2020-12-02\n3\t2020-12-03\n'
|
||||
result = node1.query('SELECT * FROM test_date');
|
||||
assert(result == expected)
|
||||
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_date")
|
||||
node1.query("DROP TABLE IF EXISTS test_date")
|
||||
|
||||
|
||||
def test_odbc_postgres_conversions(started_cluster):
|
||||
conn = get_postgres_conn()
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute(
|
||||
'''CREATE TABLE IF NOT EXISTS clickhouse.test_types (
|
||||
a smallint, b integer, c bigint, d real, e double precision, f serial, g bigserial,
|
||||
h timestamp)''')
|
||||
|
||||
node1.query('''
|
||||
INSERT INTO TABLE FUNCTION
|
||||
odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')
|
||||
VALUES (-32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12')''')
|
||||
|
||||
result = node1.query('''
|
||||
SELECT a, b, c, d, e, f, g, h
|
||||
FROM odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')
|
||||
''')
|
||||
|
||||
assert(result == '-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12\n')
|
||||
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_types")
|
||||
|
||||
cursor.execute("""CREATE TABLE IF NOT EXISTS clickhouse.test_types (column1 Timestamp, column2 Numeric)""")
|
||||
|
||||
node1.query(
|
||||
'''
|
||||
CREATE TABLE test_types (column1 DateTime64, column2 Decimal(5, 1))
|
||||
ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_types')''')
|
||||
|
||||
node1.query(
|
||||
"""INSERT INTO test_types
|
||||
SELECT toDateTime64('2019-01-01 00:00:00', 3, 'Europe/Moscow'), toDecimal32(1.1, 1)""")
|
||||
|
||||
expected = node1.query("SELECT toDateTime64('2019-01-01 00:00:00', 3, 'Europe/Moscow'), toDecimal32(1.1, 1)")
|
||||
result = node1.query("SELECT * FROM test_types")
|
||||
print(result)
|
||||
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_types")
|
||||
assert(result == expected)
|
||||
|
||||
|
||||
def test_odbc_cyrillic_with_varchar(started_cluster):
|
||||
conn = get_postgres_conn()
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_cyrillic")
|
||||
cursor.execute("CREATE TABLE clickhouse.test_cyrillic (name varchar(11))")
|
||||
|
||||
node1.query('''
|
||||
CREATE TABLE test_cyrillic (name String)
|
||||
ENGINE = ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_cyrillic')''')
|
||||
|
||||
cursor.execute("INSERT INTO clickhouse.test_cyrillic VALUES ('A-nice-word')")
|
||||
cursor.execute("INSERT INTO clickhouse.test_cyrillic VALUES ('Красивенько')")
|
||||
|
||||
result = node1.query(''' SELECT * FROM test_cyrillic ORDER BY name''')
|
||||
assert(result == 'A-nice-word\nКрасивенько\n')
|
||||
result = node1.query(''' SELECT name FROM odbc('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_cyrillic') ''')
|
||||
assert(result == 'A-nice-word\nКрасивенько\n')
|
||||
|
||||
|
||||
def test_many_connections(started_cluster):
|
||||
conn = get_postgres_conn()
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('DROP TABLE IF EXISTS clickhouse.test_pg_table')
|
||||
cursor.execute('CREATE TABLE clickhouse.test_pg_table (key integer, value integer)')
|
||||
|
||||
node1.query('''
|
||||
DROP TABLE IF EXISTS test_pg_table;
|
||||
CREATE TABLE test_pg_table (key UInt32, value UInt32)
|
||||
ENGINE = ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_pg_table')''')
|
||||
|
||||
node1.query("INSERT INTO test_pg_table SELECT number, number FROM numbers(10)")
|
||||
|
||||
query = "SELECT count() FROM ("
|
||||
for i in range (24):
|
||||
query += "SELECT key FROM {t} UNION ALL "
|
||||
query += "SELECT key FROM {t})"
|
||||
|
||||
assert node1.query(query.format(t='test_pg_table')) == '250\n'
|
||||
|
||||
|
||||
def test_concurrent_queries(started_cluster):
|
||||
conn = get_postgres_conn()
|
||||
cursor = conn.cursor()
|
||||
|
||||
node1.query('''
|
||||
DROP TABLE IF EXISTS test_pg_table;
|
||||
CREATE TABLE test_pg_table (key UInt32, value UInt32)
|
||||
ENGINE = ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_pg_table')''')
|
||||
|
||||
cursor.execute('DROP TABLE IF EXISTS clickhouse.test_pg_table')
|
||||
cursor.execute('CREATE TABLE clickhouse.test_pg_table (key integer, value integer)')
|
||||
|
||||
def node_insert(_):
|
||||
for i in range(5):
|
||||
node1.query("INSERT INTO test_pg_table SELECT number, number FROM numbers(1000)", user='default')
|
||||
|
||||
busy_pool = Pool(5)
|
||||
p = busy_pool.map_async(node_insert, range(5))
|
||||
p.wait()
|
||||
result = node1.query("SELECT count() FROM test_pg_table", user='default')
|
||||
print(result)
|
||||
assert(int(result) == 5 * 5 * 1000)
|
||||
|
||||
def node_insert_select(_):
|
||||
for i in range(5):
|
||||
result = node1.query("INSERT INTO test_pg_table SELECT number, number FROM numbers(1000)", user='default')
|
||||
result = node1.query("SELECT * FROM test_pg_table LIMIT 100", user='default')
|
||||
|
||||
busy_pool = Pool(5)
|
||||
p = busy_pool.map_async(node_insert_select, range(5))
|
||||
p.wait()
|
||||
result = node1.query("SELECT count() FROM test_pg_table", user='default')
|
||||
print(result)
|
||||
assert(int(result) == 5 * 5 * 1000 * 2)
|
||||
|
||||
node1.query('DROP TABLE test_pg_table;')
|
||||
cursor.execute('DROP TABLE clickhouse.test_pg_table;')
|
||||
|
Loading…
Reference in New Issue
Block a user