Revert "revert protocol changes, found better way"

This reverts commit 3a918ae66a.
This commit is contained in:
zvonand 2023-03-07 16:05:23 +01:00
parent 1fd6e3f23b
commit 1ce697d8c0
11 changed files with 44 additions and 38 deletions

View File

@ -107,7 +107,6 @@ namespace ErrorCodes
extern const int UNRECOGNIZED_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_PARSE_DATETIME;
}
}
@ -1021,6 +1020,10 @@ bool ClientBase::receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled_)
onProfileEvents(packet.block);
return true;
case Protocol::Server::TimezoneUpdate:
DateLUT::setDefaultTimezone(packet.server_timezone);
return true;
default:
throw Exception(
ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from server {}", packet.type, connection->getDescription());
@ -1185,6 +1188,10 @@ bool ClientBase::receiveSampleBlock(Block & out, ColumnsDescription & columns_de
columns_description = ColumnsDescription::parse(packet.multistring_message[1]);
return receiveSampleBlock(out, columns_description, parsed_query);
case Protocol::Server::TimezoneUpdate:
DateLUT::setDefaultTimezone(packet.server_timezone);
break;
default:
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,
"Unexpected packet from server (expected Data, Exception or Log, got {})",
@ -1530,6 +1537,10 @@ bool ClientBase::receiveEndOfQuery()
onProfileEvents(packet.block);
break;
case Protocol::Server::TimezoneUpdate:
DateLUT::setDefaultTimezone(packet.server_timezone);
break;
default:
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,
"Unexpected packet from server (expected Exception, EndOfStream, Log, Progress or ProfileEvents. Got {})",
@ -1600,11 +1611,6 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
progress_indication.resetProgress();
profile_events.watch.restart();
/// A query may contain timezone setting. To handle this, old client-wide tz is saved here.
/// If timezone was set for a query, after its execution client tz will be back to old one.
/// If it was a settings query, new setting will be applied to client.
const std::string old_timezone = DateLUT::instance().getTimeZone();
{
/// Temporarily apply query settings to context.
std::optional<Settings> old_settings;
@ -1653,19 +1659,6 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
bool is_async_insert = global_context->getSettingsRef().async_insert && insert && insert->hasInlinedData();
/// pre-load timezone from (query) settings -- new timezone may also be specified in query.
try
{
if (!global_context->getSettingsRef().timezone.toString().empty())
DateLUT::setDefaultTimezone(global_context->getSettingsRef().timezone);
}
catch (Poco::Exception &)
{
throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME,
"Invalid time zone {} in client settings. Use `SET timezone = \'New/TZ\'` to set a proper timezone.",
global_context->getSettingsRef().timezone.toString());
}
/// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
if (insert && (!insert->select || input_function) && !insert->watch && !is_async_insert)
{
@ -1700,18 +1693,6 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
query_parameters.insert_or_assign(name, value);
global_context->addQueryParameters(set_query->query_parameters);
try
{
if (!global_context->getSettingsRef().timezone.toString().empty())
DateLUT::setDefaultTimezone(global_context->getSettingsRef().timezone);
}
catch (Poco::Exception &)
{
throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME,
"Invalid time zone {} in client settings. Use `SET timezone = \'New/TZ\'` to set a proper timezone.",
global_context->getSettingsRef().timezone.toString());
}
}
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
{
@ -1722,8 +1703,6 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
connection->setDefaultDatabase(new_database);
}
}
else
DateLUT::setDefaultTimezone(old_timezone);
/// Always print last block (if it was not printed already)
if (profile_events.last_block)

View File

@ -972,6 +972,11 @@ Packet Connection::receivePacket()
res.block = receiveProfileEvents();
return res;
case Protocol::Server::TimezoneUpdate:
readStringBinary(server_timezone, *in);
res.server_timezone = server_timezone;
return res;
default:
/// In unknown state, disconnect - to not leave unsynchronised connection.
disconnect();

View File

@ -38,6 +38,8 @@ struct Packet
ParallelReadRequest request;
ParallelReadResponse response;
std::string server_timezone;
Packet() : type(Protocol::Server::Hello) {}
};

View File

@ -258,6 +258,7 @@ Packet MultiplexedConnections::drain()
switch (packet.type)
{
case Protocol::Server::TimezoneUpdate:
case Protocol::Server::MergeTreeAllRangesAnnounecement:
case Protocol::Server::MergeTreeReadTaskRequest:
case Protocol::Server::ReadTaskRequest:
@ -339,6 +340,7 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac
switch (packet.type)
{
case Protocol::Server::TimezoneUpdate:
case Protocol::Server::MergeTreeAllRangesAnnounecement:
case Protocol::Server::MergeTreeReadTaskRequest:
case Protocol::Server::ReadTaskRequest:

View File

@ -158,6 +158,7 @@ void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & t
fillWordsFromBlock(packet.block);
continue;
case Protocol::Server::TimezoneUpdate:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:

View File

@ -83,7 +83,8 @@ namespace Protocol
ProfileEvents = 14, /// Packet with profile events from server.
MergeTreeAllRangesAnnounecement = 15,
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
MAX = MergeTreeReadTaskRequest,
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
MAX = TimezoneUpdate,
};
@ -111,6 +112,7 @@ namespace Protocol
"ProfileEvents",
"MergeTreeAllRangesAnnounecement",
"MergeTreeReadTaskRequest",
"TimezoneUpdate",
};
return packet <= MAX
? data[packet]

View File

@ -54,7 +54,7 @@
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
#define DBMS_TCP_PROTOCOL_VERSION 54461
#define DBMS_TCP_PROTOCOL_VERSION 54462
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449
@ -72,3 +72,5 @@
#define DBMS_MIN_PROTOCOL_VERSION_WITH_SERVER_QUERY_TIME_IN_PROGRESS 54460
#define DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES 54461
#define DBMS_MIN_PROTOCOL_VERSION_WITH_TIMEZONE_UPDATES 54462

View File

@ -446,6 +446,7 @@ void TCPHandler::runImpl()
sendSelectProfileEvents();
sendLogs();
return false;
};
@ -483,6 +484,9 @@ void TCPHandler::runImpl()
{
std::lock_guard lock(task_callback_mutex);
sendLogs();
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_TIMEZONE_UPDATES
&& client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE)
sendTimezone();
sendEndOfStream();
}
@ -1035,6 +1039,14 @@ void TCPHandler::sendInsertProfileEvents()
sendProfileEvents();
}
void TCPHandler::sendTimezone()
{
writeVarUInt(Protocol::Server::TimezoneUpdate, *out);
writeStringBinary(DateLUT::instance().getTimeZone(), *out);
out->next();
}
bool TCPHandler::receiveProxyHeader()
{
if (in->eof())

View File

@ -262,6 +262,7 @@ private:
void sendProfileEvents();
void sendSelectProfileEvents();
void sendInsertProfileEvents();
void sendTimezone();
/// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled.
void initBlockInput();

View File

@ -5,5 +5,5 @@ SELECT toDateTime64(toDateTime64('1999-12-12 23:23:23.123', 3), 3, 'Europe/Zuric
SET timezone = 'Europe/Zurich';
SELECT toDateTime64(toDateTime64('1999-12-12 23:23:23.123', 3), 3, 'Asia/Novosibirsk');
SET timezone = 'Абырвалг'; -- { clientError CANNOT_PARSE_DATETIME }
select now(); -- { clientError CANNOT_PARSE_DATETIME }
SET timezone = 'Абырвалг';
select now(); -- { serverError POCO_EXCEPTION }