Merge branch 'mysql' of https://github.com/yurriy/ClickHouse into yurriy-mysql

This commit is contained in:
Alexey Milovidov 2019-05-25 17:14:57 +03:00
commit b94f2be154
25 changed files with 1867 additions and 4 deletions

View File

@ -8,6 +8,7 @@ set(CLICKHOUSE_SERVER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/RootRequestHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/Server.cpp
${CMAKE_CURRENT_SOURCE_DIR}/TCPHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/MySQLHandler.cpp
)
set(CLICKHOUSE_SERVER_LINK PRIVATE clickhouse_dictionaries clickhouse_common_io PUBLIC daemon PRIVATE clickhouse_storages_system clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions ${Poco_Net_LIBRARY})

View File

@ -0,0 +1,363 @@
#include <DataStreams/copyData.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <Interpreters/executeQuery.h>
#include <Storages/IStorage.h>
#include <Core/MySQLProtocol.h>
#include <Core/NamesAndTypes.h>
#include <Columns/ColumnVector.h>
#include <Common/config_version.h>
#include <Common/NetException.h>
#include <Poco/Crypto/RSAKey.h>
#include <Poco/Crypto/CipherFactory.h>
#include <Poco/Net/SecureStreamSocket.h>
#include <Poco/Net/SSLManager.h>
#include "MySQLHandler.h"
#include <limits>
namespace DB
{
using namespace MySQLProtocol;
using Poco::Net::SecureStreamSocket;
using Poco::Net::SSLManager;
namespace ErrorCodes
{
extern const int MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES;
extern const int UNKNOWN_EXCEPTION;
}
uint32_t MySQLHandler::last_connection_id = 0;
void MySQLHandler::run()
{
connection_context = server.context();
connection_context.setDefaultFormat("MySQL");
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
out = std::make_shared<WriteBufferFromPocoSocket>(socket());
packet_sender = std::make_shared<PacketSender>(*in, *out, connection_context.sequence_id, "MySQLHandler");
try
{
String scramble = generateScramble();
/** Native authentication sent 20 bytes + '\0' character = 21 bytes.
* This plugin must do the same to stay consistent with historical behavior if it is set to operate as a default plugin.
* https://github.com/mysql/mysql-server/blob/8.0/sql/auth/sql_authentication.cc#L3994
*/
Handshake handshake(connection_id, VERSION_STRING, scramble + '\0');
packet_sender->sendPacket<Handshake>(handshake, true);
LOG_TRACE(log, "Sent handshake");
HandshakeResponse handshake_response = finishHandshake();
connection_context.client_capabilities = handshake_response.capability_flags;
if (handshake_response.max_packet_size)
connection_context.max_packet_size = handshake_response.max_packet_size;
if (!connection_context.max_packet_size)
connection_context.max_packet_size = MAX_PACKET_LENGTH;
LOG_DEBUG(log, "Capabilities: " << handshake_response.capability_flags
<< "\nmax_packet_size: "
<< handshake_response.max_packet_size
<< "\ncharacter_set: "
<< handshake_response.character_set
<< "\nuser: "
<< handshake_response.username
<< "\nauth_response length: "
<< handshake_response.auth_response.length()
<< "\nauth_response: "
<< handshake_response.auth_response
<< "\ndatabase: "
<< handshake_response.database
<< "\nauth_plugin_name: "
<< handshake_response.auth_plugin_name);
capabilities = handshake_response.capability_flags;
if (!(capabilities & CLIENT_PROTOCOL_41))
{
throw Exception("Required capability: CLIENT_PROTOCOL_41.", ErrorCodes::MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES);
}
if (!(capabilities & CLIENT_PLUGIN_AUTH))
{
throw Exception("Required capability: CLIENT_PLUGIN_AUTH.", ErrorCodes::MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES);
}
authenticate(handshake_response, scramble);
OK_Packet ok_packet(0, handshake_response.capability_flags, 0, 0, 0);
packet_sender->sendPacket(ok_packet, true);
while (true)
{
packet_sender->resetSequenceId();
String payload = packet_sender->receivePacketPayload();
int command = payload[0];
LOG_DEBUG(log, "Received command: " << std::to_string(command) << ". Connection id: " << connection_id << ".");
try
{
switch (command)
{
case COM_QUIT:
return;
case COM_INIT_DB:
comInitDB(payload);
break;
case COM_QUERY:
comQuery(payload);
break;
case COM_FIELD_LIST:
comFieldList(payload);
break;
case COM_PING:
comPing();
break;
default:
throw Exception(Poco::format("Command %d is not implemented.", command), ErrorCodes::NOT_IMPLEMENTED);
}
}
catch (const NetException & exc)
{
log->log(exc);
throw;
}
catch (const Exception & exc)
{
log->log(exc);
packet_sender->sendPacket(ERR_Packet(exc.code(), "00000", exc.message()), true);
}
}
}
catch (Poco::Exception & exc)
{
log->log(exc);
}
}
/** Reads 3 bytes, finds out whether it is SSLRequest or HandshakeResponse packet, starts secure connection, if it is SSLRequest.
* Reading is performed from socket instead of ReadBuffer to prevent reading part of SSL handshake.
* If we read it from socket, it will be impossible to start SSL connection using Poco. Size of SSLRequest packet payload is 32 bytes, thus we can read at most 36 bytes.
*/
MySQLProtocol::HandshakeResponse MySQLHandler::finishHandshake()
{
HandshakeResponse packet;
size_t packet_size = PACKET_HEADER_SIZE + SSL_REQUEST_PAYLOAD_SIZE;
/// Buffer for SSLRequest or part of HandshakeResponse.
char buf[packet_size];
size_t pos = 0;
/// Reads at least count and at most packet_size bytes.
auto read_bytes = [this, &buf, &pos, &packet_size](size_t count) -> void {
while (pos < count)
{
int ret = socket().receiveBytes(buf + pos, packet_size - pos);
if (ret == 0)
{
throw Exception("Cannot read all data. Bytes read: " + std::to_string(pos) + ". Bytes expected: 3.", ErrorCodes::CANNOT_READ_ALL_DATA);
}
pos += ret;
}
};
read_bytes(3); /// We can find out whether it is SSLRequest of HandshakeResponse by first 3 bytes.
size_t payload_size = *reinterpret_cast<uint32_t *>(buf) & 0xFFFFFFu;
LOG_TRACE(log, "payload size: " << payload_size);
if (payload_size == SSL_REQUEST_PAYLOAD_SIZE)
{
read_bytes(packet_size); /// Reading rest SSLRequest.
SSLRequest ssl_request;
ssl_request.readPayload(String(buf + PACKET_HEADER_SIZE, pos - PACKET_HEADER_SIZE));
connection_context.client_capabilities = ssl_request.capability_flags;
connection_context.max_packet_size = ssl_request.max_packet_size ? ssl_request.max_packet_size : MAX_PACKET_LENGTH;
secure_connection = true;
ss = std::make_shared<SecureStreamSocket>(SecureStreamSocket::attach(socket(), SSLManager::instance().defaultServerContext()));
in = std::make_shared<ReadBufferFromPocoSocket>(*ss);
out = std::make_shared<WriteBufferFromPocoSocket>(*ss);
connection_context.sequence_id = 2;
packet_sender = std::make_shared<PacketSender>(*in, *out, connection_context.sequence_id, "MySQLHandler");
packet_sender->max_packet_size = connection_context.max_packet_size;
packet_sender->receivePacket(packet); /// Reading HandshakeResponse from secure socket.
}
else
{
/// Reading rest of HandshakeResponse.
packet_size = PACKET_HEADER_SIZE + payload_size;
WriteBufferFromOwnString buf_for_handshake_response;
buf_for_handshake_response.write(buf, pos);
copyData(*packet_sender->in, buf_for_handshake_response, packet_size - pos);
packet.readPayload(buf_for_handshake_response.str().substr(PACKET_HEADER_SIZE));
packet_sender->sequence_id++;
}
return packet;
}
String MySQLHandler::generateScramble()
{
String scramble(MySQLProtocol::SCRAMBLE_LENGTH, 0);
Poco::RandomInputStream generator;
for (size_t i = 0; i < scramble.size(); i++)
{
generator >> scramble[i];
}
return scramble;
}
void MySQLHandler::authenticate(const HandshakeResponse & handshake_response, const String & scramble)
{
String auth_response;
AuthSwitchResponse response;
if (handshake_response.auth_plugin_name != Authentication::SHA256)
{
packet_sender->sendPacket(AuthSwitchRequest(Authentication::SHA256, scramble + '\0'), true);
packet_sender->receivePacket(response);
auth_response = response.value;
LOG_TRACE(log, "Authentication method mismatch.");
}
else
{
auth_response = handshake_response.auth_response;
LOG_TRACE(log, "Authentication method match.");
}
auto getOpenSSLError = []() -> String
{
BIO * mem = BIO_new(BIO_s_mem());
ERR_print_errors(mem);
char * buf = nullptr;
long size = BIO_get_mem_data(mem, &buf);
String errors_str(buf, size);
BIO_free(mem);
return errors_str;
};
if (auth_response == "\1")
{
LOG_TRACE(log, "Client requests public key.");
BIO * mem = BIO_new(BIO_s_mem());
if (PEM_write_bio_RSA_PUBKEY(mem, public_key) != 1)
{
LOG_TRACE(log, "OpenSSL error:\n" << getOpenSSLError());
throw Exception("Failed to write public key to memory.", ErrorCodes::UNKNOWN_EXCEPTION);
}
char * pem_buf = nullptr;
long pem_size = BIO_get_mem_data(mem, &pem_buf);
String pem(pem_buf, pem_size);
BIO_free(mem);
LOG_TRACE(log, "Key: " << pem);
AuthMoreData data(pem);
packet_sender->sendPacket(data, true);
packet_sender->receivePacket(response);
auth_response = response.value;
}
else
{
LOG_TRACE(log, "Client didn't request public key.");
}
String password;
/** Decrypt password, if it's not empty.
* The original intention was that the password is a string[NUL] but this never got enforced properly so now we have to accept that
* an empty packet is a blank password, thus the check for auth_response.empty() has to be made too.
* https://github.com/mysql/mysql-server/blob/8.0/sql/auth/sql_authentication.cc#L4017
*/
if (!secure_connection && (!auth_response.empty() && auth_response != "\0"))
{
LOG_TRACE(log, "Received nonempty password");
auto ciphertext = reinterpret_cast<unsigned char *>(auth_response.data());
unsigned char plaintext[RSA_size(private_key)];
int plaintext_size = RSA_private_decrypt(auth_response.size(), ciphertext, plaintext, private_key, RSA_PKCS1_OAEP_PADDING);
if (plaintext_size == -1)
{
LOG_TRACE(log, "OpenSSL error:\n" << getOpenSSLError());
throw Exception("Failed to decrypt.", ErrorCodes::UNKNOWN_EXCEPTION);
}
password.resize(plaintext_size);
for (int i = 0; i < plaintext_size; i++)
{
password[i] = plaintext[i] ^ static_cast<unsigned char>(scramble[i % scramble.size()]);
}
}
else if (secure_connection)
{
password = auth_response;
}
else
{
LOG_TRACE(log, "Received empty password");
}
if (!password.empty())
{
password.pop_back(); /// terminating null byte
}
try
{
connection_context.setUser(handshake_response.username, password, socket().address(), "");
connection_context.setCurrentDatabase(handshake_response.database);
connection_context.setCurrentQueryId("");
LOG_ERROR(log, "Authentication for user " << handshake_response.username << " succeeded.");
}
catch (const Exception & exc)
{
LOG_ERROR(log, "Authentication for user " << handshake_response.username << " failed.");
packet_sender->sendPacket(ERR_Packet(exc.code(), "00000", exc.message()), true);
throw;
}
}
void MySQLHandler::comInitDB(const String & payload)
{
String database = payload.substr(1);
LOG_DEBUG(log, "Setting current database to " << database);
connection_context.setCurrentDatabase(database);
packet_sender->sendPacket(OK_Packet(0, capabilities, 0, 0, 1), true);
}
void MySQLHandler::comFieldList(const String & payload)
{
ComFieldList packet;
packet.readPayload(payload);
String database = connection_context.getCurrentDatabase();
StoragePtr tablePtr = connection_context.getTable(database, packet.table);
for (const NameAndTypePair & column: tablePtr->getColumns().getAll())
{
ColumnDefinition column_definition(
database, packet.table, packet.table, column.name, column.name, CharacterSet::binary, 100, ColumnType::MYSQL_TYPE_STRING, 0, 0
);
packet_sender->sendPacket(column_definition);
}
packet_sender->sendPacket(OK_Packet(0xfe, capabilities, 0, 0, 0), true);
}
void MySQLHandler::comPing()
{
packet_sender->sendPacket(OK_Packet(0x0, capabilities, 0, 0, 0), true);
}
void MySQLHandler::comQuery(const String & payload)
{
bool with_output = false;
std::function<void(const String &)> set_content_type = [&with_output](const String &) -> void {
with_output = true;
};
ReadBufferFromMemory query(payload.data() + 1, payload.size() - 1);
executeQuery(query, *out, true, connection_context, set_content_type, nullptr);
if (!with_output)
packet_sender->sendPacket(OK_Packet(0x00, capabilities, 0, 0, 0), true);
}
}

