dbms: Server: fix race conditions [#METR-14410]

This commit is contained in:
Alexey Arno 2015-02-25 15:43:06 +03:00
parent dbca19b7bc
commit 7fdc23c5f2

View File

@ -22,6 +22,12 @@ class RemoteBlockInputStream : public IProfilingBlockInputStream
private:
void init(const Settings * settings_)
{
sent_query.store(false, std::memory_order_release);
finished.store(false, std::memory_order_release);
got_exception_from_replica.store(false, std::memory_order_release);
got_unknown_packet_from_replica.store(false, std::memory_order_release);
was_cancelled.store(false, std::memory_order_release);
if (settings_)
{
send_settings = true;
@ -83,14 +89,13 @@ public:
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
return;
if (isQueryInProgress() && !hasThrownException())
if (hasNoQueryInProgress() || hasThrownException())
return;
if (tryCancel())
{
std::string addresses = parallel_replicas->dumpAddresses();
LOG_TRACE(log, "(" + addresses + ") Cancelling query");
/// Если запрошено прервать запрос - попросим удалённые реплики тоже прервать запрос.
was_cancelled = true;
parallel_replicas->sendCancel();
}
}
@ -136,13 +141,14 @@ protected:
Block readImpl() override
{
if (!sent_query)
if (!sent_query.load(std::memory_order_acquire))
{
createParallelReplicas();
setEstablishedState();
established = true;
parallel_replicas->sendQuery(query, "", stage, true);
established = false;
sent_query.store(true, std::memory_order_release);
sendExternalTables();
setSentQueryState();
}
while (true)
@ -158,14 +164,14 @@ protected:
break; /// Если блок пустой - получим другие пакеты до EndOfStream.
case Protocol::Server::Exception:
got_exception_from_replica = true;
got_exception_from_replica.store(true, std::memory_order_release);
packet.exception->rethrow();
break;
case Protocol::Server::EndOfStream:
if (!parallel_replicas->hasActiveReplicas())
{
finished = true;
finished.store(true, std::memory_order_release);
return Block();
}
break;
@ -197,7 +203,7 @@ protected:
break;
default:
got_unknown_packet_from_replica = true;
got_unknown_packet_from_replica.store(true, std::memory_order_release);
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
@ -220,13 +226,10 @@ protected:
*/
/// Отправим просьбу прервать выполнение запроса, если ещё не отправляли.
if (!was_cancelled)
if (tryCancel())
{
std::string addresses = parallel_replicas->dumpAddresses();
LOG_TRACE(log, "(" + addresses + ") Cancelling query because enough data has been read");
was_cancelled = true;
parallel_replicas->sendCancel();
}
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединениях с репликами.
@ -234,16 +237,16 @@ protected:
switch (packet.type)
{
case Protocol::Server::EndOfStream:
finished = true;
finished.store(true, std::memory_order_release);
break;
case Protocol::Server::Exception:
got_exception_from_replica = true;
got_exception_from_replica.store(true, std::memory_order_release);
packet.exception->rethrow();
break;
default:
got_unknown_packet_from_replica = true;
got_unknown_packet_from_replica.store(true, std::memory_order_release);
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
@ -261,19 +264,19 @@ protected:
/// Возвращает true, если запрос отправлен, а ещё не выполнен.
bool isQueryInProgress() const
{
return sent_query && !finished && !was_cancelled;
return sent_query.load(std::memory_order_acquire) && !finished.load(std::memory_order_acquire) && !was_cancelled.load(std::memory_order_acquire);
}
/// Возвращает true, если никакой запрос не отправлен или один запрос уже выполнен.
bool hasNoQueryInProgress() const
{
return !sent_query || finished;
return !sent_query.load(std::memory_order_acquire) || finished.load(std::memory_order_acquire);
}
/// Возвращает true, если исключение было выкинуто.
bool hasThrownException() const
{
return got_exception_from_replica || got_unknown_packet_from_replica;
return got_exception_from_replica.load(std::memory_order_acquire) || got_unknown_packet_from_replica.load(std::memory_order_acquire);
}
private:
@ -284,17 +287,18 @@ private:
return instance;
}
/// Перейти в состояние established (т.е. соединения с репликами установлены).
void NO_INLINE setEstablishedState()
/// Отправить запрос на отмену всех соединений к репликам, если такой запрос ещё не был отправлен.
bool tryCancel()
{
established = true;
}
/// Перейти в состояние sent_query (т.е. запрос отправлен).
void NO_INLINE setSentQueryState()
{
established = false;
sent_query = true;
bool old_val = false;
bool new_val = true;
if (was_cancelled.compare_exchange_strong(old_val, new_val, std::memory_order_release, std::memory_order_relaxed))
{
parallel_replicas->sendCancel();
return true;
}
else
return false;
}
private:
@ -315,33 +319,33 @@ private:
Context context;
/// Установили соединения с репликами, но ещё не отправили запрос.
bool established = false;
volatile bool established = false;
/// Отправили запрос (это делается перед получением первого блока).
bool sent_query = false;
std::atomic<bool> sent_query;
/** Получили все данные от всех реплик, до пакета EndOfStream.
* Если при уничтожении объекта, ещё не все данные считаны,
* то для того, чтобы не было рассинхронизации, на реплики отправляются просьбы прервать выполнение запроса,
* и после этого считываются все пакеты до EndOfStream.
*/
bool finished = false;
std::atomic<bool> finished;
/** На каждую реплику была отправлена просьба прервать выполнение запроса, так как данные больше не нужны.
* Это может быть из-за того, что данных достаточно (например, при использовании LIMIT),
* или если на стороне клиента произошло исключение.
*/
bool was_cancelled = false;
std::atomic<bool> was_cancelled;
/** С одной репилки было получено исключение. В этом случае получать больше пакетов или
* просить прервать запрос на этой реплике не нужно.
*/
bool got_exception_from_replica = false;
std::atomic<bool> got_exception_from_replica;
/** С одной реплики был получен неизвестный пакет. В этом случае получать больше пакетов или
* просить прервать запрос на этой реплике не нужно.
*/
bool got_unknown_packet_from_replica = false;
std::atomic<bool> got_unknown_packet_from_replica;
Logger * log = &Logger::get("RemoteBlockInputStream");
};