Fixed long ProcessList lock in case of remote server timeout. [#CLICKHOUSE-2799]

This commit is contained in:
Vitaliy Lyudvichenko 2017-02-03 19:15:12 +03:00 committed by alexey-milovidov
parent 326e382387
commit 7e5a075ef3
5 changed files with 45 additions and 28 deletions

View File

@ -78,16 +78,16 @@ struct ProcessListElement
protected: protected:
mutable std::mutex query_streams_mutex;
/// Streams with query results, point to BlockIO from executeQuery() /// Streams with query results, point to BlockIO from executeQuery()
/// This declaration is compatible with notes about BlockIO::process_list_entry: /// This declaration is compatible with notes about BlockIO::process_list_entry:
/// there are no cyclic dependencies: BlockIO::in,out point to objects inside ProcessListElement (not whole object) /// there are no cyclic dependencies: BlockIO::in,out point to objects inside ProcessListElement (not whole object)
BlockInputStreamPtr query_stream_in; BlockInputStreamPtr query_stream_in;
BlockOutputStreamPtr query_stream_out; BlockOutputStreamPtr query_stream_out;
/// Abovemetioned streams have delayed initialization, this flag indicates thier initialization bool query_streams_initialized{false};
/// It is better to use atomic (instead of raw bool) with tryGet/setQueryStreams() thread-safe methods despite that bool query_streams_released{false};
/// now in all contexts ProcessListElement is always used under ProcessList::mutex (and raw bool is also Ok)
std::atomic<bool> query_streams_initialized{false};
public: public:
@ -149,7 +149,12 @@ public:
/// Copies pointers to in/out streams, it can be called once /// Copies pointers to in/out streams, it can be called once
void setQueryStreams(const BlockIO & io); void setQueryStreams(const BlockIO & io);
/// Get query in/out pointers
void releaseQueryStreams();
bool streamsAreReleased();
/// Get query in/out pointers from BlockIO
bool tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const; bool tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const;
}; };

View File

@ -109,6 +109,9 @@ ProcessList::EntryPtr ProcessList::insert(
ProcessListEntry::~ProcessListEntry() ProcessListEntry::~ProcessListEntry()
{ {
/// Destroy all streams to avoid long lock of ProcessList
it->releaseQueryStreams();
std::lock_guard<std::mutex> lock(parent.mutex); std::lock_guard<std::mutex> lock(parent.mutex);
/// Важен порядок удаления memory_tracker-ов. /// Важен порядок удаления memory_tracker-ов.
@ -156,13 +159,34 @@ ProcessListEntry::~ProcessListEntry()
void ProcessListElement::setQueryStreams(const BlockIO & io) void ProcessListElement::setQueryStreams(const BlockIO & io)
{ {
std::lock_guard<std::mutex> lock(query_streams_mutex);
query_stream_in = io.in; query_stream_in = io.in;
query_stream_out = io.out; query_stream_out = io.out;
query_streams_initialized = true; // forces strict memory ordering query_streams_initialized = true;
} }
void ProcessListElement::releaseQueryStreams()
{
std::lock_guard<std::mutex> lock(query_streams_mutex);
query_streams_initialized = false;
query_streams_released = true;
query_stream_in.reset();
query_stream_out.reset();
}
/// Tells whether releaseQueryStreams() has already been called for this element.
/// The flag is read under query_streams_mutex, the same mutex that guards its modification.
bool ProcessListElement::streamsAreReleased()
{
    std::lock_guard<std::mutex> streams_guard(query_streams_mutex);

    const bool already_released = query_streams_released;
    return already_released;
}
bool ProcessListElement::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const bool ProcessListElement::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const
{ {
std::lock_guard<std::mutex> lock(query_streams_mutex);
if (!query_streams_initialized) if (!query_streams_initialized)
return false; return false;
@ -224,7 +248,7 @@ ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & curr
ProcessListElement * elem = tryGetProcessListElement(current_query_id, current_user); ProcessListElement * elem = tryGetProcessListElement(current_query_id, current_user);
if (!elem) if (!elem || elem->streamsAreReleased())
return CancellationCode::NotFound; return CancellationCode::NotFound;
BlockInputStreamPtr input_stream; BlockInputStreamPtr input_stream;
@ -240,6 +264,7 @@ ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & curr
} }
return CancellationCode::CancelCannotBeSent; return CancellationCode::CancelCannotBeSent;
} }
return CancellationCode::QueryIsNotInitializedYet; return CancellationCode::QueryIsNotInitializedYet;
} }

