dbms: added waiting when too much queries [#CONV-8692].

This commit is contained in:
Alexey Milovidov 2013-09-07 04:54:59 +00:00
parent 055eda8f81
commit 23be968064
5 changed files with 30 additions and 10 deletions

View File

@ -21,6 +21,7 @@
#define DEFAULT_INTERACTIVE_DELAY 100000
#define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 128
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
#define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Максимальное время ожидания в очереди запросов.
/// Используется в методе reserve, когда известно число строк, но неизвестны их размеры.
#define DBMS_APPROX_STRING_SIZE 64

View File

@ -2,7 +2,10 @@
#include <list>
#include <Poco/SharedPtr.h>
#include <Poco/Mutex.h>
#include <Poco/Condition.h>
#include <statdaemons/Stopwatch.h>
#include <DB/Core/Defines.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/WriteHelpers.h>
@ -12,6 +15,7 @@ namespace DB
{
/** Список исполняющихся в данный момент запросов.
* Также реализует ограничение на их количество.
*/
class ProcessList
@ -33,8 +37,9 @@ public:
private:
Containter cont;
size_t cur_size; /// В C++03 std::list::size не O(1).
size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение.
size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение.
mutable Poco::FastMutex mutex;
mutable Poco::Condition have_space; /// Количество одновременно выполняющихся запросов стало меньше максимального.
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
class Entry
@ -51,6 +56,7 @@ private:
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex);
parent.cont.erase(it);
--parent.cur_size;
parent.have_space.signal();
}
};
@ -59,15 +65,18 @@ public:
typedef Poco::SharedPtr<Entry> EntryPtr;
/// Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
EntryPtr insert(const std::string & query_)
/** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
* Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени.
* Если времени не хватило - кинуть исключение.
*/
EntryPtr insert(const std::string & query_, size_t max_wait_milliseconds = DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS)
{
EntryPtr res;
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
if (max_size && cur_size >= max_size)
if (max_size && cur_size >= max_size && (!max_wait_milliseconds || !have_space.tryWait(mutex, max_wait_milliseconds)))
throw Exception("Too much simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MUCH_SIMULTANEOUS_QUERIES);
++cur_size;

View File

@ -30,6 +30,7 @@ struct Settings
Poco::Timespan connect_timeout_with_failover_ms; /// Если следует выбрать одну из рабочих реплик.
Poco::Timespan receive_timeout;
Poco::Timespan send_timeout;
Poco::Timespan queue_max_wait_ms; /// Время ожидания в очереди запросов, если количество одновременно выполняющихся запросов превышает максимальное.
/// Блокироваться в цикле ожидания запроса в сервере на указанное количество секунд.
size_t poll_interval;
/// Максимальное количество соединений с одним удалённым сервером в пуле.
@ -54,9 +55,10 @@ struct Settings
asynchronous(true),
interactive_delay(DEFAULT_INTERACTIVE_DELAY),
connect_timeout(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
connect_timeout_with_failover_ms(0, 1000 * DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS),
connect_timeout_with_failover_ms(1000 * DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS),
receive_timeout(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
send_timeout(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
queue_max_wait_ms(1000 * DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS),
poll_interval(DBMS_DEFAULT_POLL_INTERVAL),
distributed_connections_pool_size(DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE),
connections_with_failover_max_tries(DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES),

View File

@ -22,6 +22,7 @@ void Settings::set(const String & name, const Field & value)
else if (name == "connect_timeout") connect_timeout = Poco::Timespan(safeGet<UInt64>(value), 0);
else if (name == "receive_timeout") receive_timeout = Poco::Timespan(safeGet<UInt64>(value), 0);
else if (name == "send_timeout") send_timeout = Poco::Timespan(safeGet<UInt64>(value), 0);
else if (name == "queue_max_wait_ms") queue_max_wait_ms = Poco::Timespan(safeGet<UInt64>(value) * 1000);
else if (name == "poll_interval") poll_interval = safeGet<UInt64>(value);
else if (name == "connect_timeout_with_failover_ms")
connect_timeout_with_failover_ms = Poco::Timespan(safeGet<UInt64>(value) * 1000);
@ -45,6 +46,7 @@ void Settings::set(const String & name, ReadBuffer & buf)
|| name == "connect_timeout"
|| name == "receive_timeout"
|| name == "send_timeout"
|| name == "queue_max_wait_ms"
|| name == "poll_interval"
|| name == "connect_timeout_with_failover_ms"
|| name == "max_distributed_connections"
@ -77,6 +79,7 @@ void Settings::set(const String & name, const String & value)
|| name == "connect_timeout"
|| name == "receive_timeout"
|| name == "send_timeout"
|| name == "queue_max_wait_ms"
|| name == "poll_interval"
|| name == "connect_timeout_with_failover_ms"
|| name == "max_distributed_connections"
@ -135,6 +138,7 @@ void Settings::serialize(WriteBuffer & buf) const
writeStringBinary("connect_timeout", buf); writeVarUInt(connect_timeout.totalSeconds(), buf);
writeStringBinary("receive_timeout", buf); writeVarUInt(receive_timeout.totalSeconds(), buf);
writeStringBinary("send_timeout", buf); writeVarUInt(send_timeout.totalSeconds(), buf);
writeStringBinary("queue_max_wait_ms", buf); writeVarUInt(queue_max_wait_ms.totalMilliseconds(), buf);
writeStringBinary("poll_interval", buf); writeVarUInt(poll_interval, buf);
writeStringBinary("connect_timeout_with_failover_ms", buf); writeVarUInt(connect_timeout_with_failover_ms.totalMilliseconds(), buf);
writeStringBinary("max_distributed_connections", buf); writeVarUInt(max_distributed_connections, buf);

View File

@ -74,7 +74,7 @@ void executeQuery(
/// Положим запрос в список процессов. Но запрос SHOW PROCESSLIST класть не будем.
ProcessList::EntryPtr process_list_entry;
if (!internal && NULL == dynamic_cast<const ASTShowProcesslistQuery *>(&*ast))
process_list_entry = context.getProcessList().insert(query);
process_list_entry = context.getProcessList().insert(query, context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
/// Проверка ограничений.
checkLimits(*ast, context.getSettingsRef().limits);
@ -132,11 +132,19 @@ BlockIO executeQuery(
quota.checkExceeded(current_time);
BlockIO res;
/// Положим запрос в список процессов. Но запрос SHOW PROCESSLIST класть не будем.
ProcessList::EntryPtr process_list_entry;
if (!internal && NULL == dynamic_cast<const ASTShowProcesslistQuery *>(&*ast))
process_list_entry = context.getProcessList().insert(query, context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
try
{
InterpreterQuery interpreter(ast, context, stage);
res = interpreter.execute();
/// Держим элемент списка процессов до конца обработки запроса.
res.process_list_entry = process_list_entry;
}
catch (...)
{
@ -144,10 +152,6 @@ BlockIO executeQuery(
throw;
}
/// Положим запрос в список процессов. Но запрос SHOW PROCESSLIST класть не будем.
if (!internal && NULL == dynamic_cast<const ASTShowProcesslistQuery *>(&*ast))
res.process_list_entry = context.getProcessList().insert(query);
quota.addQuery(current_time);
return res;
}