Merge pull request #10242 from MovElb/movelb-postgresql-wire-protocol-impl

PostgreSQL wire protocol implementation
This commit is contained in:
alexey-milovidov 2020-06-21 14:39:22 +03:00 committed by GitHub
commit 4ee623ccac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1968 additions and 1 deletions

View File

@ -61,6 +61,8 @@
#include <Common/SensitiveDataMasker.h>
#include <Common/ThreadFuzzer.h>
#include <Server/MySQLHandlerFactory.h>
#include <Server/PostgreSQLHandlerFactory.h>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
@ -998,6 +1000,21 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Listening for MySQL compatibility protocol: {}", address.toString());
});
create_server("postgresql_port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new PostgreSQLHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for PostgreSQL compatibility protocol: " + address.toString());
});
/// Prometheus (if defined and not setup yet with http_port)
create_server("prometheus.port", [&](UInt16 port)
{

View File

@ -23,6 +23,7 @@
M(MySQLConnection, "Number of client connections using MySQL protocol") \
M(HTTPConnection, "Number of connections to HTTP server") \
M(InterserverConnection, "Number of connections from other replicas to fetch parts") \
M(PostgreSQLConnection, "Number of client connections using PostgreSQL protocol") \
M(OpenFileForRead, "Number of files open for reading") \
M(OpenFileForWrite, "Number of files open for writing") \
M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in fly") \
@ -59,7 +60,6 @@
M(LocalThreadActive, "Number of threads in local thread pools running a task.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \
namespace CurrentMetrics
{
#define M(NAME, DOCUMENTATION) extern const Metric NAME = __COUNTER__;

View File

@ -0,0 +1,50 @@
#include "PostgreSQLProtocol.h"
namespace DB::PostgreSQLProtocol::Messaging
{
ColumnTypeSpec convertTypeIndexToPostgresColumnTypeSpec(TypeIndex type_index)
{
switch (type_index)
{
case TypeIndex::Int8:
return {ColumnType::CHAR, 1};
case TypeIndex::UInt8:
case TypeIndex::Int16:
return {ColumnType::INT2, 2};
case TypeIndex::UInt16:
case TypeIndex::Int32:
return {ColumnType::INT4, 4};
case TypeIndex::UInt32:
case TypeIndex::Int64:
return {ColumnType::INT8, 8};
case TypeIndex::Float32:
return {ColumnType::FLOAT4, 4};
case TypeIndex::Float64:
return {ColumnType::FLOAT8, 8};
case TypeIndex::FixedString:
case TypeIndex::String:
return {ColumnType::VARCHAR, -1};
case TypeIndex::Date:
return {ColumnType::DATE, 4};
case TypeIndex::Decimal32:
case TypeIndex::Decimal64:
case TypeIndex::Decimal128:
return {ColumnType::NUMERIC, -1};
case TypeIndex::UUID:
return {ColumnType::UUID, 16};
default:
return {ColumnType::VARCHAR, -1};
}
}
}

View File

@ -0,0 +1,915 @@
#pragma once
#include <Access/AccessControlManager.h>
#include <Access/User.h>
#include <functional>
#include <Interpreters/Context.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <common/logger_useful.h>
#include <Poco/Format.h>
#include <Poco/RegularExpression.h>
#include <Poco/Net/StreamSocket.h>
#include "Types.h"
#include <unordered_map>
#include <utility>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_PACKET_FROM_CLIENT;
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
extern const int NOT_IMPLEMENTED;
extern const int UNKNOWN_TYPE;
}
namespace PostgreSQLProtocol
{
namespace Messaging
{
enum class FrontMessageType : Int32
{
// first message types
CANCEL_REQUEST = 80877102,
SSL_REQUEST = 80877103,
GSSENC_REQUEST = 80877104,
// other front message types
PASSWORD_MESSAGE = 'p',
QUERY = 'Q',
TERMINATE = 'X',
PARSE = 'P',
BIND = 'B',
DESCRIBE = 'D',
SYNC = 'S',
FLUSH = 'H',
CLOSE = 'C',
};
enum class MessageType : Int32
{
// common
ERROR_RESPONSE = 0,
CANCEL_REQUEST = 1,
COMMAND_COMPLETE = 2,
NOTICE_RESPONSE = 3,
NOTIFICATION_RESPONSE = 4,
PARAMETER_STATUS = 5,
READY_FOR_QUERY = 6,
SYNC = 7,
TERMINATE = 8,
// start up and authentication
AUTHENTICATION_OK = 30,
AUTHENTICATION_KERBEROS_V5 = 31,
AUTHENTICATION_CLEARTEXT_PASSWORD = 32,
AUTHENTICATION_MD5_PASSWORD = 33,
AUTHENTICATION_SCM_CREDENTIAL = 34,
AUTHENTICATION_GSS = 35,
AUTHENTICATION_SSPI = 36,
AUTHENTICATION_GSS_CONTINUE = 37,
AUTHENTICATION_SASL = 38,
AUTHENTICATION_SASL_CONTINUE = 39,
AUTHENTICATION_SASL_FINAL = 40,
BACKEND_KEY_DATA = 41,
GSSENC_REQUEST = 42,
GSS_RESPONSE = 43,
NEGOTIATE_PROTOCOL_VERSION = 44,
PASSWORD_MESSAGE = 45,
SASL_INITIAL_RESPONSE = 46,
SASL_RESPONSE = 47,
SSL_REQUEST = 48,
STARTUP_MESSAGE = 49,
// simple query
DATA_ROW = 100,
EMPTY_QUERY_RESPONSE = 101,
ROW_DESCRIPTION = 102,
QUERY = 103,
// extended query
BIND = 120,
BIND_COMPLETE = 121,
CLOSE = 122,
CLOSE_COMPLETE = 123,
DESCRIBE = 124,
EXECUTE = 125,
FLUSH = 126,
NODATA = 127,
PARAMETER_DESCRIPTION = 128,
PARSE = 129,
PARSE_COMPLETE = 130,
PORTAL_SUSPENDED = 131,
// copy query
COPY_DATA = 171,
COPY_DONE = 172,
COPY_FAIL = 173,
COPY_IN_RESPONSE = 174,
COPY_OUT_RESPONSE = 175,
COPY_BOTH_RESPONSE = 176,
// function query (deprecated by the protocol)
FUNCTION_CALL = 190,
FUNCTION_CALL_RESPONSE = 191,
};
//// Column 'typelem' from 'pg_type' table. NB: not all types are compatible with PostgreSQL's ones
enum class ColumnType : Int32
{
CHAR = 18,
INT8 = 20,
INT2 = 21,
INT4 = 23,
FLOAT4 = 700,
FLOAT8 = 701,
VARCHAR = 1043,
DATE = 1082,
NUMERIC = 1700,
UUID = 2950,
};
class ColumnTypeSpec
{
public:
ColumnType type;
Int16 len;
ColumnTypeSpec(ColumnType type_, Int16 len_) : type(type_), len(len_) {}
};
ColumnTypeSpec convertTypeIndexToPostgresColumnTypeSpec(TypeIndex type_index);
class MessageTransport
{
private:
ReadBuffer * in;
WriteBuffer * out;
public:
MessageTransport(WriteBuffer * out_) : in(nullptr), out(out_) {}
MessageTransport(ReadBuffer * in_, WriteBuffer * out_): in(in_), out(out_) {}
template<typename TMessage>
std::unique_ptr<TMessage> receiveWithPayloadSize(Int32 payload_size)
{
std::unique_ptr<TMessage> message = std::make_unique<TMessage>(payload_size);
message->deserialize(*in);
return message;
}
template<typename TMessage>
std::unique_ptr<TMessage> receive()
{
std::unique_ptr<TMessage> message = std::make_unique<TMessage>();
message->deserialize(*in);
return message;
}
FrontMessageType receiveMessageType()
{
char type = 0;
in->read(type);
return static_cast<FrontMessageType>(type);
}
template<typename TMessage>
void send(TMessage & message, bool flush=false)
{
message.serialize(*out);
if (flush)
out->next();
}
template<typename TMessage>
void send(TMessage && message, bool flush=false)
{
send(message, flush);
}
void send(char message, bool flush=false)
{
out->write(message);
if (flush)
out->next();
}
void dropMessage()
{
Int32 size;
readBinaryBigEndian(size, *in);
in->ignore(size - 4);
}
void flush()
{
out->next();
}
};
/** Basic class for messages sent by client or server. */
class IMessage
{
public:
virtual MessageType getMessageType() const = 0;
virtual ~IMessage() = default;
};
class ISerializable
{
public:
/** Should be overridden for sending the message */
virtual void serialize(WriteBuffer & out) const = 0;
/** Size of the message in bytes including message length part (4 bytes) */
virtual Int32 size() const = 0;
virtual ~ISerializable() = default;
};
class FrontMessage : public IMessage
{
public:
/** Should be overridden for receiving the message
* NB: This method should not read the first byte, which means the type of the message
* (if type is provided for the message by the protocol).
*/
virtual void deserialize(ReadBuffer & in) = 0;
};
class BackendMessage : public IMessage, public ISerializable
{};
class FirstMessage : public FrontMessage
{
public:
Int32 payload_size;
FirstMessage() = delete;
FirstMessage(int payload_size_) : payload_size(payload_size_) {}
};
class CancelRequest : public FirstMessage
{
public:
Int32 process_id;
Int32 secret_key;
CancelRequest(int payload_size_) : FirstMessage(payload_size_) {}
void deserialize(ReadBuffer & in) override
{
readBinaryBigEndian(process_id, in);
readBinaryBigEndian(secret_key, in);
}
MessageType getMessageType() const override
{
return MessageType::CANCEL_REQUEST;
}
};
class ErrorOrNoticeResponse : BackendMessage
{
public:
enum Severity {ERROR = 0, FATAL = 1, PANIC = 2, WARNING = 3, NOTICE = 4, DEBUG = 5, INFO = 6, LOG = 7};
private:
Severity severity;
String sql_state;
String message;
String enum_to_string[8] = {"ERROR", "FATAL", "PANIC", "WARNING", "NOTICE", "DEBUG", "INFO", "LOG"};
char isErrorOrNotice() const
{
switch (severity)
{
case ERROR:
case FATAL:
case PANIC:
return 'E';
case WARNING:
case NOTICE:
case DEBUG:
case INFO:
case LOG:
return 'N';
}
throw Exception("Unknown severity type " + std::to_string(severity), ErrorCodes::UNKNOWN_TYPE);
}
public:
ErrorOrNoticeResponse(const Severity & severity_, const String & sql_state_, const String & message_)
: severity(severity_)
, sql_state(sql_state_)
, message(message_)
{}
void serialize(WriteBuffer & out) const override
{
out.write(isErrorOrNotice());
Int32 sz = size();
writeBinaryBigEndian(sz, out);
out.write('S');
writeNullTerminatedString(enum_to_string[severity], out);
out.write('C');
writeNullTerminatedString(sql_state, out);
out.write('M');
writeNullTerminatedString(message, out);
out.write(0);
}
Int32 size() const override
{
// message length part + (1 + sizes of other fields + 1) + null byte in the end of the message
return 4 + (1 + enum_to_string[severity].size() + 1) + (1 + sql_state.size() + 1) + (1 + message.size() + 1) + 1;
}
MessageType getMessageType() const override
{
if (isErrorOrNotice() == 'E')
return MessageType::ERROR_RESPONSE;
return MessageType::NOTICE_RESPONSE;
}
};
class ReadyForQuery : BackendMessage
{
public:
void serialize(WriteBuffer &out) const override
{
out.write('Z');
writeBinaryBigEndian(size(), out);
// 'I' means that we are not in a transaction block. We use it here, because ClickHouse doesn't support transactions.
out.write('I');
}
Int32 size() const override
{
return 4 + 1;
}
MessageType getMessageType() const override
{
return MessageType::READY_FOR_QUERY;
}
};
class Terminate : FrontMessage
{
public:
void deserialize(ReadBuffer & in) override
{
in.ignore(4);
}
MessageType getMessageType() const override
{
return MessageType::TERMINATE;
}
};
class StartUpMessage : FirstMessage
{
public:
String user;
String database;
// includes username, may also include database and other runtime parameters
std::unordered_map<String, String> parameters;
StartUpMessage(Int32 payload_size_) : FirstMessage(payload_size_) {}
void deserialize(ReadBuffer & in) override
{
Int32 ps = payload_size - 1;
while (ps > 0)
{
String parameter_name;
String parameter_value;
readNullTerminated(parameter_name, in);
readNullTerminated(parameter_value, in);
ps -= parameter_name.size() + 1;
ps -= parameter_value.size() + 1;
if (parameter_name == "user")
{
user = parameter_value;
}
else if (parameter_name == "database")
{
database = parameter_value;
}
parameters.insert({std::move(parameter_name), std::move(parameter_value)});
if (payload_size < 0)
{
throw Exception(
Poco::format(
"Size of payload is larger than one declared in the message of type %d.",
getMessageType()),
ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
}
in.ignore();
}
MessageType getMessageType() const override
{
return MessageType::STARTUP_MESSAGE;
}
};
class AuthenticationCleartextPassword : public Messaging::BackendMessage
{
public:
void serialize(WriteBuffer & out) const override
{
out.write('R');
writeBinaryBigEndian(size(), out);
writeBinaryBigEndian(static_cast<Int32>(3), out); // specifies that a clear-text password is required (by protocol)
}
Int32 size() const override
{
// length of message + special int32
return 4 + 4;
}
MessageType getMessageType() const override
{
return MessageType::AUTHENTICATION_CLEARTEXT_PASSWORD;
}
};
class AuthenticationOk : BackendMessage
{
public:
void serialize(WriteBuffer & out) const override
{
out.write('R');
writeBinaryBigEndian(size(), out);
writeBinaryBigEndian(0, out); // specifies that the authentication was successful (by protocol)
}
Int32 size() const override
{
// length of message + special int32
return 4 + 4;
}
MessageType getMessageType() const override
{
return MessageType::AUTHENTICATION_OK;
}
};
class PasswordMessage : FrontMessage
{
public:
String password;
void deserialize(ReadBuffer & in) override
{
Int32 sz;
readBinaryBigEndian(sz, in);
readNullTerminated(password, in);
}
MessageType getMessageType() const override
{
return MessageType::PASSWORD_MESSAGE;
}
};
class ParameterStatus : BackendMessage
{
private:
String name;
String value;
public:
ParameterStatus(String name_, String value_)
: name(name_)
, value(value_)
{}
void serialize(WriteBuffer & out) const override
{
out.write('S');
writeBinaryBigEndian(size(), out);
writeNullTerminatedString(name, out);
writeNullTerminatedString(value, out);
}
Int32 size() const override
{
return 4 + name.size() + 1 + value.size() + 1;
}
MessageType getMessageType() const override
{
return MessageType::PARAMETER_STATUS;
}
};
class BackendKeyData : BackendMessage
{
private:
Int32 process_id;
Int32 secret_key;
public:
BackendKeyData(Int32 process_id_, Int32 secret_key_)
: process_id(process_id_)
, secret_key(secret_key_)
{}
void serialize(WriteBuffer & out) const override
{
out.write('K');
writeBinaryBigEndian(size(), out);
writeBinaryBigEndian(process_id, out);
writeBinaryBigEndian(secret_key, out);
}
Int32 size() const override
{
return 4 + 4 + 4;
}
MessageType getMessageType() const override
{
return MessageType::BACKEND_KEY_DATA;
}
};
class Query : FrontMessage
{
public:
String query;
void deserialize(ReadBuffer & in) override
{
Int32 sz;
readBinaryBigEndian(sz, in);
readNullTerminated(query, in);
}
MessageType getMessageType() const override
{
return MessageType::QUERY;
}
};
class EmptyQueryResponse : public BackendMessage
{
public:
void serialize(WriteBuffer & out) const override
{
out.write('I');
writeBinaryBigEndian(size(), out);
}
Int32 size() const override
{
return 4;
}
MessageType getMessageType() const override
{
return MessageType::EMPTY_QUERY_RESPONSE;
}
};
enum class FormatCode : Int16
{
TEXT = 0,
BINARY = 1,
};
class FieldDescription : ISerializable
{
private:
const String & name;
ColumnTypeSpec type_spec;
FormatCode format_code;
public:
FieldDescription(const String & name_, TypeIndex type_index, FormatCode format_code_ = FormatCode::TEXT)
: name(name_)
, type_spec(convertTypeIndexToPostgresColumnTypeSpec(type_index))
, format_code(format_code_)
{}
void serialize(WriteBuffer & out) const override
{
writeNullTerminatedString(name, out);
writeBinaryBigEndian(static_cast<Int32>(0), out);
writeBinaryBigEndian(static_cast<Int16>(0), out);
writeBinaryBigEndian(static_cast<Int32>(type_spec.type), out);
writeBinaryBigEndian(type_spec.len, out);
writeBinaryBigEndian(static_cast<Int32>(-1), out);
writeBinaryBigEndian(static_cast<Int16>(format_code), out);
}
Int32 size() const override
{
// size of name (C string)
// + object ID of the table (Int32 and always zero) + attribute number of the column (Int16 and always zero)
// + type object id (Int32) + data type size (Int16)
// + type modifier (Int32 and always -1) + format code (Int16)
return (name.size() + 1) + 4 + 2 + 4 + 2 + 4 + 2;
}
};
class RowDescription : BackendMessage
{
private:
const std::vector<FieldDescription> & fields_descr;
public:
RowDescription(const std::vector<FieldDescription> & fields_descr_) : fields_descr(fields_descr_) {}
void serialize(WriteBuffer & out) const override
{
out.write('T');
writeBinaryBigEndian(size(), out);
writeBinaryBigEndian(static_cast<Int16>(fields_descr.size()), out);
for (const FieldDescription & field : fields_descr)
field.serialize(out);
}
Int32 size() const override
{
Int32 sz = 4 + 2; // size of message + number of fields
for (const FieldDescription & field : fields_descr)
sz += field.size();
return sz;
}
MessageType getMessageType() const override
{
return MessageType::ROW_DESCRIPTION;
}
};
class StringField : public ISerializable
{
private:
String str;
public:
StringField(String str_) : str(str_) {}
void serialize(WriteBuffer & out) const override
{
writeString(str, out);
}
Int32 size() const override
{
return str.size();
}
};
class NullField : public ISerializable
{
public:
void serialize(WriteBuffer & /* out */) const override {}
Int32 size() const override
{
return -1;
}
};
class DataRow : BackendMessage
{
private:
const std::vector<std::shared_ptr<ISerializable>> & row;
public:
DataRow(const std::vector<std::shared_ptr<ISerializable>> & row_) : row(row_) {}
void serialize(WriteBuffer & out) const override
{
out.write('D');
writeBinaryBigEndian(size(), out);
writeBinaryBigEndian(static_cast<Int16>(row.size()), out);
for (const std::shared_ptr<ISerializable> & field : row)
{
Int32 sz = field->size();
writeBinaryBigEndian(sz, out);
if (sz > 0)
field->serialize(out);
}
}
Int32 size() const override
{
Int32 sz = 4 + 2; // size of message + number of fields
for (const std::shared_ptr<ISerializable> & field : row)
sz += 4 + field->size();
return sz;
}
MessageType getMessageType() const override
{
return MessageType::DATA_ROW;
}
};
class CommandComplete : BackendMessage
{
public:
enum Command {BEGIN = 0, COMMIT = 1, INSERT = 2, DELETE = 3, UPDATE = 4, SELECT = 5, MOVE = 6, FETCH = 7, COPY = 8};
private:
String enum_to_string[9] = {"BEGIN", "COMMIT", "INSERT", "DELETE", "UPDATE", "SELECT", "MOVE", "FETCH", "COPY"};
String value;
public:
CommandComplete(Command cmd_, Int32 rows_count_)
{
value = enum_to_string[cmd_];
String add = " ";
if (cmd_ == Command::INSERT)
add = " 0 ";
value += add + std::to_string(rows_count_);
}
void serialize(WriteBuffer & out) const override
{
out.write('C');
writeBinaryBigEndian(size(), out);
writeNullTerminatedString(value, out);
}
Int32 size() const override
{
return 4 + value.size() + 1;
}
MessageType getMessageType() const override
{
return MessageType::COMMAND_COMPLETE;
}
static Command classifyQuery(const String & query)
{
std::vector<String> query_types({"BEGIN", "COMMIT", "INSERT", "DELETE", "UPDATE", "SELECT", "MOVE", "FETCH", "COPY"});
for (size_t i = 0; i != query_types.size(); ++i)
{
String::const_iterator iter = std::search(
query.begin(),
query.end(),
query_types[i].begin(),
query_types[i].end(),
[](char a, char b){return std::toupper(a) == b;});
if (iter != query.end())
return static_cast<Command>(i);
}
return Command::SELECT;
}
};
}
namespace PGAuthentication
{
class AuthenticationMethod
{
protected:
void setPassword(
const String & user_name,
const String & password,
Context & context,
Messaging::MessageTransport & mt,
const Poco::Net::SocketAddress & address)
{
try {
context.setUser(user_name, password, address);
}
catch (const Exception &)
{
mt.send(
Messaging::ErrorOrNoticeResponse(Messaging::ErrorOrNoticeResponse::ERROR, "28P01", "Invalid user or password"),
true);
throw;
}
}
public:
virtual void authenticate(
const String & user_name,
Context & context,
Messaging::MessageTransport & mt,
const Poco::Net::SocketAddress & address) = 0;
virtual Authentication::Type getType() const = 0;
virtual ~AuthenticationMethod() = default;
};
class NoPasswordAuth : public AuthenticationMethod
{
public:
void authenticate(
const String & /* user_name */,
Context & /* context */,
Messaging::MessageTransport & /* mt */,
const Poco::Net::SocketAddress & /* address */) override {}
Authentication::Type getType() const override
{
return Authentication::Type::NO_PASSWORD;
}
};
class CleartextPasswordAuth : public AuthenticationMethod
{
public:
void authenticate(
const String & user_name,
Context & context,
Messaging::MessageTransport & mt,
const Poco::Net::SocketAddress & address) override
{
mt.send(Messaging::AuthenticationCleartextPassword(), true);
Messaging::FrontMessageType type = mt.receiveMessageType();
if (type == Messaging::FrontMessageType::PASSWORD_MESSAGE)
{
std::unique_ptr<Messaging::PasswordMessage> password = mt.receive<Messaging::PasswordMessage>();
setPassword(user_name, password->password, context, mt, address);
}
else
throw Exception(
Poco::format(
"Client sent wrong message or closed the connection. Message byte was %d.",
static_cast<Int32>(type)),
ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
}
Authentication::Type getType() const override
{
return Authentication::Type::PLAINTEXT_PASSWORD;
}
};
class AuthenticationManager
{
private:
Poco::Logger * log = &Poco::Logger::get("AuthenticationManager");
std::unordered_map<Authentication::Type, std::shared_ptr<AuthenticationMethod>> type_to_method = {};
public:
AuthenticationManager(const std::vector<std::shared_ptr<AuthenticationMethod>> & auth_methods)
{
for (const std::shared_ptr<AuthenticationMethod> & method : auth_methods)
{
type_to_method[method->getType()] = method;
}
}
void authenticate(
const String & user_name,
Context & context,
Messaging::MessageTransport & mt,
const Poco::Net::SocketAddress & address)
{
auto user = context.getAccessControlManager().read<User>(user_name);
Authentication::Type user_auth_type = user->authentication.getType();
if (type_to_method.find(user_auth_type) != type_to_method.end())
{
type_to_method[user_auth_type]->authenticate(user_name, context, mt, address);
mt.send(Messaging::AuthenticationOk(), true);
LOG_INFO(log, "Authentication for user {} was successful.", user_name);
return;
}
mt.send(
Messaging::ErrorOrNoticeResponse(Messaging::ErrorOrNoticeResponse::ERROR, "0A000", "Authentication method is not supported"),
true);
throw Exception(Poco::format("Authentication type %d is not supported.", user_auth_type), ErrorCodes::NOT_IMPLEMENTED);
}
};
}
}
}

View File

@ -16,6 +16,7 @@ SRCS(
Field.cpp
iostream_debug_helpers.cpp
MySQLProtocol.cpp
PostgreSQLProtocol.cpp
NamesAndTypes.cpp
Settings.cpp
SettingsCollection.cpp

View File

@ -14,6 +14,7 @@
#include <DataStreams/NativeBlockInputStream.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Processors/Formats/Impl/PostgreSQLOutputFormat.h>
#include <Poco/URI.h>
#if !defined(ARCADIA_BUILD)
@ -394,6 +395,7 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorNull(*this);
registerOutputFormatProcessorMySQLWrite(*this);
registerOutputFormatProcessorMarkdown(*this);
registerOutputFormatProcessorPostgreSQLWrite(*this);
}
FormatFactory & FormatFactory::instance()

View File

@ -201,6 +201,7 @@ void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory);
void registerOutputFormatProcessorNull(FormatFactory & factory);
void registerOutputFormatProcessorMySQLWrite(FormatFactory & factory);
void registerOutputFormatProcessorMarkdown(FormatFactory & factory);
void registerOutputFormatProcessorPostgreSQLWrite(FormatFactory & factory);
/// Input only formats.
void registerInputFormatProcessorCapnProto(FormatFactory & factory);

View File

@ -998,4 +998,24 @@ inline String toString(const T & x)
return buf.str();
}
inline void writeNullTerminatedString(const String & s, WriteBuffer & buffer)
{
/// c_str is guaranteed to return zero-terminated string
buffer.write(s.с_str(), s.size() + 1);
}
template <typename T>
inline std::enable_if_t<is_arithmetic_v<T> && (sizeof(T) <= 8), void>
writeBinaryBigEndian(T x, WriteBuffer & buf) /// Assuming little endian architecture.
{
if constexpr (sizeof(x) == 2)
x = __builtin_bswap16(x);
else if constexpr (sizeof(x) == 4)
x = __builtin_bswap32(x);
else if constexpr (sizeof(x) == 8)
x = __builtin_bswap64(x);
writePODBinary(x, buf);
}
}

View File

@ -0,0 +1,79 @@
#include <Interpreters/ProcessList.h>
#include "PostgreSQLOutputFormat.h"
namespace DB
{
PostgreSQLOutputFormat::PostgreSQLOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: IOutputFormat(header_, out_)
, format_settings(settings_)
, message_transport(&out)
{
}
void PostgreSQLOutputFormat::doWritePrefix()
{
if (initialized)
return;
initialized = true;
const auto & header = getPort(PortKind::Main).getHeader();
data_types = header.getDataTypes();
if (header.columns())
{
std::vector<PostgreSQLProtocol::Messaging::FieldDescription> columns;
columns.reserve(header.columns());
for (size_t i = 0; i < header.columns(); i++)
{
const auto & column_name = header.getColumnsWithTypeAndName()[i].name;
columns.emplace_back(column_name, data_types[i]->getTypeId());
}
message_transport.send(PostgreSQLProtocol::Messaging::RowDescription(columns));
}
}
void PostgreSQLOutputFormat::consume(Chunk chunk)
{
doWritePrefix();
for (size_t i = 0; i != chunk.getNumRows(); ++i)
{
const Columns & columns = chunk.getColumns();
std::vector<std::shared_ptr<PostgreSQLProtocol::Messaging::ISerializable>> row;
row.reserve(chunk.getNumColumns());
for (size_t j = 0; j != chunk.getNumColumns(); ++j)
{
if (columns[j]->isNullAt(i))
row.push_back(std::make_shared<PostgreSQLProtocol::Messaging::NullField>());
else
{
WriteBufferFromOwnString ostr;
data_types[j]->serializeAsText(*columns[j], i, ostr, format_settings);
row.push_back(std::make_shared<PostgreSQLProtocol::Messaging::StringField>(std::move(ostr.str())));
}
}
message_transport.send(PostgreSQLProtocol::Messaging::DataRow(row));
}
}
void PostgreSQLOutputFormat::finalize() {}
void PostgreSQLOutputFormat::flush()
{
message_transport.flush();
}
void registerOutputFormatProcessorPostgreSQLWrite(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
"PostgreSQLWire",
[](WriteBuffer & buf,
const Block & sample,
const FormatFactory::WriteCallback &,
const FormatSettings & settings) { return std::make_shared<PostgreSQLOutputFormat>(buf, sample, settings); });
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <Processors/Formats/IRowOutputFormat.h>
#include <Core/Block.h>
#include <Core/PostgreSQLProtocol.h>
#include <Formats/FormatSettings.h>
namespace DB
{
//// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.4
class PostgreSQLOutputFormat final : public IOutputFormat
{
public:
PostgreSQLOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_);
String getName() const override {return "PostgreSQLOutputFormat";}
void doWritePrefix() override;
void consume(Chunk) override;
void finalize() override;
void flush() override;
private:
bool initialized = false;
FormatSettings format_settings;
PostgreSQLProtocol::Messaging::MessageTransport message_transport;
DataTypes data_types;
};
}

View File

@ -38,6 +38,7 @@ SRCS(
Formats/Impl/NullFormat.cpp
Formats/Impl/ODBCDriver2BlockOutputFormat.cpp
Formats/Impl/ODBCDriverBlockOutputFormat.cpp
Formats/Impl/PostgreSQLOutputFormat.cpp
Formats/Impl/PrettyBlockOutputFormat.cpp
Formats/Impl/PrettyCompactBlockOutputFormat.cpp
Formats/Impl/PrettySpaceBlockOutputFormat.cpp

View File

@ -0,0 +1,306 @@
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <Interpreters/executeQuery.h>
#include "PostgreSQLHandler.h"
#include <Parsers/parseQuery.h>
#include <random>
#if !defined(ARCADIA_BUILD)
# include <Common/config_version.h>
#endif
#if USE_SSL
# include <Poco/Net/SecureStreamSocket.h>
# include <Poco/Net/SSLManager.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}
PostgreSQLHandler::PostgreSQLHandler(
const Poco::Net::StreamSocket & socket_,
IServer & server_,
bool ssl_enabled_,
Int32 connection_id_,
std::vector<std::shared_ptr<PostgreSQLProtocol::PGAuthentication::AuthenticationMethod>> & auth_methods_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, connection_context(server.context())
, ssl_enabled(ssl_enabled_)
, connection_id(connection_id_)
, authentication_manager(auth_methods_)
{
changeIO(socket());
}
void PostgreSQLHandler::changeIO(Poco::Net::StreamSocket & socket)
{
in = std::make_shared<ReadBufferFromPocoSocket>(socket);
out = std::make_shared<WriteBufferFromPocoSocket>(socket);
message_transport = std::make_shared<PostgreSQLProtocol::Messaging::MessageTransport>(in.get(), out.get());
}
void PostgreSQLHandler::run()
{
connection_context.makeSessionContext();
connection_context.setDefaultFormat("PostgreSQLWire");
try
{
if (!startUp())
return;
while (true)
{
message_transport->send(PostgreSQLProtocol::Messaging::ReadyForQuery(), true);
PostgreSQLProtocol::Messaging::FrontMessageType message_type = message_transport->receiveMessageType();
switch (message_type)
{
case PostgreSQLProtocol::Messaging::FrontMessageType::QUERY:
processQuery();
break;
case PostgreSQLProtocol::Messaging::FrontMessageType::TERMINATE:
LOG_INFO(log, "Client closed the connection");
return;
case PostgreSQLProtocol::Messaging::FrontMessageType::PARSE:
case PostgreSQLProtocol::Messaging::FrontMessageType::BIND:
case PostgreSQLProtocol::Messaging::FrontMessageType::DESCRIBE:
case PostgreSQLProtocol::Messaging::FrontMessageType::SYNC:
case PostgreSQLProtocol::Messaging::FrontMessageType::FLUSH:
case PostgreSQLProtocol::Messaging::FrontMessageType::CLOSE:
message_transport->send(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse::ERROR,
"0A000",
"ClickHouse doesn't support exteneded query mechanism"),
true);
LOG_ERROR(log, "Client tried to access via extended query protocol");
message_transport->dropMessage();
break;
default:
message_transport->send(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse::ERROR,
"0A000",
"Command is not supported"),
true);
LOG_ERROR(log, Poco::format("Command is not supported. Command code %d", static_cast<Int32>(message_type)));
message_transport->dropMessage();
}
}
}
catch (const Poco::Exception &exc)
{
log->log(exc);
}
}
bool PostgreSQLHandler::startUp()
{
Int32 payload_size;
Int32 info;
establishSecureConnection(payload_size, info);
if (static_cast<PostgreSQLProtocol::Messaging::FrontMessageType>(info) == PostgreSQLProtocol::Messaging::FrontMessageType::CANCEL_REQUEST)
{
LOG_INFO(log, "Client issued request canceling");
cancelRequest();
return false;
}
std::unique_ptr<PostgreSQLProtocol::Messaging::StartUpMessage> start_up_msg = receiveStartUpMessage(payload_size);
authentication_manager.authenticate(start_up_msg->user, connection_context, *message_transport, socket().peerAddress());
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<Int32> dis(0, INT32_MAX);
secret_key = dis(gen);
try
{
if (!start_up_msg->database.empty())
connection_context.setCurrentDatabase(start_up_msg->database);
connection_context.setCurrentQueryId(Poco::format("postgres:%d:%d", connection_id, secret_key));
}
catch (const Exception & exc)
{
message_transport->send(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse::ERROR, "XX000", exc.message()),
true);
throw;
}
sendParameterStatusData(*start_up_msg);
message_transport->send(
PostgreSQLProtocol::Messaging::BackendKeyData(connection_id, secret_key), true);
LOG_INFO(log, "Successfully finished StartUp stage");
return true;
}
void PostgreSQLHandler::establishSecureConnection(Int32 & payload_size, Int32 & info)
{
bool was_encryption_req = true;
readBinaryBigEndian(payload_size, *in);
readBinaryBigEndian(info, *in);
switch (static_cast<PostgreSQLProtocol::Messaging::FrontMessageType>(info))
{
case PostgreSQLProtocol::Messaging::FrontMessageType::SSL_REQUEST:
LOG_INFO(log, "Client requested SSL");
if (ssl_enabled)
makeSecureConnectionSSL();
else
message_transport->send('N', true);
break;
case PostgreSQLProtocol::Messaging::FrontMessageType::GSSENC_REQUEST:
LOG_INFO(log, "Client requested GSSENC");
message_transport->send('N', true);
break;
default:
was_encryption_req = false;
}
if (was_encryption_req)
{
readBinaryBigEndian(payload_size, *in);
readBinaryBigEndian(info, *in);
}
}
#if USE_SSL
void PostgreSQLHandler::makeSecureConnectionSSL()
{
message_transport->send('S');
ss = std::make_shared<Poco::Net::SecureStreamSocket>(
Poco::Net::SecureStreamSocket::attach(socket(), Poco::Net::SSLManager::instance().defaultServerContext()));
changeIO(*ss);
}
#else
void PostgreSQLHandler::makeSecureConnectionSSL() {}
#endif
void PostgreSQLHandler::sendParameterStatusData(PostgreSQLProtocol::Messaging::StartUpMessage & start_up_message)
{
std::unordered_map<String, String> & parameters = start_up_message.parameters;
if (parameters.find("application_name") != parameters.end())
message_transport->send(PostgreSQLProtocol::Messaging::ParameterStatus("application_name", parameters["application_name"]));
if (parameters.find("client_encoding") != parameters.end())
message_transport->send(PostgreSQLProtocol::Messaging::ParameterStatus("client_encoding", parameters["client_encoding"]));
else
message_transport->send(PostgreSQLProtocol::Messaging::ParameterStatus("client_encoding", "UTF8"));
message_transport->send(PostgreSQLProtocol::Messaging::ParameterStatus("server_version", VERSION_STRING));
message_transport->send(PostgreSQLProtocol::Messaging::ParameterStatus("server_encoding", "UTF8"));
message_transport->send(PostgreSQLProtocol::Messaging::ParameterStatus("DateStyle", "ISO"));
message_transport->flush();
}
void PostgreSQLHandler::cancelRequest()
{
connection_context.setCurrentQueryId("");
connection_context.setDefaultFormat("Null");
std::unique_ptr<PostgreSQLProtocol::Messaging::CancelRequest> msg =
message_transport->receiveWithPayloadSize<PostgreSQLProtocol::Messaging::CancelRequest>(8);
String query = Poco::format("KILL QUERY WHERE query_id = 'postgres:%d:%d'", msg->process_id, msg->secret_key);
ReadBufferFromString replacement(query);
executeQuery(
replacement, *out, true, connection_context,
[](const String &, const String &, const String &, const String &) {}
);
}
inline std::unique_ptr<PostgreSQLProtocol::Messaging::StartUpMessage> PostgreSQLHandler::receiveStartUpMessage(int payload_size)
{
std::unique_ptr<PostgreSQLProtocol::Messaging::StartUpMessage> message;
try
{
message = message_transport->receiveWithPayloadSize<PostgreSQLProtocol::Messaging::StartUpMessage>(payload_size - 8);
}
catch (const Exception &)
{
message_transport->send(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse::ERROR, "08P01", "Can't correctly handle StartUp message"),
true);
throw;
}
LOG_INFO(log, "Successfully received StartUp message");
return message;
}
void PostgreSQLHandler::processQuery()
{
try
{
std::unique_ptr<PostgreSQLProtocol::Messaging::Query> query =
message_transport->receive<PostgreSQLProtocol::Messaging::Query>();
if (isEmptyQuery(query->query))
{
message_transport->send(PostgreSQLProtocol::Messaging::EmptyQueryResponse());
return;
}
bool psycopg2_cond = query->query == "BEGIN" || query->query == "COMMIT"; // psycopg2 starts and ends queries with BEGIN/COMMIT commands
bool jdbc_cond = query->query.find("SET extra_float_digits") != String::npos || query->query.find("SET application_name") != String::npos; // jdbc starts with setting this parameter
if (psycopg2_cond || jdbc_cond)
{
message_transport->send(
PostgreSQLProtocol::Messaging::CommandComplete(
PostgreSQLProtocol::Messaging::CommandComplete::classifyQuery(query->query), 0));
return;
}
const auto & settings = connection_context.getSettingsRef();
std::vector<String> queries;
auto parse_res = splitMultipartQuery(query->query, queries, settings.max_query_size, settings.max_parser_depth);
if (!parse_res.second)
throw Exception("Cannot parse and execute the following part of query: " + String(parse_res.first), ErrorCodes::SYNTAX_ERROR);
for (const auto & spl_query : queries)
{
ReadBufferFromString read_buf(spl_query);
executeQuery(read_buf, *out, true, connection_context, {});
PostgreSQLProtocol::Messaging::CommandComplete::Command command =
PostgreSQLProtocol::Messaging::CommandComplete::classifyQuery(spl_query);
message_transport->send(PostgreSQLProtocol::Messaging::CommandComplete(command, 0), true);
}
}
catch (const Exception & e)
{
message_transport->send(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse(
PostgreSQLProtocol::Messaging::ErrorOrNoticeResponse::ERROR, "2F000", "Query execution failed.\n" + e.displayText()),
true);
throw;
}
}
bool PostgreSQLHandler::isEmptyQuery(const String & query)
{
if (query.empty())
return true;
Poco::RegularExpression regex(R"(\A\s*\z)");
return regex.match(query);
}
}

