Merge pull request #29626 from kssenii/fix-signals

Follow-up for #26231
This commit is contained in:
Kseniia Sumarokova 2021-10-03 22:13:54 +03:00 committed by GitHub
commit ddc775b1c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 12 deletions

View File

@ -417,6 +417,7 @@ try
{ {
UseSSL use_ssl; UseSSL use_ssl;
MainThreadStatus::getInstance(); MainThreadStatus::getInstance();
setupSignalHandler();
std::cout << std::fixed << std::setprecision(3); std::cout << std::fixed << std::setprecision(3);
std::cerr << std::fixed << std::setprecision(3); std::cerr << std::fixed << std::setprecision(3);

View File

@ -414,17 +414,13 @@ try
{ {
UseSSL use_ssl; UseSSL use_ssl;
ThreadStatus thread_status; ThreadStatus thread_status;
setupSignalHandler();
std::cout << std::fixed << std::setprecision(3); std::cout << std::fixed << std::setprecision(3);
std::cerr << std::fixed << std::setprecision(3); std::cerr << std::fixed << std::setprecision(3);
is_interactive = stdin_is_a_tty && !config().has("query") && !config().has("table-structure") && queries_files.empty(); is_interactive = stdin_is_a_tty && !config().has("query") && !config().has("table-structure") && queries_files.empty();
std::optional<InterruptListener> interrupt_listener; if (!is_interactive)
if (is_interactive)
{
interrupt_listener.emplace();
}
else
{ {
/// We will terminate process on error /// We will terminate process on error
static KillingErrorHandler error_handler; static KillingErrorHandler error_handler;

View File

@ -67,6 +67,7 @@ namespace ErrorCodes
extern const int NO_DATA_TO_INSERT; extern const int NO_DATA_TO_INSERT;
extern const int UNEXPECTED_PACKET_FROM_SERVER; extern const int UNEXPECTED_PACKET_FROM_SERVER;
extern const int INVALID_USAGE_OF_INPUT; extern const int INVALID_USAGE_OF_INPUT;
extern const int CANNOT_SET_SIGNAL_HANDLER;
} }
} }
@ -74,6 +75,48 @@ namespace ErrorCodes
namespace DB namespace DB
{ {
std::atomic_flag exit_on_signal = ATOMIC_FLAG_INIT;
class QueryInterruptHandler : private boost::noncopyable
{
public:
QueryInterruptHandler() { exit_on_signal.clear(); }
~QueryInterruptHandler() { exit_on_signal.test_and_set(); }
static bool cancelled() { return exit_on_signal.test(); }
};
/// This signal handler is set only for sigint.
void interruptSignalHandler(int signum)
{
if (exit_on_signal.test_and_set())
_exit(signum);
}
void ClientBase::setupSignalHandler()
{
exit_on_signal.test_and_set();
struct sigaction new_act;
memset(&new_act, 0, sizeof(new_act));
new_act.sa_handler = interruptSignalHandler;
new_act.sa_flags = 0;
#if defined(OS_DARWIN)
sigemptyset(&new_act.sa_mask);
#else
if (sigemptyset(&new_act.sa_mask))
throw Exception(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler.");
#endif
if (sigaction(SIGINT, &new_act, nullptr))
throw Exception(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler.");
}
ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const
{ {
ParserQuery parser(end); ParserQuery parser(end);
@ -454,8 +497,8 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
/// Also checks if query execution should be cancelled. /// Also checks if query execution should be cancelled.
void ClientBase::receiveResult(ASTPtr parsed_query) void ClientBase::receiveResult(ASTPtr parsed_query)
{ {
InterruptListener interrupt_listener;
bool cancelled = false; bool cancelled = false;
QueryInterruptHandler query_interrupt_handler;
// TODO: get the poll_interval from commandline. // TODO: get the poll_interval from commandline.
const auto receive_timeout = connection_parameters.timeouts.receive_timeout; const auto receive_timeout = connection_parameters.timeouts.receive_timeout;
@ -477,18 +520,17 @@ void ClientBase::receiveResult(ASTPtr parsed_query)
{ {
auto cancel_query = [&] { auto cancel_query = [&] {
connection->sendCancel(); connection->sendCancel();
cancelled = true;
if (is_interactive) if (is_interactive)
{ {
progress_indication.clearProgressOutput(); progress_indication.clearProgressOutput();
std::cout << "Cancelling query." << std::endl; std::cout << "Cancelling query." << std::endl;
}
/// Pressing Ctrl+C twice results in shut down. }
interrupt_listener.unblock(); cancelled = true;
}; };
if (interrupt_listener.check()) /// handler received sigint
if (query_interrupt_handler.cancelled())
{ {
cancel_query(); cancel_query();
} }

View File

@ -30,6 +30,8 @@ enum MultiQueryProcessingStage
PARSING_FAILED, PARSING_FAILED,
}; };
void interruptSignalHandler(int signum);
class ClientBase : public Poco::Util::Application class ClientBase : public Poco::Util::Application
{ {
@ -61,6 +63,7 @@ protected:
static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, int max_parser_depth); static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, int max_parser_depth);
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const; ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const;
static void setupSignalHandler();
MultiQueryProcessingStage analyzeMultiQueryText( MultiQueryProcessingStage analyzeMultiQueryText(
const char *& this_query_begin, const char *& this_query_end, const char * all_queries_end, const char *& this_query_begin, const char *& this_query_end, const char * all_queries_end,