View File

@ -0,0 +1,73 @@
#pragma once
#include <Poco/Net/TCPServerConnection.h>
#include <Poco/Net/SecureStreamSocket.h>
#include <Common/getFQDNOrHostName.h>
#include <Core/MySQLProtocol.h>
#include <openssl/evp.h>
#include "IServer.h"
namespace DB
{
/// Handler for MySQL wire protocol connections. Allows to connect to ClickHouse using MySQL client.
class MySQLHandler : public Poco::Net::TCPServerConnection
{
public:
MySQLHandler(
IServer & server_,
const Poco::Net::StreamSocket & socket_,
RSA * public_key,
RSA * private_key)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, log(&Poco::Logger::get("MySQLHandler"))
, connection_context(server.context())
, connection_id(last_connection_id++)
, public_key(public_key)
, private_key(private_key)
{
log->setLevel("information");
}
void run() final;
private:
/// Enables SSL, if client requested.
MySQLProtocol::HandshakeResponse finishHandshake();
void comQuery(const String & payload);
void comFieldList(const String & payload);
void comPing();
void comInitDB(const String & payload);
static String generateScramble();
void authenticate(const MySQLProtocol::HandshakeResponse &, const String & scramble);
IServer & server;
Poco::Logger * log;
Context connection_context;
std::shared_ptr<MySQLProtocol::PacketSender> packet_sender;
uint32_t connection_id = 0;
uint32_t capabilities;
static uint32_t last_connection_id;
RSA * public_key, * private_key;
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;
bool secure_connection = false;
std::shared_ptr<Poco::Net::SecureStreamSocket> ss;
};
}

View File

