Merge pull request #417 from yandex/revert-384-query-kill

Revert "KILL QUERY"
This commit is contained in:
alexey-milovidov 2017-01-30 16:20:25 +04:00 committed by GitHub
commit 94c54cf629
4 changed files with 17 additions and 18 deletions

View File

@ -84,9 +84,8 @@ protected:
BlockInputStreamPtr query_stream_in; BlockInputStreamPtr query_stream_in;
BlockOutputStreamPtr query_stream_out; BlockOutputStreamPtr query_stream_out;
/// Abovemetioned streams have delayed initialization, this flag indicates thier initialization /// Abovemetioned streams haved delayed initialization, use this flag to track initialization
/// It is better to use atomic (instead of raw bool) with tryGet/setQueryStreams() thread-safe methods despite that /// Can be set to true by single thread, can be read from multiple
/// now in all contexts ProcessListElement is always used under ProcessList::mutex (and raw bool is also Ok)
std::atomic<bool> query_streams_initialized{false}; std::atomic<bool> query_streams_initialized{false};
public: public:
@ -147,7 +146,7 @@ public:
return res; return res;
} }
/// Copies pointers to in/out streams, it can be called once /// Copies pointers to in/out streams
void setQueryStreams(const BlockIO & io); void setQueryStreams(const BlockIO & io);
/// Get query in/out pointers /// Get query in/out pointers
bool tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const; bool tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const;
@ -265,13 +264,13 @@ public:
{ {
NotFound = 0, /// already cancelled NotFound = 0, /// already cancelled
QueryIsNotInitializedYet = 1, QueryIsNotInitializedYet = 1,
CancelCannotBeSent = 2, CancelCannotBeSended = 2,
CancelSent = 3, CancelSended = 3,
Unknown Unknown
}; };
/// Try call cancel() for input and output streams of query with specified id and user /// Try call cancel() for input and output streams of query with specified id and user
CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user); CancellationCode sendCancelToQuery(const String & query_id, const String & intial_user);
}; };
} }

View File

@ -7,9 +7,9 @@ namespace DB
class ASTKillQueryQuery : public ASTQueryWithOutput class ASTKillQueryQuery : public ASTQueryWithOutput
{ {
public: public:
ASTPtr where_expression; // expression to filter processes from system.processes table ASTPtr where_expression;
bool sync = false; // SYNC or ASYNC mode bool sync = false;
bool test = false; // does it TEST mode? (doesn't cancel queries just checks and shows them) bool test = false;
ASTKillQueryQuery() = default; ASTKillQueryQuery() = default;

View File

@ -32,9 +32,9 @@ static const char * cancellationCodeToStatus(CancellationCode code)
return "finished"; return "finished";
case CancellationCode::QueryIsNotInitializedYet: case CancellationCode::QueryIsNotInitializedYet:
return "pending"; return "pending";
case CancellationCode::CancelCannotBeSent: case CancellationCode::CancelCannotBeSended:
return "error"; return "error";
case CancellationCode::CancelSent: case CancellationCode::CancelSended:
return "waiting"; return "waiting";
default: default:
return "unknown_status"; return "unknown_status";
@ -141,13 +141,13 @@ public:
auto code = process_list.sendCancelToQuery(curr_process.query_id, curr_process.user); auto code = process_list.sendCancelToQuery(curr_process.query_id, curr_process.user);
if (code != CancellationCode::QueryIsNotInitializedYet && code != CancellationCode::CancelSent) if (code != CancellationCode::QueryIsNotInitializedYet && code != CancellationCode::CancelSended)
{ {
curr_process.processed = true; curr_process.processed = true;
insertResultRow(curr_process.source_num, code, processes_block, res); insertResultRow(curr_process.source_num, code, processes_block, res);
++num_processed_queries; ++num_processed_queries;
} }
/// Wait if QueryIsNotInitializedYet or CancelSent /// Wait if QueryIsNotInitializedYet or CancelSended
} }
/// KILL QUERY could be killed also /// KILL QUERY could be killed also

View File

@ -20,7 +20,7 @@ ProcessList::EntryPtr ProcessList::insert(
const String & query_, const IAST * ast, const ClientInfo & client_info, const Settings & settings) const String & query_, const IAST * ast, const ClientInfo & client_info, const Settings & settings)
{ {
EntryPtr res; EntryPtr res;
bool is_kill_query = ast && typeid_cast<const ASTKillQueryQuery *>(ast); bool is_kill_query = ast && dynamic_cast<const ASTKillQueryQuery *>(ast);
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
@ -151,7 +151,7 @@ void ProcessListElement::setQueryStreams(const BlockIO & io)
{ {
query_stream_in = io.in; query_stream_in = io.in;
query_stream_out = io.out; query_stream_out = io.out;
query_streams_initialized = true; // forces strict memory ordering query_streams_initialized = true;
} }
bool ProcessListElement::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const bool ProcessListElement::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const
@ -229,9 +229,9 @@ ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & curr
if (input_stream && (input_stream_casted = dynamic_cast<IProfilingBlockInputStream *>(input_stream.get()))) if (input_stream && (input_stream_casted = dynamic_cast<IProfilingBlockInputStream *>(input_stream.get())))
{ {
input_stream_casted->cancel(); input_stream_casted->cancel();
return CancellationCode::CancelSent; return CancellationCode::CancelSended;
} }
return CancellationCode::CancelCannotBeSent; return CancellationCode::CancelCannotBeSended;
} }
return CancellationCode::QueryIsNotInitializedYet; return CancellationCode::QueryIsNotInitializedYet;
} }