View File

@ -0,0 +1,76 @@
#pragma once
#include <Common/CurrentMetrics.h>
#include <Core/PostgreSQLProtocol.h>
#include <Poco/Net/TCPServerConnection.h>
#include <common/logger_useful.h>
#include "IServer.h"
#if USE_SSL
# include <Poco/Net/SecureStreamSocket.h>
#endif
namespace CurrentMetrics
{
extern const Metric PostgreSQLConnection;
}
namespace DB
{
/** PostgreSQL wire protocol implementation.
* For more info see https://www.postgresql.org/docs/current/protocol.html
*/
class PostgreSQLHandler : public Poco::Net::TCPServerConnection
{
public:
PostgreSQLHandler(
const Poco::Net::StreamSocket & socket_,
IServer & server_,
bool ssl_enabled_,
Int32 connection_id_,
std::vector<std::shared_ptr<PostgreSQLProtocol::PGAuthentication::AuthenticationMethod>> & auth_methods_);
void run() final;
private:
Poco::Logger * log = &Poco::Logger::get("PostgreSQLHandler");
IServer & server;
Context connection_context;
bool ssl_enabled;
Int32 connection_id;
Int32 secret_key;
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;
std::shared_ptr<PostgreSQLProtocol::Messaging::MessageTransport> message_transport;
#if USE_SSL
std::shared_ptr<Poco::Net::SecureStreamSocket> ss;
#endif
PostgreSQLProtocol::PGAuthentication::AuthenticationManager authentication_manager;
CurrentMetrics::Increment metric_increment{CurrentMetrics::PostgreSQLConnection};
void changeIO(Poco::Net::StreamSocket & socket);
bool startUp();
void establishSecureConnection(Int32 & payload_size, Int32 & info);
void makeSecureConnectionSSL();
void sendParameterStatusData(PostgreSQLProtocol::Messaging::StartUpMessage & start_up_message);
void cancelRequest();
std::unique_ptr<PostgreSQLProtocol::Messaging::StartUpMessage> receiveStartUpMessage(int payload_size);
void processQuery();
static bool isEmptyQuery(const String & query);
};
}

