Merge pull request #36587 from azat/client-reconnect

client: add a message on reconnect (under warning log level)
This commit is contained in:
Alexey Milovidov 2022-04-26 08:44:27 +03:00 committed by GitHub
commit 0cb227501f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 10 deletions

View File

@ -1234,6 +1234,7 @@ void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDes
} }
void ClientBase::sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query, bool have_more_data) void ClientBase::sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query, bool have_more_data)
try
{ {
QueryPipeline pipeline(std::move(pipe)); QueryPipeline pipeline(std::move(pipe));
PullingAsyncPipelineExecutor executor(pipeline); PullingAsyncPipelineExecutor executor(pipeline);
@ -1266,6 +1267,12 @@ void ClientBase::sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query, bool have_mo
if (!have_more_data) if (!have_more_data)
connection->sendData({}, "", false); connection->sendData({}, "", false);
} }
catch (...)
{
connection->sendCancel();
receiveEndOfQuery();
throw;
}
void ClientBase::sendDataFromStdin(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query) void ClientBase::sendDataFromStdin(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query)
{ {
@ -1406,7 +1413,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
apply_query_settings(*with_output->settings_ast); apply_query_settings(*with_output->settings_ast);
if (!connection->checkConnected()) if (!connection->checkConnected())
{
auto poco_logs_level = Poco::Logger::parseLevel(config().getString("send_logs_level", "none"));
/// Print under WARNING also because it is used by clickhouse-test.
if (poco_logs_level >= Poco::Message::PRIO_WARNING)
{
fmt::print(stderr, "Connection lost. Reconnecting.\n");
}
connect(); connect();
}
ASTPtr input_function; ASTPtr input_function;
if (insert && insert->select) if (insert && insert->select)

View File

@ -377,9 +377,10 @@ bool Connection::ping()
{ {
// LOG_TRACE(log_wrapper.get(), "Ping"); // LOG_TRACE(log_wrapper.get(), "Ping");
TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
try try
{ {
TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
UInt64 pong = 0; UInt64 pong = 0;
writeVarUInt(Protocol::Client::Ping, *out); writeVarUInt(Protocol::Client::Ping, *out);
out->next(); out->next();
@ -405,6 +406,10 @@ bool Connection::ping()
} }
catch (const Poco::Exception & e) catch (const Poco::Exception & e)
{ {
/// Explicitly disconnect since ping() can receive EndOfStream,
/// and in this case this ping() will return false,
/// while next ping() may return true.
disconnect();
LOG_TRACE(log_wrapper.get(), fmt::runtime(e.displayText())); LOG_TRACE(log_wrapper.get(), fmt::runtime(e.displayText()));
return false; return false;
} }

View File

@ -4,7 +4,7 @@ drop table if exists mt2;
create table mt1 (n Int64) engine=MergeTree order by n; create table mt1 (n Int64) engine=MergeTree order by n;
create table mt2 (n Int64) engine=MergeTree order by n; create table mt2 (n Int64) engine=MergeTree order by n;
commit; -- { serverError INVALID_TRANSACTION } commit; -- { serverError INVALID_TRANSACTION } -- no transaction
rollback; -- { serverError INVALID_TRANSACTION } rollback; -- { serverError INVALID_TRANSACTION }
begin transaction; begin transaction;
@ -31,7 +31,7 @@ select 'on exception before start', arraySort(groupArray(n)) from (select n from
-- rollback on exception before start -- rollback on exception before start
select functionThatDoesNotExist(); -- { serverError 46 } select functionThatDoesNotExist(); -- { serverError 46 }
-- cannot commit after exception -- cannot commit after exception
commit; -- { serverError INVALID_TRANSACTION } commit; -- { serverError INVALID_TRANSACTION } -- after 46
begin transaction; -- { serverError INVALID_TRANSACTION } begin transaction; -- { serverError INVALID_TRANSACTION }
rollback; rollback;
@ -42,7 +42,7 @@ select 'on exception while processing', arraySort(groupArray(n)) from (select n
-- rollback on exception while processing -- rollback on exception while processing
select throwIf(100 < number) from numbers(1000); -- { serverError 395 } select throwIf(100 < number) from numbers(1000); -- { serverError 395 }
-- cannot commit after exception -- cannot commit after exception
commit; -- { serverError INVALID_TRANSACTION } commit; -- { serverError INVALID_TRANSACTION } -- after 395
insert into mt1 values (5); -- { serverError INVALID_TRANSACTION } insert into mt1 values (5); -- { serverError INVALID_TRANSACTION }
insert into mt2 values (50); -- { serverError INVALID_TRANSACTION } insert into mt2 values (50); -- { serverError INVALID_TRANSACTION }
select 1; -- { serverError INVALID_TRANSACTION } select 1; -- { serverError INVALID_TRANSACTION }
@ -52,10 +52,9 @@ begin transaction;
insert into mt1 values (6); insert into mt1 values (6);
insert into mt2 values (60); insert into mt2 values (60);
select 'on session close', arraySort(groupArray(n)) from (select n from mt1 union all select * from mt2); select 'on session close', arraySort(groupArray(n)) from (select n from mt1 union all select * from mt2);
-- trigger reconnection by error on client, check rollback on session close
insert into mt1 values ([1]); -- { clientError 43 } insert into mt1 values ([1]); -- { clientError 43 }
commit; -- { serverError INVALID_TRANSACTION } -- INSERT failures does not produce client reconnect anymore, so rollback can be done
rollback; -- { serverError INVALID_TRANSACTION } rollback;
begin transaction; begin transaction;
insert into mt1 values (7); insert into mt1 values (7);
@ -82,19 +81,19 @@ rollback;
begin transaction; begin transaction;
create table m (n int) engine=Memory; -- { serverError 48 } create table m (n int) engine=Memory; -- { serverError 48 }
commit; -- { serverError INVALID_TRANSACTION } commit; -- { serverError INVALID_TRANSACTION } -- after 48
rollback; rollback;
create table m (n int) engine=Memory; create table m (n int) engine=Memory;
begin transaction; begin transaction;
insert into m values (1); -- { serverError 48 } insert into m values (1); -- { serverError 48 }
select * from m; -- { serverError INVALID_TRANSACTION } select * from m; -- { serverError INVALID_TRANSACTION }
commit; -- { serverError INVALID_TRANSACTION } commit; -- { serverError INVALID_TRANSACTION } -- after 48
rollback; rollback;
begin transaction; begin transaction;
select * from m; -- { serverError 48 } select * from m; -- { serverError 48 }
commit; -- { serverError INVALID_TRANSACTION } commit; -- { serverError INVALID_TRANSACTION } -- after 48
rollback; rollback;
drop table m; drop table m;