Revert format changes

This commit is contained in:
slvrtrn 2023-09-04 21:15:26 +02:00
parent 2245a8df9d
commit bb0eff9669
9 changed files with 191 additions and 229 deletions

View File

@ -124,7 +124,8 @@ EOFPacket::EOFPacket() : warnings(0x00), status_flags(0x00)
{
}
EOFPacket::EOFPacket(int warnings_, int status_flags_) : warnings(warnings_), status_flags(status_flags_)
EOFPacket::EOFPacket(int warnings_, int status_flags_)
: warnings(warnings_), status_flags(status_flags_)
{
}
@ -196,7 +197,8 @@ void ERRPacket::writePayloadImpl(WriteBuffer & buffer) const
buffer.write(error_message.data(), std::min(error_message.length(), MYSQL_ERRMSG_SIZE));
}
ResponsePacket::ResponsePacket(UInt32 server_capability_flags_) : ok(OKPacket(server_capability_flags_))
ResponsePacket::ResponsePacket(UInt32 server_capability_flags_)
: ok(OKPacket(server_capability_flags_))
{
}

View File

@ -58,33 +58,17 @@ void ComFieldList::readPayloadImpl(ReadBuffer & payload)
readStringUntilEOF(field_wildcard, payload);
}
ColumnDefinition::ColumnDefinition() : character_set(0x00), column_length(0), column_type(MYSQL_TYPE_DECIMAL), flags(0x00)
ColumnDefinition::ColumnDefinition()
: character_set(0x00), column_length(0), column_type(MYSQL_TYPE_DECIMAL), flags(0x00)
{
}
ColumnDefinition::ColumnDefinition(
String schema_,
String table_,
String org_table_,
String name_,
String org_name_,
uint16_t character_set_,
uint32_t column_length_,
ColumnType column_type_,
uint16_t flags_,
uint8_t decimals_,
bool with_defaults_)
: schema(std::move(schema_))
, table(std::move(table_))
, org_table(std::move(org_table_))
, name(std::move(name_))
, org_name(std::move(org_name_))
, character_set(character_set_)
, column_length(column_length_)
, column_type(column_type_)
, flags(flags_)
, decimals(decimals_)
, is_comm_field_list_response(with_defaults_)
String schema_, String table_, String org_table_, String name_, String org_name_, uint16_t character_set_, uint32_t column_length_,
ColumnType column_type_, uint16_t flags_, uint8_t decimals_, bool with_defaults_)
: schema(std::move(schema_)), table(std::move(table_)), org_table(std::move(org_table_)), name(std::move(name_)),
org_name(std::move(org_name_)), character_set(character_set_), column_length(column_length_), column_type(column_type_),
flags(flags_), decimals(decimals_), is_comm_field_list_response(with_defaults_)
{
}
@ -96,9 +80,15 @@ ColumnDefinition::ColumnDefinition(
size_t ColumnDefinition::getPayloadSize() const
{
return 12 + getLengthEncodedStringSize("def") + getLengthEncodedStringSize(schema) + getLengthEncodedStringSize(table)
+ getLengthEncodedStringSize(org_table) + getLengthEncodedStringSize(name) + getLengthEncodedStringSize(org_name)
+ getLengthEncodedNumberSize(next_length) + is_comm_field_list_response;
return 12 +
getLengthEncodedStringSize("def") +
getLengthEncodedStringSize(schema) +
getLengthEncodedStringSize(table) +
getLengthEncodedStringSize(org_table) +
getLengthEncodedStringSize(name) +
getLengthEncodedStringSize(org_name) +
getLengthEncodedNumberSize(next_length) +
is_comm_field_list_response;
}
void ColumnDefinition::readPayloadImpl(ReadBuffer & payload)

View File

@ -1,13 +1,13 @@
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/Serializations/SerializationDecimal.h>
#include <Common/typeid_cast.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/readDecimalText.h>
#include <Parsers/ASTLiteral.h>
#include <Common/typeid_cast.h>
#include <type_traits>

View File

@ -28,14 +28,12 @@ void SerializationDate32::deserializeTextEscaped(IColumn & column, ReadBuffer &
assert_cast<ColumnInt32 &>(column).getData().push_back(x);
}
void SerializationDate32::serializeTextEscaped(
const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
void SerializationDate32::serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
serializeText(column, row_num, ostr, settings);
}
void SerializationDate32::serializeTextQuoted(
const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
void SerializationDate32::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
writeChar('\'', ostr);
serializeText(column, row_num, ostr, settings);
@ -51,8 +49,7 @@ void SerializationDate32::deserializeTextQuoted(IColumn & column, ReadBuffer & i
assert_cast<ColumnInt32 &>(column).getData().push_back(x); /// It's important to do this at the end - for exception safety.
}
void SerializationDate32::serializeTextJSON(
const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
void SerializationDate32::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
writeChar('"', ostr);
serializeText(column, row_num, ostr, settings);
@ -68,8 +65,7 @@ void SerializationDate32::deserializeTextJSON(IColumn & column, ReadBuffer & ist
assert_cast<ColumnInt32 &>(column).getData().push_back(x);
}
void SerializationDate32::serializeTextCSV(
const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
void SerializationDate32::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
writeChar('"', ostr);
serializeText(column, row_num, ostr, settings);

View File

@ -1,7 +1,7 @@
#pragma once
#include <Core/Defines.h>
#include <Core/Names.h>
#include <Core/Defines.h>
#include <base/types.h>
#include <base/unit.h>

View File

@ -17,7 +17,8 @@ using namespace MySQLProtocol::ProtocolText;
using namespace MySQLProtocol::ProtocolBinary;
MySQLOutputFormat::MySQLOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: IOutputFormat(header_, out_), client_capabilities(settings_.mysql_wire.client_capabilities)
: IOutputFormat(header_, out_)
, client_capabilities(settings_.mysql_wire.client_capabilities)
{
/// MySQlWire is a special format that is usually used as output format for MySQL protocol connections.
/// In this case we have a correct `sequence_id` stored in `settings_.mysql_wire`.
@ -137,8 +138,9 @@ void registerOutputFormatMySQLWire(FormatFactory & factory)
{
factory.registerOutputFormat(
"MySQLWire",
[](WriteBuffer & buf, const Block & sample, const FormatSettings & settings)
{ return std::make_shared<MySQLOutputFormat>(buf, sample, settings); });
[](WriteBuffer & buf,
const Block & sample,
const FormatSettings & settings) { return std::make_shared<MySQLOutputFormat>(buf, sample, settings); });
}
}

View File

@ -67,7 +67,10 @@ static String killConnectionIdReplacementQuery(const String & query);
static String selectLimitReplacementQuery(const String & query);
MySQLHandler::MySQLHandler(
IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, uint32_t connection_id_)
IServer & server_,
TCPServer & tcp_server_,
const Poco::Net::StreamSocket & socket_,
bool ssl_enabled, uint32_t connection_id_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, tcp_server(tcp_server_)
@ -75,8 +78,7 @@ MySQLHandler::MySQLHandler(
, connection_id(connection_id_)
, auth_plugin(new MySQLProtocol::Authentication::Native41())
{
server_capabilities = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA
| CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF;
server_capabilities = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF;
if (ssl_enabled)
server_capabilities |= CLIENT_SSL;
@ -102,13 +104,8 @@ void MySQLHandler::run()
try
{
Handshake handshake(
server_capabilities,
connection_id,
VERSION_STRING + String("-") + VERSION_NAME,
auth_plugin->getName(),
auth_plugin->getAuthPluginData(),
CharacterSet::utf8_general_ci);
Handshake handshake(server_capabilities, connection_id, VERSION_STRING + String("-") + VERSION_NAME,
auth_plugin->getName(), auth_plugin->getAuthPluginData(), CharacterSet::utf8_general_ci);
packet_endpoint->sendPacket<Handshake>(handshake, true);
LOG_TRACE(log, "Sent handshake");
@ -118,10 +115,8 @@ void MySQLHandler::run()
client_capabilities = handshake_response.capability_flags;
max_packet_size = handshake_response.max_packet_size ? handshake_response.max_packet_size : MAX_PACKET_LENGTH;
LOG_TRACE(
log,
"Capabilities: {}, max_packet_size: {}, character_set: {}, user: {}, auth_response length: {}, database: {}, auth_plugin_name: "
"{}",
LOG_TRACE(log,
"Capabilities: {}, max_packet_size: {}, character_set: {}, user: {}, auth_response length: {}, database: {}, auth_plugin_name: {}",
handshake_response.capability_flags,
handshake_response.max_packet_size,
static_cast<int>(handshake_response.character_set),
@ -165,8 +160,8 @@ void MySQLHandler::run()
// For commands which are executed without MemoryTracker.
LimitReadBuffer limited_payload(payload, 10000, /* trow_exception */ true, /* exact_limit */ {}, "too long MySQL packet.");
LOG_DEBUG(
log, "Received command: {}. Connection id: {}.", static_cast<int>(static_cast<unsigned char>(command)), connection_id);
LOG_DEBUG(log, "Received command: {}. Connection id: {}.",
static_cast<int>(static_cast<unsigned char>(command)), connection_id);
if (!tcp_server.isOpen())
return;
@ -232,15 +227,13 @@ void MySQLHandler::finishHandshake(MySQLProtocol::ConnectionPhase::HandshakeResp
size_t pos = 0;
/// Reads at least count and at most packet_size bytes.
auto read_bytes = [this, &buf, &pos, &packet_size](size_t count) -> void
{
auto read_bytes = [this, &buf, &pos, &packet_size](size_t count) -> void {
while (pos < count)
{
int ret = socket().receiveBytes(buf + pos, static_cast<uint32_t>(packet_size - pos));
if (ret == 0)
{
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data. Bytes read: {}. Bytes expected: 3", std::to_string(pos));
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data. Bytes read: {}. Bytes expected: 3", std::to_string(pos));
}
pos += ret;
}
@ -279,8 +272,7 @@ void MySQLHandler::authenticate(const String & user_name, const String & auth_pl
authPluginSSL();
}
std::optional<String> auth_response
= auth_plugin_name == auth_plugin->getName() ? std::make_optional<String>(initial_auth_response) : std::nullopt;
std::optional<String> auth_response = auth_plugin_name == auth_plugin->getName() ? std::make_optional<String>(initial_auth_response) : std::nullopt;
auth_plugin->authenticate(user_name, *session, auth_response, packet_endpoint, secure_connection, socket().peerAddress());
}
catch (const Exception & exc)
@ -312,17 +304,8 @@ void MySQLHandler::comFieldList(ReadBuffer & payload)
for (const NameAndTypePair & column : metadata_snapshot->getColumns().getAll())
{
ColumnDefinition column_definition(
database,
packet.table,
packet.table,
column.name,
column.name,
CharacterSet::binary,
100,
ColumnType::MYSQL_TYPE_STRING,
0,
0,
true);
database, packet.table, packet.table, column.name, column.name, CharacterSet::binary, 100, ColumnType::MYSQL_TYPE_STRING, 0, 0, true
);
packet_endpoint->sendPacket(column_definition);
}
packet_endpoint->sendPacket(OKPacket(0xfe, client_capabilities, 0, 0, 0), true);
@ -369,8 +352,7 @@ void MySQLHandler::comQuery(ReadBuffer & payload, bool use_binary_protocol_resul
std::atomic<size_t> affected_rows {0};
auto prev = query_context->getProgressCallback();
query_context->setProgressCallback(
[&, my_prev = prev](const Progress & progress)
query_context->setProgressCallback([&, my_prev = prev](const Progress & progress)
{
if (my_prev)
my_prev(progress);
@ -475,17 +457,13 @@ void MySQLHandler::comStmtClose(ReadBuffer & payload)
void MySQLHandler::authPluginSSL()
{
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"ClickHouse was built without SSL support. Try specifying password using double SHA1 in users.xml.");
}
void MySQLHandler::finishHandshakeSSL(
[[maybe_unused]] size_t packet_size,
[[maybe_unused]] char * buf,
[[maybe_unused]] size_t pos,
[[maybe_unused]] std::function<void(size_t)> read_bytes,
[[maybe_unused]] MySQLProtocol::ConnectionPhase::HandshakeResponse & packet)
[[maybe_unused]] size_t packet_size, [[maybe_unused]] char * buf, [[maybe_unused]] size_t pos,
[[maybe_unused]] std::function<void(size_t)> read_bytes, [[maybe_unused]] MySQLProtocol::ConnectionPhase::HandshakeResponse & packet)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Client requested SSL, while it is disabled.");
}
@ -499,9 +477,10 @@ MySQLHandlerSSL::MySQLHandlerSSL(
uint32_t connection_id_,
RSA & public_key_,
RSA & private_key_)
: MySQLHandler(server_, tcp_server_, socket_, ssl_enabled, connection_id_), public_key(public_key_), private_key(private_key_)
{
}
: MySQLHandler(server_, tcp_server_, socket_, ssl_enabled, connection_id_)
, public_key(public_key_)
, private_key(private_key_)
{}
void MySQLHandlerSSL::authPluginSSL()
{
@ -509,10 +488,7 @@ void MySQLHandlerSSL::authPluginSSL()
}
void MySQLHandlerSSL::finishHandshakeSSL(
size_t packet_size,
char * buf,
size_t pos,
std::function<void(size_t)> read_bytes,
size_t packet_size, char *buf, size_t pos, std::function<void(size_t)> read_bytes,
MySQLProtocol::ConnectionPhase::HandshakeResponse & packet)
{
read_bytes(packet_size); /// Reading rest SSLRequest.
@ -542,8 +518,8 @@ static bool isFederatedServerSetupSetCommand(const String & query)
"|(^(SET AUTOCOMMIT(.*)))"
"|(^(SET sql_mode(.*)))"
"|(^(SET @@(.*)))"
"|(^(SET SESSION TRANSACTION ISOLATION LEVEL(.*)))",
std::regex::icase};
"|(^(SET SESSION TRANSACTION ISOLATION LEVEL(.*)))"
, std::regex::icase};
return 1 == std::regex_match(query, expr);
}

View File

@ -32,7 +32,11 @@ class MySQLHandler : public Poco::Net::TCPServerConnection
{
public:
MySQLHandler(
IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, uint32_t connection_id_);
IServer & server_,
TCPServer & tcp_server_,
const Poco::Net::StreamSocket & socket_,
bool ssl_enabled,
uint32_t connection_id_);
void run() final;
@ -59,12 +63,7 @@ protected:
void comStmtClose(ReadBuffer & payload);
virtual void authPluginSSL();
virtual void finishHandshakeSSL(
size_t packet_size,
char * buf,
size_t pos,
std::function<void(size_t)> read_bytes,
MySQLProtocol::ConnectionPhase::HandshakeResponse & packet);
virtual void finishHandshakeSSL(size_t packet_size, char * buf, size_t pos, std::function<void(size_t)> read_bytes, MySQLProtocol::ConnectionPhase::HandshakeResponse & packet);
IServer & server;
TCPServer & tcp_server;
@ -110,11 +109,8 @@ private:
void authPluginSSL() override;
void finishHandshakeSSL(
size_t packet_size,
char * buf,
size_t pos,
std::function<void(size_t)> read_bytes,
MySQLProtocol::ConnectionPhase::HandshakeResponse & packet) override;
size_t packet_size, char * buf, size_t pos,
std::function<void(size_t)> read_bytes, MySQLProtocol::ConnectionPhase::HandshakeResponse & packet) override;
RSA & public_key;
RSA & private_key;