ClickHouse/src/Server/PostgreSQLHandler.cpp

325 lines
12 KiB
C++
Raw Normal View History

2020-05-30 17:04:02 +00:00
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromPocoSocket.h>
2021-08-01 14:12:34 +00:00
#include <Interpreters/Context.h>
2020-05-30 17:04:02 +00:00
#include <Interpreters/executeQuery.h>
#include "PostgreSQLHandler.h"
#include <Parsers/parseQuery.h>
#include <Server/TCPServer.h>
#include <Common/setThreadName.h>
2021-10-02 07:13:14 +00:00
#include <base/scope_guard.h>
2020-05-30 17:04:02 +00:00
#include <random>
2021-10-27 23:10:39 +00:00
#include <Common/config_version.h>
2020-05-30 17:04:02 +00:00
#if USE_SSL
# include <Poco/Net/SecureStreamSocket.h>
# include <Poco/Net/SSLManager.h>
#endif
namespace DB
{
/// Error codes referenced in this translation unit; the constants themselves are defined elsewhere.
namespace ErrorCodes
{
/// Thrown by processQuery() when the incoming query text cannot be split into statements.
extern const int SYNTAX_ERROR;
}
/// Creates the handler for a single client connection.
///
/// Stores the references/settings it is given and then binds the read buffer, write buffer
/// and message transport to the connection socket through changeIO().
///
/// socket_         - the accepted client socket, forwarded to Poco::Net::TCPServerConnection.
/// server_         - server instance; its context is used when the session is created in run().
/// tcp_server_     - owning TCP server; run() polls isOpen() on it to know when to stop.
/// ssl_enabled_    - whether an SSL request from the client may be accepted.
/// connection_id_  - identifier reported to the client in BackendKeyData and used in query ids.
/// auth_methods_   - authentication methods handed to the authentication manager.
PostgreSQLHandler::PostgreSQLHandler(
    const Poco::Net::StreamSocket & socket_,
    IServer & server_,
    TCPServer & tcp_server_,
    bool ssl_enabled_,
    Int32 connection_id_,
    std::vector<std::shared_ptr<PostgreSQLProtocol::PGAuthentication::AuthenticationMethod>> & auth_methods_)
    : Poco::Net::TCPServerConnection(socket_)
    , server(server_)
    , tcp_server(tcp_server_)
    , ssl_enabled(ssl_enabled_)
    , connection_id(connection_id_)
    , authentication_manager(auth_methods_)
{
    changeIO(socket());
}
/// Rebinds all I/O of this handler to the given socket.
///
/// New read and write buffers are created on top of `socket`, and a new message transport is
/// created on top of those buffers. Called from the constructor and again after the
/// connection has been switched to SSL.
void PostgreSQLHandler::changeIO(Poco::Net::StreamSocket & socket)
{
    using MessageTransport = PostgreSQLProtocol::Messaging::MessageTransport;

    /// The buffers are replaced first: the transport only receives raw pointers to them.
    in = std::make_shared<ReadBufferFromPocoSocket>(socket);
    out = std::make_shared<WriteBufferFromPocoSocket>(socket);

    message_transport = std::make_shared<MessageTransport>(in.get(), out.get());
}
void PostgreSQLHandler::run()
{
setThreadName("PostgresHandler");
ThreadStatus thread_status;
2021-08-01 14:12:34 +00:00
session = std::make_unique<Session>(server.context(), ClientInfo::Interface::POSTGRESQL);
SCOPE_EXIT({ session.reset(); });
2020-05-30 17:04:02 +00:00
try
{
2021-08-01 14:12:34 +00:00
if (!startup())
2020-05-30 17:04:02 +00:00
return;
while (tcp_server.isOpen())
2020-05-30 17:04:02 +00:00
{
message_transport->send(PostgreSQLProtocol::Messaging::ReadyForQuery(), true);
constexpr size_t connection_check_timeout = 1; // 1 second
while (!in->poll(1000000 * connection_check_timeout))
if (!tcp_server.isOpen())
return;
2020-05-30 17:04:02 +00:00
PostgreSQLProtocol::Messaging::FrontMessageType message_type = message_transport->receiveMessageType();
if (!tcp_server.isOpen())
return;
2020-05-30 17:04:02 +00:00
switch (message_type)
{
case PostgreSQLProtocol::Messaging::FrontMessageType::QUERY:
2021-08-01 14:12:34 +00:00
processQuery();
2020-05-30 17:04:02 +00:00
break;
case PostgreSQLProtocol::Messaging::FrontMessageType::TERMINATE:
2020-10-10 17:47:34 +00:00
LOG_DEBUG(log, "Client closed the connection");
2020-05-30 17:04:02 +00:00
return;
case PostgreSQLProtocol::Messaging::FrontMessageType::PARSE:
case PostgreSQLProtocol::Messaging::FrontMessageType::BIND:
case PostgreSQLProtocol::Messaging::FrontMessageType::DESCRIBE:
case PostgreSQLProtocol::Messaging::FrontMessageType::SYNC:
case PostgreSQLProtocol::Messaging::FrontMessageType::FLUSH:
case PostgreSQLProtocol::Messaging::FrontMessageType::CLOSE:
message_transport->send(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse::ERROR,
"0A000",
2020-08-08 00:47:03 +00:00
"ClickHouse doesn't support extended query mechanism"),
2020-05-30 17:04:02 +00:00
true);
LOG_ERROR(log, "Client tried to access via extended query protocol");
message_transport->dropMessage();
break;
default:
message_transport->send(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse::ERROR,
"0A000",
"Command is not supported"),
true);
LOG_ERROR(log, "Command is not supported. Command code {:d}", static_cast<Int32>(message_type));
2020-05-30 17:04:02 +00:00
message_transport->dropMessage();
}
}
}
catch (const Poco::Exception &exc)
{
log->log(exc);
}
}
2021-08-01 14:12:34 +00:00
bool PostgreSQLHandler::startup()
2020-05-30 17:04:02 +00:00
{
Int32 payload_size;
Int32 info;
establishSecureConnection(payload_size, info);
if (static_cast<PostgreSQLProtocol::Messaging::FrontMessageType>(info) == PostgreSQLProtocol::Messaging::FrontMessageType::CANCEL_REQUEST)
{
2020-10-10 17:47:34 +00:00
LOG_DEBUG(log, "Client issued request canceling");
2021-08-01 14:12:34 +00:00
cancelRequest();
2020-05-30 17:04:02 +00:00
return false;
}
2020-06-21 12:16:08 +00:00
std::unique_ptr<PostgreSQLProtocol::Messaging::StartupMessage> start_up_msg = receiveStartupMessage(payload_size);
2021-08-01 14:12:34 +00:00
const auto & user_name = start_up_msg->user;
authentication_manager.authenticate(user_name, *session, *message_transport, socket().peerAddress());
2020-05-30 17:04:02 +00:00
try
{
2021-08-01 14:12:34 +00:00
session->makeSessionContext();
session->sessionContext()->setDefaultFormat("PostgreSQLWire");
2020-05-30 17:04:02 +00:00
if (!start_up_msg->database.empty())
2021-08-01 14:12:34 +00:00
session->sessionContext()->setCurrentDatabase(start_up_msg->database);
2020-05-30 17:04:02 +00:00
}
catch (const Exception & exc)
{
message_transport->send(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse::ERROR, "XX000", exc.message()),
true);
throw;
}
sendParameterStatusData(*start_up_msg);
message_transport->send(
PostgreSQLProtocol::Messaging::BackendKeyData(connection_id, secret_key), true);
2020-10-10 17:47:34 +00:00
LOG_DEBUG(log, "Successfully finished Startup stage");
2020-05-30 17:04:02 +00:00
return true;
}
/// Reads the header of the first client message and handles an optional encryption request.
///
/// payload_size, info - output: the two big-endian Int32 values heading the message that the
///                      caller has to process next.
///
/// If the first message is an SSL request, the connection is switched to SSL when SSL is
/// enabled, otherwise the request is declined with 'N'. A GSSENC request is always declined
/// with 'N'. After any encryption request the header of the following message is read, so the
/// outputs always describe a message that is not an encryption request itself.
void PostgreSQLHandler::establishSecureConnection(Int32 & payload_size, Int32 & info)
{
    using FrontMessageType = PostgreSQLProtocol::Messaging::FrontMessageType;

    /// `in` is looked up at call time, so after a switch to SSL the new buffer is used.
    const auto read_header = [&]
    {
        readBinaryBigEndian(payload_size, *in);
        readBinaryBigEndian(info, *in);
    };

    read_header();

    const auto request = static_cast<FrontMessageType>(info);
    if (request == FrontMessageType::SSL_REQUEST)
    {
        LOG_DEBUG(log, "Client requested SSL");
        if (ssl_enabled)
            makeSecureConnectionSSL();
        else
            message_transport->send('N', true);
    }
    else if (request == FrontMessageType::GSSENC_REQUEST)
    {
        LOG_DEBUG(log, "Client requested GSSENC");
        message_transport->send('N', true);
    }
    else
    {
        /// Not an encryption request: the header already read is the one the caller needs.
        return;
    }

    /// The encryption request has been answered; fetch the header of the message that follows.
    read_header();
}
#if USE_SSL
/// Accepts the client's SSL request and switches the connection to SSL.
///
/// Replies 'S', wraps the connection socket into a secure socket that uses the default server
/// SSL context, and rebinds the buffers and the message transport to it.
void PostgreSQLHandler::makeSecureConnectionSSL()
{
    /// Flush the reply explicitly, exactly like the 'N' replies in establishSecureConnection().
    /// Without it the byte would sit in the plain-socket write buffer that changeIO() is about
    /// to replace, and would reach the client only if that buffer flushes when it is released.
    message_transport->send('S', true);

    ss = std::make_shared<Poco::Net::SecureStreamSocket>(
        Poco::Net::SecureStreamSocket::attach(socket(), Poco::Net::SSLManager::instance().defaultServerContext()));
    changeIO(*ss);
}
#else
/// Built without SSL support: nothing to do.
void PostgreSQLHandler::makeSecureConnectionSSL() {}
#endif
2020-06-21 12:16:08 +00:00
/// Sends the ParameterStatus messages that follow a successful startup and flushes them.
///
/// start_up_message - the startup message received from the client; only its `parameters`
///                    map is read.
///
/// "application_name" is echoed back only if the client supplied it. "client_encoding" is
/// echoed back if supplied and reported as "UTF8" otherwise. "server_version",
/// "server_encoding" and "DateStyle" are always sent.
void PostgreSQLHandler::sendParameterStatusData(PostgreSQLProtocol::Messaging::StartupMessage & start_up_message)
{
    using ParameterStatus = PostgreSQLProtocol::Messaging::ParameterStatus;

    /// Read-only view: each key is looked up once and the found entry is reused,
    /// instead of find() followed by a second lookup through operator[].
    const auto & parameters = start_up_message.parameters;

    const auto application_name = parameters.find("application_name");
    if (application_name != parameters.end())
        message_transport->send(ParameterStatus("application_name", application_name->second));

    const auto client_encoding = parameters.find("client_encoding");
    if (client_encoding != parameters.end())
        message_transport->send(ParameterStatus("client_encoding", client_encoding->second));
    else
        message_transport->send(ParameterStatus("client_encoding", "UTF8"));

    message_transport->send(ParameterStatus("server_version", VERSION_STRING));
    message_transport->send(ParameterStatus("server_encoding", "UTF8"));
    message_transport->send(ParameterStatus("DateStyle", "ISO"));

    message_transport->flush();
}
2021-08-01 14:12:34 +00:00
void PostgreSQLHandler::cancelRequest()
2020-05-30 17:04:02 +00:00
{
std::unique_ptr<PostgreSQLProtocol::Messaging::CancelRequest> msg =
message_transport->receiveWithPayloadSize<PostgreSQLProtocol::Messaging::CancelRequest>(8);
String query = fmt::format("KILL QUERY WHERE query_id = 'postgres:{:d}:{:d}'", msg->process_id, msg->secret_key);
2020-05-30 17:04:02 +00:00
ReadBufferFromString replacement(query);
2021-08-01 14:12:34 +00:00
auto query_context = session->makeQueryContext();
query_context->setCurrentQueryId("");
executeQuery(replacement, *out, true, query_context, {});
2020-05-30 17:04:02 +00:00
}
2020-06-21 12:16:08 +00:00
inline std::unique_ptr<PostgreSQLProtocol::Messaging::StartupMessage> PostgreSQLHandler::receiveStartupMessage(int payload_size)
2020-05-30 17:04:02 +00:00
{
2020-06-21 12:16:08 +00:00
std::unique_ptr<PostgreSQLProtocol::Messaging::StartupMessage> message;
2020-05-30 17:04:02 +00:00
try
{
2020-06-21 12:16:08 +00:00
message = message_transport->receiveWithPayloadSize<PostgreSQLProtocol::Messaging::StartupMessage>(payload_size - 8);
2020-05-30 17:04:02 +00:00
}
catch (const Exception &)
{
message_transport->send(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse(
2020-06-21 12:16:08 +00:00
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse::ERROR, "08P01", "Can't correctly handle Startup message"),
2020-05-30 17:04:02 +00:00
true);
throw;
}
2020-10-10 17:47:34 +00:00
LOG_DEBUG(log, "Successfully received Startup message");
2020-05-30 17:04:02 +00:00
return message;
}
2021-08-01 14:12:34 +00:00
void PostgreSQLHandler::processQuery()
2020-05-30 17:04:02 +00:00
{
try
{
std::unique_ptr<PostgreSQLProtocol::Messaging::Query> query =
message_transport->receive<PostgreSQLProtocol::Messaging::Query>();
if (isEmptyQuery(query->query))
{
message_transport->send(PostgreSQLProtocol::Messaging::EmptyQueryResponse());
return;
}
bool psycopg2_cond = query->query == "BEGIN" || query->query == "COMMIT"; // psycopg2 starts and ends queries with BEGIN/COMMIT commands
bool jdbc_cond = query->query.find("SET extra_float_digits") != String::npos || query->query.find("SET application_name") != String::npos; // jdbc starts with setting this parameter
if (psycopg2_cond || jdbc_cond)
{
message_transport->send(
PostgreSQLProtocol::Messaging::CommandComplete(
PostgreSQLProtocol::Messaging::CommandComplete::classifyQuery(query->query), 0));
return;
}
2021-08-01 14:12:34 +00:00
const auto & settings = session->sessionContext()->getSettingsRef();
2020-05-30 17:04:02 +00:00
std::vector<String> queries;
auto parse_res = splitMultipartQuery(query->query, queries,
settings.max_query_size,
settings.max_parser_depth,
settings.allow_settings_after_format_in_insert);
2020-05-30 17:04:02 +00:00
if (!parse_res.second)
throw Exception("Cannot parse and execute the following part of query: " + String(parse_res.first), ErrorCodes::SYNTAX_ERROR);
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<Int32> dis(0, INT32_MAX);
2020-05-30 17:04:02 +00:00
for (const auto & spl_query : queries)
{
secret_key = dis(gen);
2021-08-01 14:12:34 +00:00
auto query_context = session->makeQueryContext();
query_context->setCurrentQueryId(fmt::format("postgres:{:d}:{:d}", connection_id, secret_key));
CurrentThread::QueryScope query_scope{query_context};
2020-05-30 17:04:02 +00:00
ReadBufferFromString read_buf(spl_query);
executeQuery(read_buf, *out, false, query_context, {});
2020-05-30 17:04:02 +00:00
PostgreSQLProtocol::Messaging::CommandComplete::Command command =
PostgreSQLProtocol::Messaging::CommandComplete::classifyQuery(spl_query);
message_transport->send(PostgreSQLProtocol::Messaging::CommandComplete(command, 0), true);
}
}
catch (const Exception & e)
{
message_transport->send(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse::ERROR, "2F000", "Query execution failed.\n" + e.displayText()),
true);
throw;
}
}
bool PostgreSQLHandler::isEmptyQuery(const String & query)
{
if (query.empty())
return true;
Poco::RegularExpression regex(R"(\A\s*\z)");
return regex.match(query);
}
}