mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-23 02:00:49 +00:00
dbms: Server: queries with several replicas: fixes [#METR-14410]
This commit is contained in:
parent
e8902aa644
commit
77f262f8a8
@ -82,14 +82,14 @@ public:
|
||||
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
|
||||
return;
|
||||
|
||||
if (isInProgress() && !hasThrownException() && !was_cancelled)
|
||||
if (isQueryInProgress() && !hasThrownException())
|
||||
{
|
||||
std::string addresses = parallel_replicas->dumpAddresses();
|
||||
LOG_TRACE(log, "(" + addresses + ") Cancelling query");
|
||||
|
||||
/// Если запрошено прервать запрос - попросим удалённые реплики тоже прервать запрос.
|
||||
parallel_replicas->sendCancel();
|
||||
was_cancelled = true;
|
||||
parallel_replicas->sendCancel();
|
||||
}
|
||||
}
|
||||
|
||||
@ -100,8 +100,14 @@ public:
|
||||
* все соединения, затем читаем и пропускаем оставшиеся пакеты чтобы
|
||||
* эти соединения не остались висеть в рассихронизированном состоянии.
|
||||
*/
|
||||
if (isInProgress())
|
||||
abort();
|
||||
if (isQueryInProgress())
|
||||
{
|
||||
std::string addresses = parallel_replicas->dumpAddresses();
|
||||
LOG_TRACE(log, "(" + addresses + ") Aborting query");
|
||||
|
||||
parallel_replicas->sendCancel();
|
||||
(void) parallel_replicas->drain();
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
@ -177,7 +183,7 @@ protected:
|
||||
*/
|
||||
progressImpl(packet.progress);
|
||||
|
||||
if (!was_cancelled && isInProgress() && isCancelled())
|
||||
if (isQueryInProgress() && isCancelled())
|
||||
cancel();
|
||||
|
||||
break;
|
||||
@ -210,7 +216,7 @@ protected:
|
||||
* - получили с одной реплики неизвестный пакет;
|
||||
* - то больше читать ничего не нужно.
|
||||
*/
|
||||
if (!isInProgress() || hasThrownException())
|
||||
if (hasNoQueryInProgress() || hasThrownException())
|
||||
return;
|
||||
|
||||
/** Если ещё прочитали не все данные, но они больше не нужны.
|
||||
@ -256,22 +262,16 @@ protected:
|
||||
parallel_replicas = ext::make_unique<ParallelReplicas>(pool, parallel_replicas_settings);
|
||||
}
|
||||
|
||||
/** Отменить запросы на всех репликах. Читать и пропускать все оставшиеся пакеты
|
||||
* до EndOfStream или Exception, чтоб не было рассинхронизации в соединениях с репликами.
|
||||
*/
|
||||
void abort()
|
||||
/// Возвращает true, если запрос отправлен, а ещё не выполнен.
|
||||
bool isQueryInProgress() const
|
||||
{
|
||||
std::string addresses = parallel_replicas->dumpAddresses();
|
||||
LOG_TRACE(log, "(" + addresses + ") Aborting query");
|
||||
|
||||
parallel_replicas->sendCancel();
|
||||
(void) parallel_replicas->drain();
|
||||
return sent_query && !finished && !was_cancelled;
|
||||
}
|
||||
|
||||
/// Возвращает true, если запрос отправлен, а ещё не выполнен.
|
||||
bool isInProgress() const
|
||||
/// Возвращает true, если никакой запрос не отправлен или один запрос уже выполнен.
|
||||
bool hasNoQueryInProgress() const
|
||||
{
|
||||
return sent_query && !finished;
|
||||
return !sent_query || finished;
|
||||
}
|
||||
|
||||
/// Возвращает true, если исключение было выкинуто.
|
||||
|
Loading…
Reference in New Issue
Block a user