View File

@ -193,7 +193,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
res = interpreter->execute(); res = interpreter->execute();
/// Delayed initialization of query streams (required for KILL QUERY purposes) /// Delayed initialization of query streams (required for KILL QUERY purposes)
if (!internal && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast)) if (process_list_entry)
(*process_list_entry)->setQueryStreams(res); (*process_list_entry)->setQueryStreams(res);
/// Hold element of process list till end of query execution. /// Hold element of process list till end of query execution.

View File

@ -1,6 +1,2 @@
SELECT sleep(2) FROM system.numbers LIMIT 4 SELECT sleep(1) FROM system.numbers LIMIT 4
SELECT sleep(3) FROM system.numbers LIMIT 3 SELECT sleep(1) FROM system.numbers LIMIT 5
SELECT sleep(4) FROM system.numbers LIMIT 2
0
SELECT sleep(1) FROM system.numbers LIMIT 999
0

View File

@ -3,22 +3,13 @@ set -e
QUERY_FIELND_NUM=4 QUERY_FIELND_NUM=4
# Sleep sort. Should be quite deterministic clickhouse-client --max_block_size=1 -q "SELECT sleep(1) FROM system.numbers LIMIT 4" &>/dev/null &
clickhouse-client --max_block_size=1 -q "SELECT sleep(4) FROM system.numbers LIMIT 2" &>/dev/null &
clickhouse-client --max_block_size=1 -q "SELECT sleep(2) FROM system.numbers LIMIT 4" &>/dev/null &
clickhouse-client --max_block_size=1 -q "SELECT sleep(3) FROM system.numbers LIMIT 3" &>/dev/null &
clickhouse-client --max_block_size=1 -q "SELECT 'trash', sleep(2) FROM system.numbers LIMIT 4" &>/dev/null &
sleep 1 # here we need wait 1 sec for init, therefore minimum "sorting element" should be greater than 1 sec (i.e. 2 sec)
clickhouse-client -q "KILL QUERY WHERE query LIKE 'SELECT sleep(%' AND (elapsed >= 0.) SYNC" | cut -f $QUERY_FIELND_NUM
clickhouse-client -q "SELECT countIf(query LIKE 'SELECT sleep(%') FROM system.processes"
clickhouse-client --max_block_size=1 -q "SELECT sleep(1) FROM system.numbers LIMIT 999" &>/dev/null &
sleep 1 sleep 1
clickhouse-client -q "KILL QUERY WHERE query = 'SELECT sleep(1) FROM system.numbers LIMIT 999' ASYNC" | cut -f $QUERY_FIELND_NUM clickhouse-client -q "KILL QUERY WHERE query LIKE 'SELECT sleep(%' AND (elapsed >= 0.) SYNC" | cut -f $QUERY_FIELND_NUM
sleep 1 # wait cancelling
clickhouse-client -q "SELECT countIf(query = 'SELECT sleep(1) FROM system.numbers LIMIT 999') FROM system.processes"
clickhouse-client --max_block_size=1 -q "SELECT sleep(1) FROM system.numbers LIMIT 5" &>/dev/null &
sleep 1
clickhouse-client -q "KILL QUERY WHERE query = 'SELECT sleep(1) FROM system.numbers LIMIT 5' ASYNC" | cut -f $QUERY_FIELND_NUM
clickhouse-client -q "KILL QUERY WHERE 0 ASYNC" clickhouse-client -q "KILL QUERY WHERE 0 ASYNC"
clickhouse-client -q "KILL QUERY WHERE 0 FORMAT TabSeparated" clickhouse-client -q "KILL QUERY WHERE 0 FORMAT TabSeparated"