Cancel http read only queries if client socket goes away

To check the socket status, try to read one byte from socket in a
non-blocking way:
      0 - client closed the connection
   >= 1 - client send more data, we are ignoring this case for now
timeout - normal case, client is waiting for response
    ... - socket broken?

Dirty, but should do the job. Limiting to readonly queries as I don't
want to mess with alter queries / insert select and others.
This commit is contained in:
Nicolae Vartolomei 2019-02-01 01:48:25 +00:00
parent 297c2511c7
commit 3fdc04428e
No known key found for this signature in database
GPG Key ID: C8E7675B7C70A0E0
7 changed files with 82 additions and 33 deletions

View File

@ -4,6 +4,7 @@
#include <Poco/File.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerRequestImpl.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/NetException.h>
@ -558,9 +559,44 @@ void HTTPHandler::processQuery(
client_info.http_method = http_method;
client_info.http_user_agent = request.get("User-Agent", "");
auto appendCallback = [&context] (ProgressCallback callback)
{
auto prev = context.getProgressCallback();
context.setProgressCallback([prev, callback] (const Progress & progress)
{
if (prev)
prev(progress);
callback(progress);
});
};
/// While still no data has been sent, we will report about query execution progress by sending HTTP headers.
if (settings.send_progress_in_http_headers)
context.setProgressCallback([&used_output] (const Progress & progress) { used_output.out->onProgress(progress); });
appendCallback([&used_output] (const Progress & progress) { used_output.out->onProgress(progress); });
if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close)
{
Poco::Net::StreamSocket & socket = dynamic_cast<Poco::Net::HTTPServerRequestImpl &>(request).socket();
appendCallback([&context, &socket](const Progress &)
{
/// Assume that at the point this method is called no one is reading data from the socket any more.
/// True for read-only queries.
try {
char b;
int status = socket.receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK);
if (status == 0)
context.killCurrentQuery();
}
catch (Poco::TimeoutException &) {}
catch (...)
{
context.killCurrentQuery();
}
});
}
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); });

View File

@ -1077,6 +1077,13 @@ void Context::setCurrentQueryId(const String & query_id)
client_info.current_query_id = query_id_to_set;
}
void Context::killCurrentQuery()
{
if (process_list_elem)
{
process_list_elem->cancelQuery(true);
}
};
String Context::getDefaultFormat() const
{

View File

@ -236,6 +236,8 @@ public:
void setCurrentDatabase(const String & name);
void setCurrentQueryId(const String & query_id);
void killCurrentQuery();
void setInsertionTable(std::pair<String, String> && db_and_table) { insertion_table = db_and_table; }
const std::pair<String, String> & getInsertionTable() const { return insertion_table; }

View File

@ -26,9 +26,6 @@ namespace ErrorCodes
extern const int CANNOT_KILL;
}
using CancellationCode = ProcessList::CancellationCode;
static const char * cancellationCodeToStatus(CancellationCode code)
{
switch (code)

View File

@ -325,6 +325,29 @@ bool QueryStatus::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStream
return true;
}
CancellationCode QueryStatus::cancelQuery(bool kill)
{
/// Streams are destroyed, and ProcessListElement will be deleted from ProcessList soon. We need wait a little bit
if (streamsAreReleased())
return CancellationCode::CancelSent;
BlockInputStreamPtr input_stream;
BlockOutputStreamPtr output_stream;
if (tryGetQueryStreams(input_stream, output_stream))
{
if (input_stream)
{
input_stream->cancel(kill);
return CancellationCode::CancelSent;
}
return CancellationCode::CancelCannotBeSent;
}
/// Query is not even started
is_killed.store(true);
return CancellationCode::CancelSent;
}
void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
{
@ -356,7 +379,7 @@ QueryStatus * ProcessList::tryGetProcessListElement(const String & current_query
}
ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill)
CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill)
{
std::lock_guard lock(mutex);
@ -365,25 +388,7 @@ ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & curr
if (!elem)
return CancellationCode::NotFound;
/// Streams are destroyed, and ProcessListElement will be deleted from ProcessList soon. We need wait a little bit
if (elem->streamsAreReleased())
return CancellationCode::CancelSent;
BlockInputStreamPtr input_stream;
BlockOutputStreamPtr output_stream;
if (elem->tryGetQueryStreams(input_stream, output_stream))
{
if (input_stream)
{
input_stream->cancel(kill);
return CancellationCode::CancelSent;
}
return CancellationCode::CancelCannotBeSent;
}
/// Query is not even started
elem->is_killed.store(true);
return CancellationCode::CancelSent;
return elem->cancelQuery(kill);
}

View File

@ -70,6 +70,14 @@ struct QueryStatusInfo
std::shared_ptr<Settings> query_settings;
};
enum class CancellationCode
{
NotFound = 0, /// already cancelled
QueryIsNotInitializedYet = 1,
CancelCannotBeSent = 2,
CancelSent = 3,
Unknown
};
/// Query and information about its execution.
class QueryStatus
@ -192,6 +200,8 @@ public:
/// Get query in/out pointers from BlockIO
bool tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const;
CancellationCode cancelQuery(bool kill);
bool isKilled() const { return is_killed; }
};
@ -312,15 +322,6 @@ public:
max_size = max_size_;
}
enum class CancellationCode
{
NotFound = 0, /// already cancelled
QueryIsNotInitializedYet = 1,
CancelCannotBeSent = 2,
CancelSent = 3,
Unknown
};
/// Try call cancel() for input and output streams of query with specified id and user
CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill = false);
};

View File

@ -299,6 +299,7 @@ struct Settings
M(SettingBool, low_cardinality_allow_in_native_format, true, "Use LowCardinality type in Native format. Otherwise, convert LowCardinality columns to ordinary for select query, and convert ordinary columns to required LowCardinality for insert query.") \
M(SettingBool, allow_experimental_multiple_joins_emulation, false, "Emulate multiple joins using subselects") \
M(SettingBool, allow_experimental_cross_to_join_conversion, false, "Convert CROSS JOIN to INNER JOIN if possible") \
M(SettingBool, cancel_http_readonly_queries_on_client_close, false, "Cancel HTTP readonly queries when a client closes the connection without waiting for response.") \
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};