View File

@ -0,0 +1,27 @@
#include "PostgreSQLHandlerFactory.h"
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <memory>
#include <Server/PostgreSQLHandler.h>
namespace DB
{
PostgreSQLHandlerFactory::PostgreSQLHandlerFactory(IServer & server_)
: server(server_)
, log(&Poco::Logger::get("PostgreSQLHandlerFactory"))
{
auth_methods =
{
std::make_shared<PostgreSQLProtocol::PGAuthentication::NoPasswordAuth>(),
std::make_shared<PostgreSQLProtocol::PGAuthentication::CleartextPasswordAuth>(),
};
}
Poco::Net::TCPServerConnection * PostgreSQLHandlerFactory::createConnection(const Poco::Net::StreamSocket & socket)
{
Int32 connection_id = last_connection_id++;
LOG_TRACE(log, "PostgreSQL connection. Id: {}. Address: {}", connection_id, socket.peerAddress().toString());
return new PostgreSQLHandler(socket, server, ssl_enabled, connection_id, auth_methods);
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <atomic>
#include <memory>
#include <Server/IServer.h>
#include <Core/PostgreSQLProtocol.h>
namespace DB
{
class PostgreSQLHandlerFactory : public Poco::Net::TCPServerConnectionFactory
{
private:
IServer & server;
Poco::Logger * log;
#if USE_SSL
bool ssl_enabled = true;
#else
bool ssl_enabled = false;
#endif
std::atomic<Int32> last_connection_id = 0;
std::vector<std::shared_ptr<PostgreSQLProtocol::PGAuthentication::AuthenticationMethod>> auth_methods;
public:
explicit PostgreSQLHandlerFactory(IServer & server_);
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override;
};
}

View File

@ -11,6 +11,8 @@ SRCS(
InterserverIOHTTPHandler.cpp
MySQLHandler.cpp
MySQLHandlerFactory.cpp
PostgreSQLHandler.cpp
PostgreSQLHandlerFactory.cpp
NotFoundHandler.cpp
PrometheusMetricsWriter.cpp
PrometheusRequestHandler.cpp

View File

@ -0,0 +1,15 @@
33jdbcnull
44cknull
0
1
2
3
4
5
6
7
8
9
10
11
12

View File

@ -0,0 +1,18 @@
FROM ubuntu:18.04
RUN apt-get update && \
apt-get install -y software-properties-common build-essential openjdk-8-jdk curl
RUN rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
/tmp/* \
RUN apt-get clean
ARG ver=42.2.12
RUN curl -L -o /postgresql-java-${ver}.jar https://repo1.maven.org/maven2/org/postgresql/postgresql/${ver}/postgresql-${ver}.jar
ENV CLASSPATH=$CLASSPATH:/postgresql-java-${ver}.jar
WORKDIR /jdbc
COPY Test.java Test.java
RUN javac Test.java

View File

@ -0,0 +1,83 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
class JavaConnectorTest {
private static final String CREATE_TABLE_SQL = "CREATE TABLE IF NOT EXISTS default.test1 (`age` Int32, `name` String, `int_nullable` Nullable(Int32)) Engine = Memory";
private static final String INSERT_SQL = "INSERT INTO default.test1(`age`, `name`) VALUES(33, 'jdbc'),(44, 'ck')";
private static final String SELECT_SQL = "SELECT * FROM default.test1";
private static final String SELECT_NUMBER_SQL = "SELECT * FROM system.numbers LIMIT 13";
private static final String DROP_TABLE_SQL = "DROP TABLE default.test1";
public static void main(String[] args) {
int i = 0;
String host = "127.0.0.1";
String port = "5432";
String user = "default";
String password = "";
String database = "default";
while (i < args.length) {
switch (args[i]) {
case "--host":
host = args[++i];
break;
case "--port":
port = args[++i];
break;
case "--user":
user = args[++i];
break;
case "--password":
password = args[++i];
break;
case "--database":
database = args[++i];
break;
default:
i++;
break;
}
}
String jdbcUrl = String.format("jdbc:postgresql://%s:%s/%s", host, port, database);
Connection conn = null;
Statement stmt = null;
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", password);
props.setProperty("preferQueryMode", "simple");
props.setProperty("sslmode", "disable");
try {
conn = DriverManager.getConnection(jdbcUrl, props);
stmt = conn.createStatement();
stmt.executeUpdate(CREATE_TABLE_SQL);
stmt.executeUpdate(INSERT_SQL);
ResultSet rs = stmt.executeQuery(SELECT_SQL);
while (rs.next()) {
System.out.print(rs.getString("age"));
System.out.print(rs.getString("name"));
System.out.print(rs.getString("int_nullable"));
System.out.println();
}
stmt.executeUpdate(DROP_TABLE_SQL);
rs = stmt.executeQuery(SELECT_NUMBER_SQL);
while (rs.next()) {
System.out.print(rs.getString(1));
System.out.println();
}
stmt.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
System.exit(1);
}
}
}

View File

@ -0,0 +1,8 @@
version: '2.2'
services:
java:
build:
context: ./
network: host
# to keep container running
command: sleep infinity

View File

@ -0,0 +1,14 @@
version: '2.2'
services:
psql:
image: postgres:12.2-alpine
restart: always
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 10s
timeout: 5s
retries: 5
ports:
- "5433:5433"
environment:
POSTGRES_HOST_AUTH_METHOD: "trust"

View File

@ -0,0 +1,35 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<!-- Used with https_port and tcp_port_secure. Full ssl options list: https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/include/Poco/Net/SSLManager.h#L71 -->
<openSSL>
<server> <!-- Used for https server AND secure tcp port -->
<!-- openssl req -subj "/CN=localhost" -new -newkey rsa:2048 -days 365 -nodes -x509 -keyout /etc/clickhouse-server/server.key -out /etc/clickhouse-server/server.crt -->
<certificateFile>/etc/clickhouse-server/server.crt</certificateFile>
<privateKeyFile>/etc/clickhouse-server/server.key</privateKeyFile>
<!-- openssl dhparam -out /etc/clickhouse-server/dhparam.pem 4096 -->
<dhParamsFile>/etc/clickhouse-server/dhparam.pem</dhParamsFile>
<verificationMode>none</verificationMode>
<loadDefaultCAFile>true</loadDefaultCAFile>
<cacheSessions>true</cacheSessions>
<disableProtocols>sslv2,sslv3</disableProtocols>
<preferServerCiphers>true</preferServerCiphers>
</server>
</openSSL>
<tcp_port>9000</tcp_port>
<postgresql_port>5433</postgresql_port>
<listen_host>127.0.0.1</listen_host>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</yandex>

View File

@ -0,0 +1,25 @@
-----BEGIN X9.42 DH PARAMETERS-----
MIIELAKCAgEAkX9p27H48x6pBuiT5i7yVvSXjMaCnGkViPCL/R6+FdSpv/MVs0WX
dBq1uQWjin2AL7T4uHOyhd1sD4MrzgzPGR5q7lJr6CjvRtqj5ZjBX/xbo/N4xeix
VL+UTCpvfPwwkve7UL6C4v79f7AIH34ie+Ew2H5Bvy8RraFL5zrfhDWjdMPVk+Kz
Y4+GAXKEzB6CaXzpXBv/s5w7vXO11+EIXgWn2z6lJ2rEkEdT7hCamzNGy+ajH8on
FIvxrvvEQ1oLcMYPu6OB6PEGxonAjTrwIwYth1+4lnG0A4X5Bn1Bx0DKEyCAZSHw
ByjDZ9ZCspqY4b/auRKRnWSWDLYPkW4YtNCVV/+5pJydcL511gQ2WQs7quZEsGem
4x14xpIM5qDvF3bzFuDpVMuuzlf6AB9dEMSms6iIwuWpSxck6AydII0okxUaxSlW
QJxZGQBE/2m9DwFmMHDBWUYBGvevX51RjQCsJsmgZPlwnY7hnZ29sB7MeVzqF26d
103byJBUq+rWUkxzKrYKbm+FjOz84/hv3ONxoxBI0FstKdaEr7PnpDLyLmZCheeL
tz0RzNM3h9AEno1SJNrzWaVI5s5L6QjM9QRRWfF2JB5QyhQjc++FgRGk3SDBbcW5
IhHVcboq/MppZiE82FSwMtCkmPvB7KrPoYx8fmrTs7zfHtx+glsMguMCggIAY32m
/EZbhpvmmbq0G/zjh1Nkdvj0IOQdkxnz7FhnKviKNgqWTbgHSaE+pcubK8XVsuAj
NLOp5AMpccV9h02ABGRdaSSyMeJxfnYRUhuSWHN+i/rqL3Xtv7w/BQXsUZd3tQQ+
I4UhnC/VUlGgndL5b8TbYOA/9CXPItGRMQb3S9SzijzEeKwWHu0n+j4Nwbl3nrtk
epWey/Wv0SU1d07es9vXxob/iPZSwM1E9SDjRFrqokLQCWFzaELzOF14TBXUn1RT
1agpxeux9UQpPS1ELjReh+c94BWQh5Soj/HJ2L76EgWkKM0se7uD6AhZee+b22YM
KKqbWWetStSjSSsLxR4yvPMct/eUS8V9UCQfPuY3DpLZi3+F5hAMcKqV3gGHJBrD
82MkQUj8eJaz3qEocG3zzYnxZ3sXze9HYpGCVIXX6b5p8yg9R1I8mNLo9w0IS2mU
5rmw2YdioZKUTN+jMVP79GFgsoGTPAf9sFDdswwD1ie1MYG/sw1K/Jxw3MPED4y5
we+bBaaa2WLaSB32eEnyxZBd8OOQOmTunp/zw12BAC485mF9Innr1fAhic8t+LOB
CyVAF02HA0puj365kGsZDjcXn+EEuwK+VeStERTXApcbwL+78VW+DQ1J/vBjkt4Z
ustnEMN3HdfV3DTBBRxmEj34MuEhrz0WjhgRskACIQCU5YbOgiW+L9L/mDwyGARK
jZ/2Z6yJuWyeim3EVpWG2Q==
-----END X9.42 DH PARAMETERS-----

View File

@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC+zCCAeOgAwIBAgIJANhP897Se2gmMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV
BAMMCWxvY2FsaG9zdDAeFw0yMDA0MTgyMTE2NDBaFw0yMTA0MTgyMTE2NDBaMBQx
EjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
ggEBAM92kcojQoMsjZ9YGhPMY6h/fDUsZeSKHLxgqE6wbmfU1oZKCPWqnvl+4n0J
pnT5h1ETxxYZLepimKq0DEVPUTmCl0xmcKbtUNiaTUKYKsdita6b2vZCX9wUPN9p
2Kjnm41l+aZNqIEBhIgHNWg9qowi20y0EIXR79jQLwwaInHAaJLZxVsqY2zjQ/D7
1Zh82MXud7iqxBQiEfw9Cz35UFA239R8QTlPkVQfsN1gfLxnLk24QUX3o+hbUI1g
nlSpyYDHYQlOmwz8doDs6THHAZNJ4bPE9xHNFpw6dGZdbtH+IKQ/qRZIiOaiNuzJ
IOHl6XQDRDkW2LMTiCQ6fjC7Pz8CAwEAAaNQME4wHQYDVR0OBBYEFFvhaA/Eguyf
BXkMj8BkNLBqMnz2MB8GA1UdIwQYMBaAFFvhaA/EguyfBXkMj8BkNLBqMnz2MAwG
A1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBACeU/oL48eVAKH7NQntHhRaJ
ZGeQzKIjrSBjFo8BGXD1nJZhUeFsylLrhCkC8/5/3grE3BNVX9bxcGjO81C9Mn4U
t0z13d6ovJjCZSQArtLwgeJGlpH7gNdD3DyT8DQmrqYVnmnB7UmBu45XH1LWGQZr
FAOhGRVs6s6mNj8QlLMgdmsOeOQnsGCMdoss8zV9vO2dc4A5SDSSL2mqGGY4Yjtt
X+XlEhXXnksGyx8NGVOZX4wcj8WeCAj/lihQ7Zh6XYwZH9i+E46ompUwoziZnNPu
2RH63tLNCxkOY2HF5VMlbMmzer3FkhlM6TAZZRPcvSphKPwXK4A33yqc6wnWvpc=
-----END CERTIFICATE-----

View File

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDPdpHKI0KDLI2f
WBoTzGOof3w1LGXkihy8YKhOsG5n1NaGSgj1qp75fuJ9CaZ0+YdRE8cWGS3qYpiq
tAxFT1E5gpdMZnCm7VDYmk1CmCrHYrWum9r2Ql/cFDzfadio55uNZfmmTaiBAYSI
BzVoPaqMIttMtBCF0e/Y0C8MGiJxwGiS2cVbKmNs40Pw+9WYfNjF7ne4qsQUIhH8
PQs9+VBQNt/UfEE5T5FUH7DdYHy8Zy5NuEFF96PoW1CNYJ5UqcmAx2EJTpsM/HaA
7OkxxwGTSeGzxPcRzRacOnRmXW7R/iCkP6kWSIjmojbsySDh5el0A0Q5FtizE4gk
On4wuz8/AgMBAAECggEAJ54J2yL+mZQRe2NUn4FBarTloDXZQ1pIgISov1Ybz0Iq
sTxEF728XAKp95y3J9Fa0NXJB+RJC2BGrRpy2W17IlNY1yMc0hOxg5t7s4LhcG/e
J/jlSG+GZL2MnlFVKXQJFWhq0yIzUmdayqstvLlB7z7cx/n+yb88YRfoVBRNjZEL
Tdrsw+087igDjrIxZJ3eMN5Wi434n9s4yAoRQC1bP5wcWx0gD4MzdmL8ip6suiRc
LRuBAhV/Op812xlxUhrF5dInUM9OLlGTXpUzexAS8Cyy7S4bfkW2BaCxTF7I7TFw
Whx28CKn/G49tIuU0m6AlxWbXpLVePTFyMb7RJz5cQKBgQD7VQd2u3HM6eE3PcXD
p6ObdLTUk8OAJ5BMmADFc71W0Epyo26/e8KXKGYGxE2W3fr13y+9b0fl5fxZPuhS
MgvXEO7rItAVsLcp0IzaqY0WUee2b4XWPAU0XuPqvjYMpx8H5OEHqFK6lhZysAqM
X7Ot3/Hux9X0MC4v5a/HNbDUOQKBgQDTUPaP3ADRrmpmE2sWuzWEnCSEz5f0tCLO
wTqhV/UraWUNlAbgK5NB790IjH/gotBSqqNPLJwJh0LUfClKM4LiaHsEag0OArOF
GhPMK1Ohps8c2RRsiG8+hxX2HEHeAVbkouEDPDiHdIW/92pBViDoETXL6qxDKbm9
LkOcVeDfNwKBgQChh1xsqrvQ/t+IKWNZA/zahH9TwEP9sW/ESkz0mhYuHWA7nV4o
ItpFW+l2n+Nd+vy32OFN1p9W2iD9GrklWpTRfEiRRqaFyjVt4mMkhaPvnGRXlAVo
Utrldbb1v5ntN9txr2ARE9VXpe53dzzQSxGnxi4vUK/paK3GitAWMCOdwQKBgQCi
hmGsUXQb0P6qVYMGr6PAw2re7t8baLRguoMCdqjs45nCMLh9D2apzvb8TTtJJU/+
VJlYGqJEPdDrpjcHh8jBo8QBqCM0RGWYGG9jl2syKB6hPGCV/PU6bSE58Y/DVNpk
7NUM7PM5UyhPddY2PC0A78Ole29UFLJzSzLa+b4DTwKBgH9Wh2k4YPnPcRrX89UL
eSwWa1CGq6HWX8Kd5qyz256aeHWuG5nv15+rBt+D7nwajUsqeVkAXz5H/dHuG1xz
jb7RW+pEjx0GVAmIbkM9vOLqEUfHHHPuk4AXCGGZ5sarPiKg4BHKBBsY1dpoO5UH
0j71fRA6zurHnTXDaCLWlUpZ
-----END PRIVATE KEY-----

View File

@ -0,0 +1,13 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password>123</password>
</default>
</users>
</yandex>

View File

@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import datetime
import decimal
import docker
import psycopg2 as py_psql
import psycopg2.extras
import pytest
import os
import sys
import subprocess
import time
import uuid
from helpers.cluster import ClickHouseCluster
psycopg2.extras.register_uuid()
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(SCRIPT_DIR, './configs')
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', config_dir=config_dir, env_variables={'UBSAN_OPTIONS': 'print_stacktrace=1'})
server_port = 5433
@pytest.fixture(scope="module")
def server_address():
cluster.start()
try:
yield cluster.get_instance_ip('node')
finally:
cluster.shutdown()
@pytest.fixture(scope='module')
def psql_client():
docker_compose = os.path.join(SCRIPT_DIR, 'clients', 'psql', 'docker_compose.yml')
subprocess.check_call(['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'up', '--no-recreate', '-d', '--build'])
yield docker.from_env().containers.get(cluster.project_name + '_psql_1')
@pytest.fixture(scope='module')
def psql_server(psql_client):
"""Return PostgreSQL container when it is healthy."""
retries = 30
for i in range(retries):
info = psql_client.client.api.inspect_container(psql_client.name)
if info['State']['Health']['Status'] == 'healthy':
break
time.sleep(1)
else:
print(info['State'])
raise Exception('PostgreSQL server has not started after {} retries.'.format(retries))
return psql_client
@pytest.fixture(scope='module')
def java_container():
docker_compose = os.path.join(SCRIPT_DIR, 'clients', 'java', 'docker_compose.yml')
subprocess.check_call(['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'up', '--no-recreate', '-d', '--build'])
yield docker.from_env().containers.get(cluster.project_name + '_java_1')
def test_psql_is_ready(psql_server):
pass
def test_psql_client(psql_client, server_address):
cmd_prefix = 'psql "sslmode=require host={server_address} port={server_port} user=default dbname=default password=123" '\
.format(server_address=server_address, server_port=server_port)
cmd_prefix += "--no-align --field-separator=' ' "
code, (stdout, stderr) = psql_client.exec_run(cmd_prefix + '-c "SELECT 1 as a"', demux=True)
assert stdout == '\n'.join(['a', '1', '(1 row)', ''])
code, (stdout, stderr) = psql_client.exec_run(cmd_prefix + '''-c "SELECT 'колонка' as a"''', demux=True)
assert stdout == '\n'.join(['a', 'колонка', '(1 row)', ''])
code, (stdout, stderr) = psql_client.exec_run(
cmd_prefix + '-c ' +
'''
"CREATE DATABASE x;
USE x;
CREATE TABLE table1 (column UInt32) ENGINE = Memory;
INSERT INTO table1 VALUES (0), (1), (5);
INSERT INTO table1 VALUES (0), (1), (5);
SELECT * FROM table1 ORDER BY column;"
''',
demux=True
)
assert stdout == '\n'.join(['column', '0', '0', '1', '1', '5', '5', '(6 rows)', ''])
code, (stdout, stderr) = psql_client.exec_run(
cmd_prefix + '-c ' +
'''
"DROP DATABASE x;
CREATE TEMPORARY TABLE tmp (tmp_column UInt32);
INSERT INTO tmp VALUES (0), (1);
SELECT * FROM tmp ORDER BY tmp_column;"
''',
demux=True
)
assert stdout == '\n'.join(['tmp_column', '0', '1', '(2 rows)', ''])
def test_python_client(server_address):
with pytest.raises(py_psql.InternalError) as exc_info:
ch = py_psql.connect(host=server_address, port=server_port, user='default', password='123', database='')
cur = ch.cursor()
cur.execute('select name from tables;')
assert exc_info.value.args == ("Query execution failed.\nDB::Exception: Table default.tables doesn't exist.\nSSL connection has been closed unexpectedly\n",)
ch = py_psql.connect(host=server_address, port=server_port, user='default', password='123', database='')
cur = ch.cursor()
cur.execute('select 1 as a, 2 as b')
assert (cur.description[0].name, cur.description[1].name) == ('a', 'b')
assert cur.fetchall() == [(1, 2)]
cur.execute('CREATE DATABASE x')
cur.execute('USE x')
cur.execute('CREATE TEMPORARY TABLE tmp2 (ch Int8, i64 Int64, f64 Float64, str String, date Date, dec Decimal(19, 10), uuid UUID) ENGINE = Memory')
cur.execute("insert into tmp2 (ch, i64, f64, str, date, dec, uuid) values (44, 534324234, 0.32423423, 'hello', '2019-01-23', 0.333333, '61f0c404-5cb3-11e7-907b-a6006ad3dba0')")
cur.execute('select * from tmp2')
assert cur.fetchall()[0] == ('44', 534324234, 0.32423423, 'hello', datetime.date(2019, 1, 23), decimal.Decimal('0.3333330000'), uuid.UUID('61f0c404-5cb3-11e7-907b-a6006ad3dba0'))
def test_java_client(server_address, java_container):
with open(os.path.join(SCRIPT_DIR, 'clients', 'java', '0.reference')) as fp:
reference = fp.read()
# database not exists exception.
code, (stdout, stderr) = java_container.exec_run('java JavaConnectorTest --host {host} --port {port} --user default --database '
'abc'.format(host=server_address, port=server_port), demux=True)
assert code == 1
# non-empty password passed.
code, (stdout, stderr) = java_container.exec_run('java JavaConnectorTest --host {host} --port {port} --user default --password 123 --database '
'default'.format(host=server_address, port=server_port), demux=True)
print(stdout, stderr, file=sys.stderr)
assert code == 0
assert stdout == reference