@ -0,0 +1,113 @@
#pragma once
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <Poco/Net/SSLManager.h>
#include <Poco/Crypto/X509Certificate.h>
#include <common/logger_useful.h>
#include "IServer.h"
#include "MySQLHandler.h"
namespace Poco
{
class Logger;
}
namespace DB
{
class MySQLHandlerFactory : public Poco::Net::TCPServerConnectionFactory
{
private:
IServer & server;
Poco::Logger * log;
RSA * public_key = nullptr, * private_key = nullptr;
public:
explicit MySQLHandlerFactory(IServer & server_)
: server(server_), log(&Logger::get("MySQLHandlerFactory"))
{
/// Reading rsa keys for SHA256 authentication plugin.
const Poco::Util::LayeredConfiguration & config = Poco::Util::Application::instance().config();
String certificateFileProperty = "openSSL.server.certificateFile";
String privateKeyFileProperty = "openSSL.server.privateKeyFile";
if (!config.has(certificateFileProperty))
{
LOG_INFO(log, "Certificate file is not set.");
generateRSAKeys();
return;
}
if (!config.has(privateKeyFileProperty))
{
LOG_INFO(log, "Private key file is not set.");
generateRSAKeys();
return;
}
String certificateFile = config.getString(certificateFileProperty);
FILE * fp = fopen(certificateFile.data(), "r");
if (fp == nullptr)
{
LOG_WARNING(log, "Cannot open certificate file: " << certificateFile << ".");
generateRSAKeys();
return;
}
X509 * x509 = PEM_read_X509(fp, nullptr, nullptr, nullptr);
EVP_PKEY * p = X509_get_pubkey(x509);
public_key = EVP_PKEY_get1_RSA(p);
X509_free(x509);
EVP_PKEY_free(p);
fclose(fp);
String privateKeyFile = config.getString(privateKeyFileProperty);
fp = fopen(privateKeyFile.data(), "r");
if (fp == nullptr)
{
LOG_WARNING(log, "Cannot open private key file " << privateKeyFile << ".");
generateRSAKeys();
return;
}
private_key = PEM_read_RSAPrivateKey(fp, nullptr, nullptr, nullptr);
fclose(fp);
}
void generateRSAKeys()
{
LOG_INFO(log, "Generating new RSA key.");
RSA * rsa = RSA_new();
if (rsa == nullptr)
{
throw Exception("Failed to allocate RSA key.", 1002);
}
BIGNUM * e = BN_new();
if (!e)
{
RSA_free(rsa);
throw Exception("Failed to allocate BIGNUM.", 1002);
}
if (!BN_set_word(e, 65537) || !RSA_generate_key_ex(rsa, 2048, e, nullptr))
{
RSA_free(rsa);
BN_free(e);
throw Exception("Failed to generate RSA key.", 1002);
}
BN_free(e);
public_key = rsa;
private_key = RSAPrivateKey_dup(rsa);
}
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override
{
LOG_TRACE(log, "MySQL connection. Address: " << socket.peerAddress().toString());
return new MySQLHandler(server, socket, public_key, private_key);
}
~MySQLHandlerFactory() override
{
RSA_free(public_key);
RSA_free(private_key);
}
};
}

View File

@ -49,6 +49,7 @@
#include <Common/StatusFile.h>
#include "TCPHandlerFactory.h"
#include "Common/config_version.h"
#include "MySQLHandlerFactory.h"
#if defined(__linux__)
#include <Common/hasLinuxCapability.h>
@ -668,7 +669,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening tcp: " + address.toString());
LOG_INFO(log, "Listening for connections with native protocol (tcp): " + address.toString());
}
/// TCP with SSL
@ -685,7 +686,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening tcp_secure: " + address.toString());
LOG_INFO(log, "Listening for connections with secure native protocol (tcp_secure): " + address.toString());
#else
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
@ -710,7 +711,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket,
http_params));
LOG_INFO(log, "Listening interserver http: " + address.toString());
LOG_INFO(log, "Listening for replica communication (interserver) http://" + address.toString());
}
if (config().has("interserver_https_port"))
@ -727,12 +728,27 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket,
http_params));
LOG_INFO(log, "Listening interserver https: " + address.toString());
LOG_INFO(log, "Listening for secure replica communication (interserver) https://" + address.toString());
#else
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}
if (config().has("mysql_port"))
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, config().getInt("mysql_port"), /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new MySQLHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for MySQL compatibility protocol: " + address.toString());
}
}
catch (const Poco::Exception & e)
{

View File

@ -436,6 +436,8 @@ namespace ErrorCodes
extern const int CONDITIONAL_TREE_PARENT_NOT_FOUND = 2001;
extern const int ILLEGAL_PROJECTION_MANIPULATOR = 2002;
extern const int MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES = 446;
}
}

View File

@ -0,0 +1,94 @@
#include <IO/WriteBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <common/logger_useful.h>
#include <random>
#include <sstream>
#include "MySQLProtocol.h"
namespace DB
{
namespace MySQLProtocol
{
void PacketSender::resetSequenceId()
{
sequence_id = 0;
}
String PacketSender::packetToText(String payload)
{
String result;
for (auto c : payload)
{
result += ' ';
result += std::to_string(static_cast<unsigned char>(c));
}
return result;
}
uint64_t readLengthEncodedNumber(std::istringstream & ss)
{
char c;
uint64_t buf = 0;
ss.get(c);
auto cc = static_cast<uint8_t>(c);
if (cc < 0xfc)
{
return cc;
}
else if (cc < 0xfd)
{
ss.read(reinterpret_cast<char *>(&buf), 2);
}
else if (cc < 0xfe)
{
ss.read(reinterpret_cast<char *>(&buf), 3);
}
else
{
ss.read(reinterpret_cast<char *>(&buf), 8);
}
return buf;
}
std::string writeLengthEncodedNumber(uint64_t x)
{
std::string result;
if (x < 251)
{
result.append(1, static_cast<char>(x));
}
else if (x < (1 << 16))
{
result.append(1, 0xfc);
result.append(reinterpret_cast<char *>(&x), 2);
}
else if (x < (1 << 24))
{
result.append(1, 0xfd);
result.append(reinterpret_cast<char *>(&x), 3);
}
else
{
result.append(1, 0xfe);
result.append(reinterpret_cast<char *>(&x), 8);
}
return result;
}
void writeLengthEncodedString(std::string & payload, const std::string & s)
{
payload.append(writeLengthEncodedNumber(s.length()));
payload.append(s);
}
void writeNulTerminatedString(std::string & payload, const std::string & s)
{
payload.append(s);
payload.append(1, 0);
}
}
}

View File

@ -0,0 +1,669 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/copyData.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/WriteBufferFromString.h>
#include <Core/Types.h>
#include <Poco/RandomStream.h>
#include <Poco/Net/StreamSocket.h>
#include <random>
#include <sstream>
#include <common/logger_useful.h>
#include <Poco/Logger.h>
/// Implementation of MySQL wire protocol
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_PACKET_FROM_CLIENT;
}
namespace MySQLProtocol
{
const size_t MAX_PACKET_LENGTH = (1 << 24) - 1; // 16 mb
const size_t SCRAMBLE_LENGTH = 20;
const size_t AUTH_PLUGIN_DATA_PART_1_LENGTH = 8;
const size_t MYSQL_ERRMSG_SIZE = 512;
const size_t PACKET_HEADER_SIZE = 4;
const size_t SSL_REQUEST_PAYLOAD_SIZE = 32;
namespace Authentication
{
const String SHA256 = "sha256_password"; /// Caching SHA2 plugin is not used because it would be possible to authenticate knowing hash from users.xml.
}
enum CharacterSet
{
utf8_general_ci = 33,
binary = 63
};
enum StatusFlags
{
SERVER_SESSION_STATE_CHANGED = 0x4000
};
enum Capability
{
CLIENT_CONNECT_WITH_DB = 0x00000008,
CLIENT_PROTOCOL_41 = 0x00000200,
CLIENT_SSL = 0x00000800,
CLIENT_TRANSACTIONS = 0x00002000, // TODO
CLIENT_SESSION_TRACK = 0x00800000, // TODO
CLIENT_SECURE_CONNECTION = 0x00008000,
CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA = 0x00200000,
CLIENT_PLUGIN_AUTH = 0x00080000,
CLIENT_DEPRECATE_EOF = 0x01000000,
};
enum Command
{
COM_SLEEP = 0x0,
COM_QUIT = 0x1,
COM_INIT_DB = 0x2,
COM_QUERY = 0x3,
COM_FIELD_LIST = 0x4,
COM_CREATE_DB = 0x5,
COM_DROP_DB = 0x6,
COM_REFRESH = 0x7,
COM_SHUTDOWN = 0x8,
COM_STATISTICS = 0x9,
COM_PROCESS_INFO = 0xa,
COM_CONNECT = 0xb,
COM_PROCESS_KILL = 0xc,
COM_DEBUG = 0xd,
COM_PING = 0xe,
COM_TIME = 0xf,
COM_DELAYED_INSERT = 0x10,
COM_CHANGE_USER = 0x11,
COM_RESET_CONNECTION = 0x1f,
COM_DAEMON = 0x1d
};
enum ColumnType
{
MYSQL_TYPE_DECIMAL = 0x00,
MYSQL_TYPE_TINY = 0x01,
MYSQL_TYPE_SHORT = 0x02,
MYSQL_TYPE_LONG = 0x03,
MYSQL_TYPE_FLOAT = 0x04,
MYSQL_TYPE_DOUBLE = 0x05,
MYSQL_TYPE_NULL = 0x06,
MYSQL_TYPE_TIMESTAMP = 0x07,
MYSQL_TYPE_LONGLONG = 0x08,
MYSQL_TYPE_INT24 = 0x09,
MYSQL_TYPE_DATE = 0x0a,
MYSQL_TYPE_TIME = 0x0b,
MYSQL_TYPE_DATETIME = 0x0c,
MYSQL_TYPE_YEAR = 0x0d,
MYSQL_TYPE_VARCHAR = 0x0f,
MYSQL_TYPE_BIT = 0x10,
MYSQL_TYPE_NEWDECIMAL = 0xf6,
MYSQL_TYPE_ENUM = 0xf7,
MYSQL_TYPE_SET = 0xf8,
MYSQL_TYPE_TINY_BLOB = 0xf9,
MYSQL_TYPE_MEDIUM_BLOB = 0xfa,
MYSQL_TYPE_LONG_BLOB = 0xfb,
MYSQL_TYPE_BLOB = 0xfc,
MYSQL_TYPE_VAR_STRING = 0xfd,
MYSQL_TYPE_STRING = 0xfe,
MYSQL_TYPE_GEOMETRY = 0xff
};
class ProtocolError : public DB::Exception
{
public:
using Exception::Exception;
};
class WritePacket
{
public:
virtual String getPayload() const = 0;
virtual ~WritePacket() = default;
};
class ReadPacket
{
public:
ReadPacket() = default;
ReadPacket(const ReadPacket &) = default;
virtual void readPayload(String payload) = 0;
virtual ~ReadPacket() = default;
};
/* Writes and reads packets, keeping sequence-id.
* Throws ProtocolError, if packet with incorrect sequence-id was received.
*/
class PacketSender
{
public:
size_t & sequence_id;
ReadBuffer * in;
WriteBuffer * out;
size_t max_packet_size = MAX_PACKET_LENGTH;
/// For reading and writing.
PacketSender(ReadBuffer & in, WriteBuffer & out, size_t & sequence_id, const String logger_name)
: sequence_id(sequence_id)
, in(&in)
, out(&out)
, log(&Poco::Logger::get(logger_name))
{
log->setLevel("information");
}
/// For writing.
PacketSender(WriteBuffer & out, size_t & sequence_id, const String logger_name)
: sequence_id(sequence_id)
, in(nullptr)
, out(&out)
, log(&Poco::Logger::get(logger_name))
{
log->setLevel("information");
}
String receivePacketPayload()
{
WriteBufferFromOwnString buf;
size_t payload_length = 0;
size_t packet_sequence_id = 0;
// packets which are larger than or equal to 16MB are splitted
do
{
LOG_TRACE(log, "Reading from buffer");
in->readStrict(reinterpret_cast<char *>(&payload_length), 3);
if (payload_length > max_packet_size)
{
std::ostringstream tmp;
tmp << "Received packet with payload larger than max_packet_size: " << payload_length;
throw ProtocolError(tmp.str(), ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
in->readStrict(reinterpret_cast<char *>(&packet_sequence_id), 1);
if (packet_sequence_id != sequence_id)
{
std::ostringstream tmp;
tmp << "Received packet with wrong sequence-id: " << packet_sequence_id << ". Expected: " << sequence_id << '.';
throw ProtocolError(tmp.str(), ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
sequence_id++;
LOG_TRACE(log, "Received packet. Sequence-id: " << packet_sequence_id << ", payload length: " << payload_length);
copyData(*in, static_cast<WriteBuffer &>(buf), payload_length);
} while (payload_length == max_packet_size);
return std::move(buf.str());
}
void receivePacket(ReadPacket & packet)
{
packet.readPayload(receivePacketPayload());
}
template<class T>
void sendPacket(const T & packet, bool flush = false)
{
static_assert(std::is_base_of<WritePacket, T>());
String payload = packet.getPayload();
size_t pos = 0;
do
{
size_t payload_length = std::min(payload.length() - pos, max_packet_size);
LOG_TRACE(log, "Writing packet of size " << payload_length << " with sequence-id " << static_cast<int>(sequence_id));
LOG_TRACE(log, packetToText(payload));
out->write(reinterpret_cast<const char *>(&payload_length), 3);
out->write(reinterpret_cast<const char *>(&sequence_id), 1);
out->write(payload.data() + pos, payload_length);
pos += payload_length;
sequence_id++;
} while (pos < payload.length());
LOG_TRACE(log, "Packet was sent.");
if (flush)
{
out->next();
}
}
/// Sets sequence-id to 0. Must be called before each command phase.
void resetSequenceId();
private:
/// Converts packet to text. Is used for debug output.
static String packetToText(String payload);
Poco::Logger * log;
};
uint64_t readLengthEncodedNumber(std::istringstream & ss);
String writeLengthEncodedNumber(uint64_t x);
void writeLengthEncodedString(String & payload, const String & s);
void writeNulTerminatedString(String & payload, const String & s);
class Handshake : public WritePacket
{
int protocol_version = 0xa;
String server_version;
uint32_t connection_id;
uint32_t capability_flags;
uint8_t character_set;
uint32_t status_flags;
String auth_plugin_data;
public:
explicit Handshake(uint32_t connection_id, String server_version, String auth_plugin_data)
: protocol_version(0xa)
, server_version(std::move(server_version))
, connection_id(connection_id)
, capability_flags(CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA
| CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF | CLIENT_SSL)
, character_set(CharacterSet::utf8_general_ci)
, status_flags(0)
, auth_plugin_data(auth_plugin_data)
{
}
String getPayload() const override
{
String result;
result.append(1, protocol_version);
writeNulTerminatedString(result, server_version);
result.append(reinterpret_cast<const char *>(&connection_id), 4);
writeNulTerminatedString(result, auth_plugin_data.substr(0, AUTH_PLUGIN_DATA_PART_1_LENGTH));
result.append(reinterpret_cast<const char *>(&capability_flags), 2);
result.append(reinterpret_cast<const char *>(&character_set), 1);
result.append(reinterpret_cast<const char *>(&status_flags), 2);
result.append((reinterpret_cast<const char *>(&capability_flags)) + 2, 2);
result.append(1, auth_plugin_data.size());
result.append(10, 0x0);
result.append(auth_plugin_data.substr(AUTH_PLUGIN_DATA_PART_1_LENGTH, auth_plugin_data.size() - AUTH_PLUGIN_DATA_PART_1_LENGTH));
result.append(Authentication::SHA256);
result.append(1, 0x0);
return result;
}
};
class SSLRequest : public ReadPacket
{
public:
uint32_t capability_flags;
uint32_t max_packet_size;
uint8_t character_set;
void readPayload(String s) override
{
std::istringstream ss(s);
ss.readsome(reinterpret_cast<char *>(&capability_flags), 4);
ss.readsome(reinterpret_cast<char *>(&max_packet_size), 4);
ss.readsome(reinterpret_cast<char *>(&character_set), 1);
}
};
class HandshakeResponse : public ReadPacket
{
public:
uint32_t capability_flags;
uint32_t max_packet_size;
uint8_t character_set;
String username;
String auth_response;
String database;
String auth_plugin_name;
HandshakeResponse() = default;
HandshakeResponse(const HandshakeResponse &) = default;
void readPayload(String s) override
{
std::istringstream ss(s);
ss.readsome(reinterpret_cast<char *>(&capability_flags), 4);
ss.readsome(reinterpret_cast<char *>(&max_packet_size), 4);
ss.readsome(reinterpret_cast<char *>(&character_set), 1);
ss.ignore(23);
std::getline(ss, username, static_cast<char>(0x0));
if (capability_flags & CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA)
{
auto len = readLengthEncodedNumber(ss);
auth_response.resize(len);
ss.read(auth_response.data(), static_cast<std::streamsize>(len));
}
else if (capability_flags & CLIENT_SECURE_CONNECTION)
{
uint8_t len;
ss.read(reinterpret_cast<char *>(&len), 1);
auth_response.resize(len);
ss.read(auth_response.data(), len);
}
else
{
std::getline(ss, auth_response, static_cast<char>(0x0));
}
if (capability_flags & CLIENT_CONNECT_WITH_DB)
{
std::getline(ss, database, static_cast<char>(0x0));
}
if (capability_flags & CLIENT_PLUGIN_AUTH)
{
std::getline(ss, auth_plugin_name, static_cast<char>(0x0));
}
}
};
class AuthSwitchRequest : public WritePacket
{
String plugin_name;
String auth_plugin_data;
public:
AuthSwitchRequest(String plugin_name, String auth_plugin_data)
: plugin_name(std::move(plugin_name)), auth_plugin_data(std::move(auth_plugin_data))
{
}
String getPayload() const override
{
String result;
result.append(1, 0xfe);
writeNulTerminatedString(result, plugin_name);
result.append(auth_plugin_data);
return result;
}
};
class AuthSwitchResponse : public ReadPacket
{
public:
String value;
void readPayload(String s) override
{
value = std::move(s);
}
};
class AuthMoreData : public WritePacket
{
String data;
public:
AuthMoreData(String data): data(std::move(data)) {}
String getPayload() const override
{
String result;
result.append(1, 0x01);
result.append(data);
return result;
}
};
/// Packet with a single null-terminated string. Is used for clear text authentication.
class NullTerminatedString : public ReadPacket
{
public:
String value;
void readPayload(String s) override
{
if (s.length() == 0 || s.back() != 0)
{
throw ProtocolError("String is not null terminated.", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
value = s;
value.pop_back();
}
};
class OK_Packet : public WritePacket
{
uint8_t header;
uint32_t capabilities;
uint64_t affected_rows;
int16_t warnings = 0;
uint32_t status_flags;
String session_state_changes;
String info;
public:
OK_Packet(uint8_t header,
uint32_t capabilities,
uint64_t affected_rows,
uint32_t status_flags,
int16_t warnings,
String session_state_changes = "",
String info = "")
: header(header)
, capabilities(capabilities)
, affected_rows(affected_rows)
, warnings(warnings)
, status_flags(status_flags)
, session_state_changes(std::move(session_state_changes))
, info(info)
{
}
String getPayload() const override
{
String result;
result.append(1, header);
result.append(writeLengthEncodedNumber(affected_rows));
result.append(writeLengthEncodedNumber(0)); /// last insert-id
if (capabilities & CLIENT_PROTOCOL_41)
{
result.append(reinterpret_cast<const char *>(&status_flags), 2);
result.append(reinterpret_cast<const char *>(&warnings), 2);
}
else if (capabilities & CLIENT_TRANSACTIONS)
{
result.append(reinterpret_cast<const char *>(&status_flags), 2);
}
if (capabilities & CLIENT_SESSION_TRACK)
{
result.append(writeLengthEncodedNumber(info.length()));
result.append(info);
if (status_flags & SERVER_SESSION_STATE_CHANGED)
{
result.append(writeLengthEncodedNumber(session_state_changes.length()));
result.append(session_state_changes);
}
}
else
{
result.append(info);
}
return result;
}
};
class EOF_Packet : public WritePacket
{
int warnings;
int status_flags;
public:
EOF_Packet(int warnings, int status_flags) : warnings(warnings), status_flags(status_flags)
{}
String getPayload() const override
{
String result;
result.append(1, 0xfe); // EOF header
result.append(reinterpret_cast<const char *>(&warnings), 2);
result.append(reinterpret_cast<const char *>(&status_flags), 2);
return result;
}
};
class ERR_Packet : public WritePacket
{
int error_code;
String sql_state;
String error_message;
public:
ERR_Packet(int error_code, String sql_state, String error_message)
: error_code(error_code), sql_state(std::move(sql_state)), error_message(std::move(error_message))
{
}
String getPayload() const override
{
String result;
result.append(1, 0xff);
result.append(reinterpret_cast<const char *>(&error_code), 2);
result.append("#", 1);
result.append(sql_state.data(), sql_state.length());
result.append(error_message.data(), std::min(error_message.length(), MYSQL_ERRMSG_SIZE));
return result;
}
};
class ColumnDefinition : public WritePacket
{
String schema;
String table;
String org_table;
String name;
String org_name;
size_t next_length = 0x0c;
uint16_t character_set;
uint32_t column_length;
ColumnType column_type;
uint16_t flags;
uint8_t decimals = 0x00;
public:
ColumnDefinition(
String schema,
String table,
String org_table,
String name,
String org_name,
uint16_t character_set,
uint32_t column_length,
ColumnType column_type,
uint16_t flags,
uint8_t decimals)
: schema(std::move(schema)), table(std::move(table)), org_table(std::move(org_table)), name(std::move(name)),
org_name(std::move(org_name)), character_set(character_set), column_length(column_length), column_type(column_type), flags(flags),
decimals(decimals)
{
}
/// Should be used when column metadata (original name, table, original table, database) is unknown.
ColumnDefinition(
String name,
uint16_t character_set,
uint32_t column_length,
ColumnType column_type,
uint16_t flags,
uint8_t decimals)
: ColumnDefinition("", "", "", std::move(name), "", character_set, column_length, column_type, flags, decimals)
{
}
String getPayload() const override
{
String result;
writeLengthEncodedString(result, "def"); /// always "def"
writeLengthEncodedString(result, ""); /// schema
writeLengthEncodedString(result, ""); /// table
writeLengthEncodedString(result, ""); /// org_table
writeLengthEncodedString(result, name);
writeLengthEncodedString(result, ""); /// org_name
result.append(writeLengthEncodedNumber(next_length));
result.append(reinterpret_cast<const char *>(&character_set), 2);
result.append(reinterpret_cast<const char *>(&column_length), 4);
result.append(reinterpret_cast<const char *>(&column_type), 1);
result.append(reinterpret_cast<const char *>(&flags), 2);
result.append(reinterpret_cast<const char *>(&decimals), 2);
result.append(2, 0x0);
return result;
}
};
class ComFieldList : public ReadPacket
{
public:
String table, field_wildcard;
void readPayload(String payload)
{
std::istringstream ss(payload);
ss.ignore(1); // command byte
std::getline(ss, table, static_cast<char>(0x0));
field_wildcard = payload.substr(table.length() + 2); // rest of the packet
}
};
class LengthEncodedNumber : public WritePacket
{
uint64_t value;
public:
LengthEncodedNumber(uint64_t value): value(value)
{
}
String getPayload() const override
{
return writeLengthEncodedNumber(value);
}
};
class ResultsetRow : public WritePacket
{
std::vector<String> columns;
public:
ResultsetRow()
{
}
void appendColumn(String value)
{
columns.emplace_back(std::move(value));
}
String getPayload() const override
{
String result;
for (const String & column : columns)
{
writeLengthEncodedString(result, column);
}
return result;
}
};
}
}

View File

@ -0,0 +1,87 @@
#include "MySQLBlockOutputStream.h"
#include <Core/MySQLProtocol.h>
#include <Interpreters/ProcessList.h>
#include <iomanip>
#include <sstream>
namespace DB
{
using namespace MySQLProtocol;
MySQLBlockOutputStream::MySQLBlockOutputStream(WriteBuffer & buf, const Block & header, Context & context)
: header(header)
, context(context)
, packet_sender(new PacketSender(buf, context.sequence_id, "MySQLBlockOutputStream"))
{
packet_sender->max_packet_size = context.max_packet_size;
}
void MySQLBlockOutputStream::writePrefix()
{
if (header.columns() == 0)
return;
packet_sender->sendPacket(LengthEncodedNumber(header.columns()));
for (const ColumnWithTypeAndName & column : header.getColumnsWithTypeAndName())
{
ColumnDefinition column_definition(column.name, CharacterSet::binary, std::numeric_limits<uint32_t>::max(),
ColumnType::MYSQL_TYPE_STRING, 0, 0);
packet_sender->sendPacket(column_definition);
}
if (!(context.client_capabilities & Capability::CLIENT_DEPRECATE_EOF))
{
packet_sender->sendPacket(EOF_Packet(0, 0));
}
}
void MySQLBlockOutputStream::write(const Block & block)
{
size_t rows = block.rows();
for (size_t i = 0; i < rows; i++)
{
ResultsetRow row_packet;
for (const ColumnWithTypeAndName & column : block)
{
String column_value;
WriteBufferFromString ostr(column_value);
column.type->serializeAsText(*column.column.get(), i, ostr, format_settings);
ostr.finish();
row_packet.appendColumn(std::move(column_value));
}
packet_sender->sendPacket(row_packet);
}
}
void MySQLBlockOutputStream::writeSuffix()
{
QueryStatus * process_list_elem = context.getProcessListElement();
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo();
size_t affected_rows = info.written_rows;
std::stringstream human_readable_info;
human_readable_info << std::fixed << std::setprecision(3)
<< "Read " << info.read_rows << " rows, " << formatReadableSizeWithBinarySuffix(info.read_bytes) << " in " << info.elapsed_seconds << " sec., "
<< static_cast<size_t>(info.read_rows / info.elapsed_seconds) << " rows/sec., "
<< formatReadableSizeWithBinarySuffix(info.read_bytes / info.elapsed_seconds) << "/sec.";
if (header.columns() == 0)
packet_sender->sendPacket(OK_Packet(0x0, context.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
else
if (context.client_capabilities & CLIENT_DEPRECATE_EOF)
packet_sender->sendPacket(OK_Packet(0xfe, context.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
else
packet_sender->sendPacket(EOF_Packet(0, 0), true);
}
void MySQLBlockOutputStream::flush()
{
packet_sender->out->next();
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include "IBlockOutputStream.h"
#include <Core/MySQLProtocol.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
namespace DB
{
/** Interface for writing rows in MySQL Client/Server Protocol format.
*/
class MySQLBlockOutputStream : public IBlockOutputStream
{
public:
MySQLBlockOutputStream(WriteBuffer & buf, const Block & header, Context & context);
Block getHeader() const { return header; }
void write(const Block & block);
void writePrefix();
void writeSuffix();
void flush();
private:
Block header;
Context & context;
std::shared_ptr<MySQLProtocol::PacketSender> packet_sender;
FormatSettings format_settings;
};
using MySQLBlockOutputStreamPtr = std::shared_ptr<MySQLBlockOutputStream>;
}

View File

@ -130,6 +130,7 @@ void registerOutputFormatXML(FormatFactory & factory);
void registerOutputFormatODBCDriver(FormatFactory & factory);
void registerOutputFormatODBCDriver2(FormatFactory & factory);
void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatMySQL(FormatFactory & factory);
/// Input only formats.
@ -168,6 +169,7 @@ FormatFactory::FormatFactory()
registerOutputFormatODBCDriver(*this);
registerOutputFormatODBCDriver2(*this);
registerOutputFormatNull(*this);
registerOutputFormatMySQL(*this);
}
}

View File

@ -0,0 +1,19 @@
#include <DataStreams/MySQLBlockOutputStream.h>
namespace DB
{
void registerOutputFormatMySQL(FormatFactory & factory)
{
factory.registerOutputFormat("MySQL", [](
WriteBuffer & buf,
const Block & sample,
const Context & context,
const FormatSettings &)
{
return std::make_shared<MySQLBlockOutputStream>(buf, sample, const_cast<Context &>(context));
});
}
}

View File

@ -479,6 +479,10 @@ public:
IHostContextPtr & getHostContext();
const IHostContextPtr & getHostContext() const;
/// MySQL wire protocol state.
size_t sequence_id = 0;
uint32_t client_capabilities = 0;
size_t max_packet_size = 0;
private:
/** Check if the current client has access to the specified database.
* If access is denied, throw an exception.

View File

@ -0,0 +1,23 @@
Columns:
a
Column types:
a BINARY
Result:
0
1
Columns:
name
a
Column types:
name BINARY
a BINARY
Result:
tables 1
Columns:
a
b
Column types:
a BINARY
b BINARY
Result:
тест 1

View File

@ -0,0 +1,7 @@
FROM golang:1.12.2
RUN go get "github.com/go-sql-driver/mysql"
COPY ./main.go main.go
RUN go build main.go

View File

@ -0,0 +1,8 @@
version: '2.2'
services:
golang1:
build:
context: ./
network: host
# to keep container running
command: sleep infinity

View File

@ -0,0 +1,92 @@
package main
import (
"database/sql"
"flag"
"fmt"
_ "github.com/go-sql-driver/mysql"
"log"
"os"
)
func main() {
host := flag.String("host", "localhost", "mysql server address")
port := flag.Uint("port", 3306, "mysql server port")
user := flag.String("user", "", "username")
password := flag.String("password", "", "password")
database := flag.String("database", "", "database to authenticate against")
flag.Parse()
logger := log.New(os.Stderr, "", 0)
dataSource := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", *user, *password, *host, *port, *database)
db, err := sql.Open("mysql", dataSource)
if err != nil {
logger.Fatal(err)
}
defer db.Close()
runQuery := func(query string, processRows func(*sql.Rows) error) {
rows, err := db.Query(query)
if err != nil {
logger.Fatal(err)
}
columns, err := rows.Columns()
fmt.Println("Columns:")
for _, name := range columns {
fmt.Println(name)
}
columnsTypes, err := rows.ColumnTypes()
fmt.Println("Column types:")
for _, column := range columnsTypes {
fmt.Printf("%s %s\n", column.Name(), column.DatabaseTypeName())
}
fmt.Println("Result:")
err = processRows(rows)
if err != nil {
logger.Fatal(err)
}
err = rows.Close()
if err != nil {
logger.Fatal(err)
}
err = rows.Close()
if err != nil {
logger.Fatal(err)
}
}
processRows := func(rows *sql.Rows) error {
var x int
for rows.Next() {
err := rows.Scan(&x)
if err != nil {
return err
}
fmt.Println(x)
}
return rows.Err()
}
runQuery("select number as a from system.numbers limit 2", processRows)
processRows = func(rows *sql.Rows) error {
var name string
var a int
for rows.Next() {
err := rows.Scan(&name, &a)
if err != nil {
return err
}
fmt.Println(name, a)
}
return rows.Err()
}
runQuery("select name, 1 as a from system.tables where name == 'tables'", processRows)
runQuery("select 'тест' as a, 1 as b", processRows)
}

View File

@ -0,0 +1,6 @@
version: '2.2'
services:
mysql1:
image: mysql:5.7
# rewriting default command, because starting server is unnecessary
command: sleep infinity

View File

@ -0,0 +1,36 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<!-- Used with https_port and tcp_port_secure. Full ssl options list: https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/include/Poco/Net/SSLManager.h#L71 -->
<openSSL>
<server> <!-- Used for https server AND secure tcp port -->
<!-- openssl req -subj "/CN=localhost" -new -newkey rsa:2048 -days 365 -nodes -x509 -keyout /etc/clickhouse-server/server.key -out /etc/clickhouse-server/server.crt -->
<certificateFile>/etc/clickhouse-server/server.crt</certificateFile>
<privateKeyFile>/etc/clickhouse-server/server.key</privateKeyFile>
<!-- openssl dhparam -out /etc/clickhouse-server/dhparam.pem 4096 -->
<dhParamsFile>/etc/clickhouse-server/dhparam.pem</dhParamsFile>
<verificationMode>none</verificationMode>
<loadDefaultCAFile>true</loadDefaultCAFile>
<cacheSessions>true</cacheSessions>
<disableProtocols>sslv2,sslv3</disableProtocols>
<preferServerCiphers>true</preferServerCiphers>
</server>
</openSSL>
<tcp_port>9000</tcp_port>
<mysql_port>9001</mysql_port>
<listen_host>127.0.0.1</listen_host>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</yandex>

View File

@ -0,0 +1,8 @@
-----BEGIN DH PARAMETERS-----
MIIBCAKCAQEAkPGhfLY5nppeQkFBKYRpiisxzrRQfyyTUu6aabZP2CbAMAuoYzaC
Z+iqeWSQZKRYeA21SZXkC9xE1e5FJsc5IWzCRiMNZeLuj4ApUNysMu89DpX8/b91
+Ka6wRJnaO43ZqHj/9FpU4JiYtxoIpXDC9HeiSAnwLwJc3L+nkYfnSGgvzWIxhGV
gCoVmVBoTe7wrqCyVlM5nrNZSjhlSugvXmu2bSK3MwYF08QLKvlF68eedbs0PMWh
WC0bFM/X7gMBEqL4DiINufAShbZPKxD6eL2APiHPUo6xun3ed/Po/5j8QBmiku0c
5Jb12ZhOTRTQjaRg2aFF8LPdW2tDE7HmewIBAg==
-----END DH PARAMETERS-----

View File

@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC+zCCAeOgAwIBAgIJAIhI9ozZJ+TWMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV
BAMMCWxvY2FsaG9zdDAeFw0xOTA0MjIwNDMyNTJaFw0yMDA0MjEwNDMyNTJaMBQx
EjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
ggEBAK+wVUEdqF2uXvN0MJBgnAHyXi6JTi4p/F6igsrCjSNjJWzHH0vQmK8ujfcF
CkifW88i+W5eHctuEtQqNHK+t9x9YiZtXrj6m/XkOXs20mYgENSmbbbHbriTPnZB
zZrq6UqMlwIHNNAa+I3NMORQxVRaI0ybXnGVO5elr70xHpk03xL0JWKHpEqYp4db
2aBQgF6y3Ww4khxjIYqpUYXWXGFnVIRU7FKVEAM1xyKqvQzXjQ5sVM/wyHknveEF
3b/X4ggN+KNl5KOc0cWDh1/XaatJAPaUUPqZcq76tynLbP64Xm3dxHcj+gtRkO67
ef6MSg6l63m3XQP6Qb+MIkd06OsCAwEAAaNQME4wHQYDVR0OBBYEFDmODTO8QLDN
ykR3x0LIOnjNhrKhMB8GA1UdIwQYMBaAFDmODTO8QLDNykR3x0LIOnjNhrKhMAwG
A1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAAwaiJc7uqEpnH3aukbftDwX
m8GfEnj1HVdgg+9GGNq+9rvUYBF6gdPmjRCX9dO0cclLFx8jc2org0rTSq9WoOhX
E6qL4Eqrmc5SE3Y9jZM0h6GRD4oXK014FmtZ3T6ddZU3dQLj3BS2r1XrvmubTvGN
ZuTJNY8nx8Hh6H5XINmsEjUF9E5hog+PwCE03xt2adIdYL+gsbxASeNYyeUFpZv5
zcXR3VoakBWnAaOVgCHq2qh96QAnL7ZKzFkGf/MdwV10KU3dmb+ICbQUUdf9Gc17
aaDCIRws312F433FdXBkGs2UkB7ZZme9dfn6O1QbeTNvex2VLMqYx/CTkfFbOQA=
-----END CERTIFICATE-----

View File

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCvsFVBHahdrl7z
dDCQYJwB8l4uiU4uKfxeooLKwo0jYyVsxx9L0JivLo33BQpIn1vPIvluXh3LbhLU
KjRyvrfcfWImbV64+pv15Dl7NtJmIBDUpm22x264kz52Qc2a6ulKjJcCBzTQGviN
zTDkUMVUWiNMm15xlTuXpa+9MR6ZNN8S9CVih6RKmKeHW9mgUIBest1sOJIcYyGK
qVGF1lxhZ1SEVOxSlRADNcciqr0M140ObFTP8Mh5J73hBd2/1+IIDfijZeSjnNHF
g4df12mrSQD2lFD6mXKu+rcpy2z+uF5t3cR3I/oLUZDuu3n+jEoOpet5t10D+kG/
jCJHdOjrAgMBAAECggEARF66zrxb6RkSmmt8+rKeA6PuQu3sHsr4C1vyyjUr97l9
tvdGlpp20LWtSZQMjHZ3pARYTTsTHTeY3DgQcRcHNicVKx8k3ZepWeeW9vw+pL+V
zSt3RsoVrH6gsCSrfr4sS3aqzX9AbjwQvh48CJ3mLQ1m70kHV+xbZIh1+4pB/hyP
1wKyUE18ZkOptXvO/TtoHzLQCecpkXtWzmry1Eh2isvXA+NMrAtLibGsyM1mtm7i
5ozevzHabvvCDBEe+KgZdONgVhhhvm2eOd+/s4w3rw4ETud4fI/ZAJyWXhiIKFnA
VJbElWruSAoVBW7p2bsF5PbmVzvo8vXL+VylxYD+AQKBgQDhLoRKTVhNkn/QjKxq
sdOh+QZra0LzjVpAmkQzu7wZMSHEz9qePQciDQQrYKrmRF1vNcIRCVUTqWYheJ/1
lKRrCGa0ab6k96zkWMqLHD5u+UeJV7r1dJIx08ME9kNJ+x/XtB8klRIji16NiQUS
qc6p8z0M2AnbJzsRfWZRH8FeYwKBgQDHu8dzdtVGI7MtxfPOE/bfajiopDg8BdTC
pdug2T8XofRHRq7Q+0vYjTAZFT/slib91Pk6VvvPdo9VBZiL4omv4dAq6mOOdX/c
U14mJe1X5GCrr8ExZ8BfNJ3t/6sV1fcxyJwAw7iBguqxA2JqdM/wFk10K8XqvzVn
CD6O9yGt2QKBgFX1BMi8N538809vs41S7l9hCQNOQZNo/O+2M5yv6ECRkbtoQKKw
1x03bMUGNJaLuELweXE5Z8GGo5bZTe5X3F+DKHlr+DtO1C+ieUaa9HY2MAmMdLCn
2/qrREGLo+oEs4YKmuzC/taUp/ZNPKOAMISNdluFyFVg51pozPrgrVbTAoGBAKkE
LBl3O67o0t0vH8sJdeVFG8EJhlS0koBMnfgVHqC++dm+5HwPyvTrNQJkyv1HaqNt
r6FArkG3ED9gRuBIyT6+lctbIPgSUip9mbQqcBfqOCvQxGksZMur2ODncz09HLtS
CUFUXjOqNzOnq4ZuZu/Bz7U4vXiSaXxQq6+LTUKxAoGAFZU/qrI06XxnrE9A1X0W
l7DSkpZaDcu11NrZ473yONih/xOZNh4SSBpX8a7F6Pmh9BdtGqphML8NFPvQKcfP
b9H2iid2tc292uyrUEb5uTMmv61zoTwtitqLzO0+tS6PT3fXobX+eyeEWKzPBljL
HFtxG5CCXpkdnWRmaJnhTzA=
-----END PRIVATE KEY-----

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password>123</password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,135 @@
# coding: utf-8
import os
import docker
import pytest
import subprocess
import pymysql.connections
from docker.models.containers import Container
from helpers.cluster import ClickHouseCluster
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(SCRIPT_DIR, './configs')
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', config_dir=config_dir)
server_port = 9001
@pytest.fixture(scope="module")
def server_address():
cluster.start()
try:
yield cluster.get_instance_ip('node')
finally:
cluster.shutdown()
@pytest.fixture(scope='module')
def mysql_client():
docker_compose = os.path.join(SCRIPT_DIR, 'clients', 'mysql', 'docker_compose.yml')
subprocess.check_call(['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'up', '--no-recreate', '-d', '--build'])
yield docker.from_env().containers.get(cluster.project_name + '_mysql1_1')
@pytest.fixture(scope='module')
def golang_container():
docker_compose = os.path.join(SCRIPT_DIR, 'clients', 'golang', 'docker_compose.yml')
subprocess.check_call(['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'up', '--no-recreate', '-d', '--build'])
yield docker.from_env().containers.get(cluster.project_name + '_golang1_1')
def test_mysql_client(mysql_client, server_address):
# type: (Container, str) -> None
code, (stdout, stderr) = mysql_client.exec_run('''
mysql --protocol tcp -h {host} -P {port} default -u default --password=123
-e "SELECT 1 as a;"
-e "SELECT 'тест' as b;"
'''.format(host=server_address, port=server_port), demux=True)
assert stdout == 'a\n1\nb\nтест\n'
code, (stdout, stderr) = mysql_client.exec_run('''
mysql --protocol tcp -h {host} -P {port} default -u default --password=abc -e "select 1 as a;"
'''.format(host=server_address, port=server_port), demux=True)
assert stderr == 'mysql: [Warning] Using a password on the command line interface can be insecure.\n' \
'ERROR 193 (00000): Wrong password for user default\n'
code, (stdout, stderr) = mysql_client.exec_run('''
mysql --protocol tcp -h {host} -P {port} default -u default --password=123
-e "use system;"
-e "select count(*) from (select name from tables limit 1);"
-e "use system2;"
'''.format(host=server_address, port=server_port), demux=True)
assert stdout == 'count()\n1\n'
assert stderr == "mysql: [Warning] Using a password on the command line interface can be insecure.\n" \
"ERROR 81 (00000) at line 1: Database system2 doesn't exist\n"
code, (stdout, stderr) = mysql_client.exec_run('''
mysql --protocol tcp -h {host} -P {port} default -u default --password=123
-e "CREATE DATABASE x;"
-e "USE x;"
-e "CREATE TABLE table1 (a UInt32) ENGINE = Memory;"
-e "INSERT INTO table1 VALUES (0), (1), (5);"
-e "INSERT INTO table1 VALUES (0), (1), (5);"
-e "SELECT * FROM table1 ORDER BY a;"
'''.format(host=server_address, port=server_port), demux=True)
assert stdout == 'a\n0\n0\n1\n1\n5\n5\n'
def test_python_client(server_address):
with pytest.raises(pymysql.InternalError) as exc_info:
pymysql.connections.Connection(host=server_address, user='default', password='abacab', database='default', port=server_port)
assert exc_info.value.args == (193, 'Wrong password for user default')
client = pymysql.connections.Connection(host=server_address, user='default', password='123', database='default', port=server_port)
with pytest.raises(pymysql.InternalError) as exc_info:
client.query('select name from tables')
assert exc_info.value.args == (60, "Table default.tables doesn't exist.")
cursor = client.cursor(pymysql.cursors.DictCursor)
cursor.execute("select 1 as a, 'тест' as b")
assert cursor.fetchall() == [{'a': '1', 'b': 'тест'}]
client.select_db('system')
with pytest.raises(pymysql.InternalError) as exc_info:
client.select_db('system2')
assert exc_info.value.args == (81, "Database system2 doesn't exist")
client.select_db('x')
cursor = client.cursor(pymysql.cursors.DictCursor)
cursor.execute("TRUNCATE TABLE table1")
cursor.execute("INSERT INTO table1 VALUES (1), (3)")
cursor.execute("INSERT INTO table1 VALUES (1), (4)")
cursor.execute("SELECT * FROM table1 ORDER BY a")
assert cursor.fetchall() == [{'a': '1'}, {'a': '1'}, {'a': '3'}, {'a': '4'}]
def test_golang_client(server_address, golang_container):
# type: (str, Container) -> None
code, (stdout, stderr) = golang_container.exec_run('./main --host {host} --port {port} --user default --password 123 --database '
'abc'.format(host=server_address, port=server_port), demux=True)
assert code == 1
assert stderr == "Error 81: Database abc doesn't exist\n"
code, (stdout, stderr) = golang_container.exec_run('./main --host {host} --port {port} --user default --password 123 --database '
'default'.format(host=server_address, port=server_port), demux=True)
assert code == 0
with open(os.path.join(SCRIPT_DIR, 'clients', 'golang', '0.reference')) as fp:
reference = fp.read()
assert stdout == reference