Merge branch 'master' into setting_allow_nondeterministic_mutations

This commit is contained in:
alexey-milovidov 2020-04-10 23:58:46 +03:00 committed by GitHub
commit 25236d3544
56 changed files with 426 additions and 334 deletions

View File

@ -1,5 +1,5 @@
---
toc_priority: 3
toc_priority: 0
toc_title: Overview
---

View File

@ -66,4 +66,10 @@ ClickHouse uses asynchronous multi-master replication. After being written to an
For more information, see the section [Data replication](../engines/table_engines/mergetree_family/replication.md).
## Features that Can Be Considered Disadvantages {#clickhouse-features-that-can-be-considered-disadvantages}
1. No full-fledged transactions.
2. Lack of ability to modify or delete already inserted data with high rate and low latency. There are batch deletes and updates available to clean up or modify data, for example to comply with [GDPR](https://gdpr-info.eu).
3. The sparse index makes ClickHouse not so suitable for point queries retrieving single rows by their keys.
[Original article](https://clickhouse.tech/docs/en/introduction/distinctive_features/) <!--hide-->

View File

@ -1,12 +0,0 @@
---
toc_priority: 5
toc_title: ClickHouse Features that Can Be Considered Disadvantages
---
# ClickHouse Features that Can Be Considered Disadvantages {#clickhouse-features-that-can-be-considered-disadvantages}
1. No full-fledged transactions.
2. Lack of ability to modify or delete already inserted data with high rate and low latency. There are batch deletes and updates available to clean up or modify data, for example to comply with [GDPR](https://gdpr-info.eu).
3. The sparse index makes ClickHouse not so suitable for point queries retrieving single rows by their keys.
[Original article](https://clickhouse.tech/docs/en/introduction/features_considered_disadvantages/) <!--hide-->

View File

@ -2,4 +2,3 @@
toc_folder_title: Statements
toc_priority: 31
---

View File

@ -68,4 +68,10 @@ ClickHouse utiliza la replicación multi-maestro asincrónica. Después de escri
Para obtener más información, consulte la sección [Replicación de datos](../engines/table_engines/mergetree_family/replication.md).
## Características que pueden considerarse desventajas {#clickhouse-features-that-can-be-considered-disadvantages}
1. No hay transacciones completas.
2. Falta de capacidad para modificar o eliminar datos ya insertados con alta tasa y baja latencia. Hay eliminaciones y actualizaciones por lotes disponibles para limpiar o modificar datos, por ejemplo, para cumplir con [GDPR](https://gdpr-info.eu).
3. El índice disperso hace que ClickHouse no sea tan adecuado para consultas de puntos que recuperan filas individuales por sus claves.
[Artículo Original](https://clickhouse.tech/docs/en/introduction/distinctive_features/) <!--hide-->

View File

@ -1,14 +0,0 @@
---
machine_translated: true
machine_translated_rev: 3e185d24c9fe772c7cf03d5475247fb829a21dfa
toc_priority: 5
toc_title: "Caracter\xEDsticas de ClickHouse que pueden considerarse desventajas"
---
# Características de ClickHouse que pueden considerarse desventajas {#clickhouse-features-that-can-be-considered-disadvantages}
1. No hay transacciones completas.
2. Falta de capacidad para modificar o eliminar datos ya insertados con alta tasa y baja latencia. Hay eliminaciones y actualizaciones por lotes disponibles para limpiar o modificar datos, por ejemplo, para cumplir con [GDPR](https://gdpr-info.eu).
3. El índice disperso hace que ClickHouse no sea tan adecuado para consultas de puntos que recuperan filas individuales por sus claves.
[Artículo Original](https://clickhouse.tech/docs/en/introduction/features_considered_disadvantages/) <!--hide-->

View File

@ -62,6 +62,12 @@ ClickHouse از روش asynchronous multimaster replication استفاده می
برای اطلاعات بیشتر، به بخش [replication داده ها](../engines/table_engines/mergetree_family/replication.md) مراجعه کنید.
## ویژگی های از ClickHouse که می تواند معایبی باشد. {#wyjgy-hy-z-clickhouse-khh-my-twnd-m-yby-bshd}
1. بدون پشتیبانی کامل از تراکنش
2. عدم توانایی برای تغییر و یا حذف داده های در حال حاضر وارد شده با سرعت بالا و تاخیر کم. برای پاک کردن و یا اصلاح داده ها، به عنوان مثال برای پیروی از [GDPR](https://gdpr-info.eu)، دسته ای پاک و به روزرسانی وجود دارد.حال توسعه می باشد.
3. Sparse index باعث می شود ClickHouse چندان مناسب اجرای پرسمان های point query برای دریافت یک ردیف از داده ها با استفاده از کلید آنها نباشد.
</div>
[مقاله اصلی](https://clickhouse.tech/docs/fa/introduction/distinctive_features/) <!--hide-->

View File

@ -1,11 +0,0 @@
<div markdown="1" markdown="1" dir="rtl">
# ویژگی های از ClickHouse که می تواند معایبی باشد. {#wyjgy-hy-z-clickhouse-khh-my-twnd-m-yby-bshd}
1. بدون پشتیبانی کامل از تراکنش
2. عدم توانایی برای تغییر و یا حذف داده های در حال حاضر وارد شده با سرعت بالا و تاخیر کم. برای پاک کردن و یا اصلاح داده ها، به عنوان مثال برای پیروی از [GDPR](https://gdpr-info.eu)، دسته ای پاک و به روزرسانی وجود دارد.حال توسعه می باشد.
3. Sparse index باعث می شود ClickHouse چندان مناسب اجرای پرسمان های point query برای دریافت یک ردیف از داده ها با استفاده از کلید آنها نباشد.
</div>
[مقاله اصلی](https://clickhouse.tech/docs/fa/introduction/features_considered_disadvantages/) <!--hide-->

View File

@ -68,4 +68,10 @@ ClickHouse utilise la réplication multi-maître asynchrone. Après avoir été
Pour plus d'informations, consultez la section [Réplication des données](../engines/table_engines/mergetree_family/replication.md).
## Caractéristiques de ClickHouse qui peuvent être considérées comme des inconvénients {#clickhouse-features-that-can-be-considered-disadvantages}
1. Pas de transactions à part entière.
2. Manque de capacité à modifier ou supprimer des données déjà insérées avec un taux élevé et une faible latence. Des suppressions et des mises à jour par lots sont disponibles pour nettoyer ou modifier les données, par exemple pour [GDPR](https://gdpr-info.eu).
3. L'index clairsemé rend ClickHouse pas si approprié pour les requêtes ponctuelles récupérant des lignes simples par leurs clés.
[Article Original](https://clickhouse.tech/docs/en/introduction/distinctive_features/) <!--hide-->

View File

@ -1,15 +0,0 @@
---
machine_translated: true
machine_translated_rev: f865c9653f9df092694258e0ccdd733c339112f5
toc_priority: 5
toc_title: "Caract\xE9ristiques de ClickHouse qui peuvent \xEAtre consid\xE9r\xE9\
es comme des inconv\xE9nients"
---
# Caractéristiques de ClickHouse qui peuvent être considérées comme des inconvénients {#clickhouse-features-that-can-be-considered-disadvantages}
1. Pas de transactions à part entière.
2. Manque de capacité à modifier ou supprimer des données déjà insérées avec un taux élevé et une faible latence. Des suppressions et des mises à jour par lots sont disponibles pour nettoyer ou modifier les données, par exemple pour [GDPR](https://gdpr-info.eu).
3. L'index clairsemé rend ClickHouse pas si approprié pour les requêtes ponctuelles récupérant des lignes simples par leurs clés.
[Article Original](https://clickhouse.tech/docs/en/introduction/features_considered_disadvantages/) <!--hide-->

View File

@ -63,4 +63,10 @@ ClickHouseには、精度を犠牲にしてパフォーマンスを得るため
詳細については、[データ複製](../engines/table_engines/mergetree_family/replication.md) セクションを参照してください。
## 欠点と考えられるClickHouseの機能 {#qian-dian-tokao-erareruclickhousenoji-neng}
1. 本格的なトランザクションはありません。
2. 既に挿入されたデータの変更または削除を、高頻度かつ低遅延に行う機能はありません。 [GDPR](https://gdpr-info.eu)に準拠するなど、データをクリーンアップまたは変更するために、バッチ削除およびバッチ更新が利用可能です。
3. インデックスが疎であるため、ClickHouseは、キーで単一行を取得するようなクエリにはあまり適していません。
[Original article](https://clickhouse.yandex/docs/en/introduction/distinctive_features/) <!--hide-->

View File

@ -1,12 +0,0 @@
---
toc_priority: 5
toc_title: 欠点と見なすことができるClickHouseの機能
---
# 欠点と考えられるClickHouseの機能 {#qian-dian-tokao-erareruclickhousenoji-neng}
1. 本格的なトランザクションはありません。
2. 既に挿入されたデータの変更または削除を、高頻度かつ低遅延に行う機能はありません。 [GDPR](https://gdpr-info.eu)に準拠するなど、データをクリーンアップまたは変更するために、バッチ削除およびバッチ更新が利用可能です。
3. インデックスが疎であるため、ClickHouseは、キーで単一行を取得するようなクエリにはあまり適していません。
[Original article](https://clickhouse.yandex/docs/en/introduction/features_considered_disadvantages/) <!--hide-->

View File

@ -98,6 +98,7 @@ functions/ym_dict_functions.md query_language/functions/ym_dict_functions.md
interfaces/http_interface.md interfaces/http.md
interfaces/third-party_client_libraries.md interfaces/third-party/client_libraries.md
interfaces/third-party_gui.md interfaces/third-party/gui.md
introduction/features_considered_disadvantages.md introduction/distinctive_features.md
introduction/possible_silly_questions.md faq/general.md
introduction/ya_metrika_task.md introduction/history.md
operations/performance/sampling_query_profiler.md operations/optimizing_performance/sampling_query_profiler.md

View File

@ -3,7 +3,7 @@ machine_translated: true
machine_translated_rev: 1cd5f0028d917696daf71ac1c9ee849c99c1d5c8
---
# Усыновители ClickHouse {#clickhouse-adopters}
# Пользователи ClickHouse {#clickhouse-adopters}
!!! warning "Оговорка"
Следующий список компаний, использующих ClickHouse, и их истории успеха собраны из открытых источников, поэтому они могут отличаться от текущей реальности. Мы были бы очень признательны, если бы вы поделились историей принятия ClickHouse в свою компанию и [добавьте его в список](https://github.com/ClickHouse/ClickHouse/edit/master/docs/en/introduction/adopters.md), но, пожалуйста, убедитесь, что у вас не будет никаких проблем с NDA, сделав это. Предоставление обновлений с публикациями от других компаний также полезно.

View File

@ -61,4 +61,11 @@ ClickHouse предоставляет различные способы разм
Подробнее смотрите раздел [Репликация данных](../engines/table_engines/mergetree_family/replication.md).
## Особенности, которые могут считаться недостатками {#osobennosti-clickhouse-kotorye-mogut-schitatsia-nedostatkami}
1. Отсутствие полноценных транзакций.
2. Возможность изменять или удалять ранее записанные данные с низкими задержками и высокой частотой запросов не предоставляется. Есть массовое удаление и изменение данных для очистки более не нужного или соответствия [GDPR](https://gdpr-info.eu).
3. Разреженный индекс делает ClickHouse плохо пригодным для точечных чтений одиночных строк по своим
ключам.
[Оригинальная статья](https://clickhouse.tech/docs/ru/introduction/distinctive_features/) <!--hide-->

View File

@ -1,8 +0,0 @@
# Особенности ClickHouse, которые могут считаться недостатками {#osobennosti-clickhouse-kotorye-mogut-schitatsia-nedostatkami}
1. Отсутствие полноценных транзакций.
2. Возможность изменять или удалять ранее записанные данные с низкими задержками и высокой частотой запросов не предоставляется. Есть массовое удаление и изменение данных для очистки более не нужного или соответствия [GDPR](https://gdpr-info.eu).
3. Разреженный индекс делает ClickHouse плохо пригодным для точечных чтений одиночных строк по своим
ключам.
[Оригинальная статья](https://clickhouse.tech/docs/ru/introduction/features_considered_disadvantages/) <!--hide-->

View File

@ -35,6 +35,8 @@ def build_nav_entry(root):
title = meta.get('toc_folder_title', 'hidden')
prio = meta.get('toc_priority', 9999)
logging.debug(f'Nav entry: {prio}, {title}, {path}')
if not content.strip():
title = 'hidden'
result_items.append((prio, title, path))
result_items = sorted(result_items, key=lambda x: (x[0], x[1]))
result = collections.OrderedDict([(item[1], item[2]) for item in result_items])
@ -45,8 +47,16 @@ def build_nav(lang, args):
docs_dir = os.path.join(args.docs_dir, lang)
_, _, nav = build_nav_entry(docs_dir)
result = []
index_key = None
for key, value in nav.items():
if key and value:
if value == 'index.md':
index_key = key
continue
result.append({key: value})
if index_key:
key = list(result[0].keys())[0]
result[0][key][index_key] = 'index.md'
result[0][key].move_to_end(index_key, last=False)
print('result', result)
return result

View File

@ -62,4 +62,10 @@ ClickHouse使用异步的多主复制技术。当数据被写入任何一个可
更多信息,参见 [数据复制](../engines/table_engines/mergetree_family/replication.md)。
# 的限制 {#clickhouseke-yi-ren-wei-shi-que-dian-de-gong-neng}
1. 没有完整的事务支持。
2. 缺少高频率,低延迟的修改或删除已存在数据的能力。仅能用于批量删除或修改数据,但这符合 [GDPR](https://gdpr-info.eu)。
3. 稀疏索引使得ClickHouse不适合通过其键检索单行的点查询。
[来源文章](https://clickhouse.tech/docs/en/introduction/distinctive_features/) <!--hide-->

View File

@ -1,8 +0,0 @@
# ClickHouse的限制 {#clickhouseke-yi-ren-wei-shi-que-dian-de-gong-neng}
1. 没有完整的事务支持。
2. 缺少高频率,低延迟的修改或删除已存在数据的能力。仅能用于批量删除或修改数据,但这符合 [GDPR](https://gdpr-info.eu)。
3. 稀疏索引使得ClickHouse不适合通过其键检索单行的点查询。
[来源文章](https://clickhouse.tech/docs/zh/introduction/features_considered_disadvantages/) <!--hide-->

View File

@ -29,19 +29,17 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int DEADLOCK_AVOIDED;
}
/** A single-use object that represents lock's ownership
/** A one-time-use-object that represents lock ownership
* For the purpose of exception safety guarantees LockHolder is to be used in two steps:
* 1. Create an instance (allocating all the memory needed)
* 1. Create an instance (allocating all the needed memory)
* 2. Associate the instance with the lock (attach to the lock and locking request group)
*/
class RWLockImpl::LockHolderImpl
{
bool bound{false};
Type lock_type;
String query_id;
CurrentMetrics::Increment active_client_increment;
RWLock parent;
@ -53,24 +51,30 @@ public:
/// Implicit memory allocation for query_id is done here
LockHolderImpl(const String & query_id_, Type type)
: lock_type{type}, query_id{query_id_},
active_client_increment{
: query_id{query_id_}
, active_client_increment{
type == Type::Read ? CurrentMetrics::RWLockActiveReaders : CurrentMetrics::RWLockActiveWriters}
{
}
~LockHolderImpl();
~LockHolderImpl()
{
if (bound && parent != nullptr)
parent->unlock(it_group, query_id);
else
active_client_increment.destroy();
}
private:
/// A separate method which binds the lock holder to the owned lock
/// N.B. It is very important that this method produces no allocations
bool bindWith(RWLock && parent_, GroupsContainer::iterator it_group_) noexcept
{
if (bound)
if (bound || parent_ == nullptr)
return false;
it_group = it_group_;
parent = std::move(parent_);
++it_group->refererrs;
++it_group->requests;
bound = true;
return true;
}
@ -79,56 +83,27 @@ private:
};
namespace
{
/// Global information about all read locks that query has. It is needed to avoid some type of deadlocks.
class QueryLockInfo
{
private:
mutable std::mutex mutex;
std::map<std::string, size_t> queries;
public:
void add(const String & query_id)
{
std::lock_guard lock(mutex);
const auto res = queries.emplace(query_id, 1); // may throw
if (!res.second)
++res.first->second;
}
void remove(const String & query_id) noexcept
{
std::lock_guard lock(mutex);
const auto query_it = queries.find(query_id);
if (query_it != queries.cend() && --query_it->second == 0)
queries.erase(query_it);
}
void check(const String & query_id) const
{
std::lock_guard lock(mutex);
if (queries.find(query_id) != queries.cend())
throw Exception("Possible deadlock avoided. Client should retry.", ErrorCodes::DEADLOCK_AVOIDED);
}
};
QueryLockInfo all_read_locks;
}
/** To guarantee that we do not get any piece of our data corrupted:
/** General algorithm:
* Step 1. Try the FastPath (for both Reads/Writes)
* Step 2. Find our request group: attach to an existing one or create a new one
* Step 3. Wait/timed wait for ownership signal
* Step 3a. Check if we must handle timeout and exit
* Step 4. Persist lock ownership
*
* To guarantee that we do not get any piece of our data corrupted:
* 1. Perform all actions that include allocations before changing lock's internal state
* 2. Roll back any changes that make the state inconsistent
*
* Note: "SM" in the commentaries below stands for STATE MODIFICATION
*/
RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id)
RWLockImpl::LockHolder
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms)
{
const auto lock_deadline_tp =
(lock_timeout_ms == std::chrono::milliseconds(0))
? std::chrono::time_point<std::chrono::steady_clock>::max()
: std::chrono::steady_clock::now() + lock_timeout_ms;
const bool request_has_query_id = query_id != NO_QUERY;
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
@ -145,100 +120,111 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
/// This object is placed above unique_lock, because it may lock in destructor.
auto lock_holder = std::make_shared<LockHolderImpl>(query_id, type);
std::unique_lock lock(mutex);
std::unique_lock state_lock(internal_state_mtx);
/// The FastPath:
/// Check if the same query_id already holds the required lock in which case we can proceed without waiting
if (request_has_query_id)
{
const auto it_query = owner_queries.find(query_id);
if (it_query != owner_queries.end())
const auto owner_query_it = owner_queries.find(query_id);
if (owner_query_it != owner_queries.end())
{
const auto current_owner_group = queue.begin();
if (wrlock_owner != writers_queue.end())
throw Exception(
"RWLockImpl::getLock(): RWLock is already locked in exclusive mode",
ErrorCodes::LOGICAL_ERROR);
/// XXX: it means we can't upgrade lock from read to write!
/// Lock upgrading is not supported
if (type == Write)
throw Exception(
"RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked",
ErrorCodes::LOGICAL_ERROR);
if (current_owner_group->type == Write)
throw Exception(
"RWLockImpl::getLock(): RWLock is already locked in exclusive mode",
ErrorCodes::LOGICAL_ERROR);
/// N.B. Type is Read here, query_id is not empty and it_query is a valid iterator
all_read_locks.add(query_id); /// SM1: may throw on insertion (nothing to roll back)
++it_query->second; /// SM2: nothrow
lock_holder->bindWith(shared_from_this(), current_owner_group); /// SM3: nothrow
++owner_query_it->second; /// SM1: nothrow
lock_holder->bindWith(shared_from_this(), rdlock_owner); /// SM2: nothrow
finalize_metrics();
return lock_holder;
}
}
/** If the query already has any active read lock and tries to acquire another read lock
* but it is not in front of the queue and has to wait, deadlock is possible:
*
* Example (four queries, two RWLocks - 'a' and 'b'):
*
* --> time -->
*
* q1: ra rb
* q2: wa
* q3: rb ra
* q4: wb
*
* We will throw an exception instead.
*/
if (type == Type::Write || queue.empty() || queue.back().type == Type::Write)
if (type == Type::Write)
{
if (type == Type::Read && request_has_query_id && !queue.empty())
all_read_locks.check(query_id);
/// Create a new group of locking requests
queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
writers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
}
else if (request_has_query_id && queue.size() > 1)
all_read_locks.check(query_id);
else if (readers_queue.empty() ||
(rdlock_owner == readers_queue.begin() && !writers_queue.empty()))
{
readers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
}
GroupsContainer::iterator it_group =
(type == Type::Write) ? std::prev(writers_queue.end()) : std::prev(readers_queue.end());
GroupsContainer::iterator it_group = std::prev(queue.end());
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
{
if (type == Type::Read)
{
rdlock_owner = it_group; /// SM2: nothrow
}
else
{
wrlock_owner = it_group; /// SM2: nothrow
}
}
else
{
/// Wait until our group becomes the lock owner
const auto predicate = [&] () { return it_group == (type == Read ? rdlock_owner : wrlock_owner); };
/// We need to reference the associated group before waiting to guarantee
/// that this group does not get deleted prematurely
++it_group->refererrs;
if (lock_deadline_tp == std::chrono::time_point<std::chrono::steady_clock>::max())
{
++it_group->requests;
it_group->cv.wait(state_lock, predicate);
--it_group->requests;
}
else
{
++it_group->requests;
const auto wait_result = it_group->cv.wait_until(state_lock, lock_deadline_tp, predicate);
--it_group->requests;
/// Wait a notification until we will be the only in the group.
it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); });
/// Step 3a. Check if we must handle timeout and exit
if (!wait_result) /// Wait timed out!
{
if (it_group->requests == 0)
{
/// Roll back SM1
if (type == Read)
{
readers_queue.erase(it_group); /// Rollback(SM1): nothrow
}
else
{
writers_queue.erase(it_group); /// Rollback(SM1): nothrow
}
}
--it_group->refererrs;
return nullptr;
}
}
}
if (request_has_query_id)
{
try
{
if (type == Type::Read)
all_read_locks.add(query_id); /// SM2: may throw on insertion
/// and is safe to roll back unconditionally
const auto emplace_res =
owner_queries.emplace(query_id, 1); /// SM3: may throw on insertion
owner_queries.emplace(query_id, 1); /// SM2: may throw on insertion
if (!emplace_res.second)
++emplace_res.first->second; /// SM4: nothrow
++emplace_res.first->second; /// SM3: nothrow
}
catch (...)
{
/// Methods std::list<>::emplace_back() and std::unordered_map<>::emplace() provide strong exception safety
/// We only need to roll back the changes to these objects: all_read_locks and the locking queue
if (type == Type::Read)
all_read_locks.remove(query_id); /// Rollback(SM2): nothrow
if (it_group->refererrs == 0)
{
const auto next = queue.erase(it_group); /// Rollback(SM1): nothrow
if (next != queue.end())
next->cv.notify_all();
}
/// We only need to roll back the changes to these objects: owner_queries and the readers/writers queue
if (it_group->requests == 0)
eraseGroup(it_group); /// Rollback(SM1): nothrow
throw;
}
@ -251,10 +237,9 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
}
/** The sequence points of acquiring lock's ownership by an instance of LockHolderImpl:
* 1. all_read_locks is updated
* 2. owner_queries is updated
* 3. request group is updated by LockHolderImpl which in turn becomes "bound"
/** The sequence points of acquiring lock ownership by an instance of LockHolderImpl:
* 1. owner_queries is updated
* 2. request group is updated by LockHolderImpl which in turn becomes "bound"
*
* If by the time when destructor of LockHolderImpl is called the instance has been "bound",
* it is guaranteed that all of the steps above have been executed successfully and the resulting state is consistent.
@ -262,38 +247,74 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
*
* We do not employ try-catch: if something bad happens, there is nothing we can do =(
*/
RWLockImpl::LockHolderImpl::~LockHolderImpl()
void RWLockImpl::unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept
{
if (!bound || parent == nullptr)
std::lock_guard state_lock(internal_state_mtx);
/// All of these cases are undefined behavior and there is nothing we can do!
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
return;
std::lock_guard lock(parent->mutex);
/// The associated group must exist (and be the beginning of the queue?)
if (parent->queue.empty() || it_group != parent->queue.begin())
if (rdlock_owner != readers_queue.end() && group_it != rdlock_owner)
return;
if (wrlock_owner != writers_queue.end() && group_it != wrlock_owner)
return;
/// If query_id is not empty it must be listed in parent->owner_queries
if (query_id != RWLockImpl::NO_QUERY)
if (query_id != NO_QUERY)
{
const auto owner_it = parent->owner_queries.find(query_id);
if (owner_it != parent->owner_queries.end())
const auto owner_query_it = owner_queries.find(query_id);
if (owner_query_it != owner_queries.end())
{
if (--owner_it->second == 0) /// SM: nothrow
parent->owner_queries.erase(owner_it); /// SM: nothrow
if (lock_type == RWLockImpl::Read)
all_read_locks.remove(query_id); /// SM: nothrow
if (--owner_query_it->second == 0) /// SM: nothrow
owner_queries.erase(owner_query_it); /// SM: nothrow
}
}
/// If we are the last remaining referrer, remove the group and notify the next group
if (--it_group->refererrs == 0) /// SM: nothrow
{
const auto next = parent->queue.erase(it_group); /// SM: nothrow
if (next != parent->queue.end())
next->cv.notify_all();
}
/// If we are the last remaining referrer, remove this QNode and notify the next one
if (--group_it->requests == 0) /// SM: nothrow
eraseGroup(group_it);
}
void RWLockImpl::eraseGroup(GroupsContainer::iterator group_it) noexcept
{
rdlock_owner = readers_queue.end();
wrlock_owner = writers_queue.end();
if (group_it->type == Read)
{
readers_queue.erase(group_it);
/// Prepare next phase
if (!writers_queue.empty())
{
wrlock_owner = writers_queue.begin();
}
else
{
rdlock_owner = readers_queue.begin();
}
}
else
{
writers_queue.erase(group_it);
/// Prepare next phase
if (!readers_queue.empty())
{
rdlock_owner = readers_queue.begin();
}
else
{
wrlock_owner = writers_queue.begin();
}
}
if (rdlock_owner != readers_queue.end())
{
rdlock_owner->cv.notify_all();
}
else if (wrlock_owner != writers_queue.end())
{
wrlock_owner->cv.notify_one();
}
}
}
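
Note on the hand-off in eraseGroup() above: when a reader group finishes, waiting writers are preferred, and when a writer finishes, waiting readers are preferred, so the phases alternate whenever both queues are non-empty. The following is a minimal sketch of just that decision. The names Phase and nextPhase are hypothetical and not part of the commit; the sketch models only which side gets ownership next, not the queues, iterators, or condition variables.

```cpp
// Hypothetical model of the "prepare next phase" choice in eraseGroup().
enum class Phase { Idle, Read, Write };

// finished:        type of the group that has just been erased
// readers_waiting: true if readers_queue is non-empty after the erase
// writers_waiting: true if writers_queue is non-empty after the erase
constexpr Phase nextPhase(Phase finished, bool readers_waiting, bool writers_waiting)
{
    if (finished == Phase::Read)
    {
        // After a read phase, writers go first if any are queued.
        if (writers_waiting)
            return Phase::Write;
        return readers_waiting ? Phase::Read : Phase::Idle;
    }

    // After a write phase, readers go first if any are queued.
    if (readers_waiting)
        return Phase::Read;
    return writers_waiting ? Phase::Write : Phase::Idle;
}
```

With both queues non-empty, nextPhase(Phase::Read, true, true) yields Phase::Write and nextPhase(Phase::Write, true, true) yields Phase::Read, which is the alternation the new code relies on.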

View File

@ -2,6 +2,7 @@
#include <Core/Types.h>
#include <chrono>
#include <list>
#include <vector>
#include <mutex>
@ -19,7 +20,8 @@ using RWLock = std::shared_ptr<RWLockImpl>;
/// Implements shared lock with FIFO service
/// Can be acquired recursively (several calls for the same query) in Read mode
/// (Phase Fair RWLock as suggested in https://www.cs.unc.edu/~anderson/papers/rtsj10-for-web.pdf)
/// Can be acquired recursively (for the same query) in Read mode
///
/// NOTE: it is important to allow acquiring the same lock in Read mode without waiting if it is already
/// acquired by another thread of the same query. Otherwise the following deadlock is possible:
@ -42,37 +44,44 @@ public:
friend class LockHolderImpl;
using LockHolder = std::shared_ptr<LockHolderImpl>;
/// Waits in the queue and returns appropriate lock
/// Empty query_id means the lock is acquired out of the query context (e.g. in a background thread).
LockHolder getLock(Type type, const String & query_id);
/// Empty query_id means the lock is acquired from outside of query context (e.g. in a background thread).
LockHolder getLock(Type type, const String & query_id,
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0));
/// Use as query_id to acquire a lock outside the query context.
inline static const String NO_QUERY = String();
inline static const auto default_locking_timeout_ms = std::chrono::milliseconds(120000);
private:
RWLockImpl() = default;
struct Group;
using GroupsContainer = std::list<Group>;
using OwnerQueryIds = std::unordered_map<String, size_t>;
/// Group of locking requests that should be granted concurrently
/// i.e. a group can contain several readers, but only one writer
/// Group of locking requests that should be granted simultaneously
/// i.e. one or several readers or a single writer
struct Group
{
const Type type;
size_t refererrs;
size_t requests;
std::condition_variable cv; /// all locking requests of the group wait on this condvar
explicit Group(Type type_) : type{type_}, refererrs{0} {}
explicit Group(Type type_) : type{type_}, requests{0} {}
};
GroupsContainer queue;
using GroupsContainer = std::list<Group>;
using OwnerQueryIds = std::unordered_map<String, size_t>;
private:
mutable std::mutex internal_state_mtx;
GroupsContainer readers_queue;
GroupsContainer writers_queue;
GroupsContainer::iterator rdlock_owner{readers_queue.end()}; /// equal to readers_queue.begin() in read phase
/// or readers_queue.end() otherwise
GroupsContainer::iterator wrlock_owner{writers_queue.end()}; /// equal to writers_queue.begin() in write phase
/// or writers_queue.end() otherwise
OwnerQueryIds owner_queries;
mutable std::mutex mutex;
private:
RWLockImpl() = default;
void unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept;
void eraseGroup(GroupsContainer::iterator group_it) noexcept;
};
}
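
Note on the new lock_timeout_ms parameter: its default of zero milliseconds is treated as "no deadline", and the implementation branches between a plain wait and wait_until accordingly. A minimal sketch of that pattern in isolation follows. TimedGate, makeDeadline, waitOpen, and release are hypothetical names used for illustration only; the real RWLockImpl waits on a per-group condition variable and a different predicate.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

using Clock = std::chrono::steady_clock;

// Zero timeout means "wait forever", mirroring the convention of getLock().
inline Clock::time_point makeDeadline(std::chrono::milliseconds timeout)
{
    return timeout == std::chrono::milliseconds(0)
        ? Clock::time_point::max()
        : Clock::now() + timeout;
}

// Hypothetical stand-in for a lock whose ownership is signalled via a condvar.
struct TimedGate
{
    std::mutex mtx;
    std::condition_variable cv;
    bool open = false;

    // Returns true once the gate is open, false if the deadline passed first.
    bool waitOpen(std::chrono::milliseconds timeout)
    {
        const auto deadline = makeDeadline(timeout);
        std::unique_lock<std::mutex> lock(mtx);
        const auto predicate = [this] { return open; };

        // As in the diff, the "no deadline" case uses an untimed wait
        // instead of passing time_point::max() to wait_until.
        if (deadline == Clock::time_point::max())
        {
            cv.wait(lock, predicate);
            return true;
        }
        return cv.wait_until(lock, deadline, predicate);
    }

    void release()
    {
        {
            std::lock_guard<std::mutex> lock(mtx);
            open = true;
        }
        cv.notify_all();
    }
};
```

A caller would use waitOpen(std::chrono::milliseconds(100)) for a bounded wait and waitOpen(std::chrono::milliseconds(0)) to block until release() is called.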

View File

@ -150,9 +150,16 @@ TEST(Common, RWLockDeadlock)
usleep(100000);
usleep(100000);
usleep(100000);
usleep(100000);
try
{
auto holder2 = lock2->getLock(RWLockImpl::Read, "q1");
auto holder2 = lock2->getLock(RWLockImpl::Read, "q1", std::chrono::milliseconds(100));
if (!holder2)
{
throw Exception(
"Locking attempt timed out! Possible deadlock avoided. Client should retry.",
ErrorCodes::DEADLOCK_AVOIDED);
}
}
catch (const Exception & e)
{
@ -174,9 +181,16 @@ TEST(Common, RWLockDeadlock)
auto holder2 = lock2->getLock(RWLockImpl::Read, "q3");
usleep(100000);
usleep(100000);
usleep(100000);
try
{
auto holder1 = lock1->getLock(RWLockImpl::Read, "q3");
auto holder1 = lock1->getLock(RWLockImpl::Read, "q3", std::chrono::milliseconds(100));
if (!holder1)
{
throw Exception(
"Locking attempt timed out! Possible deadlock avoided. Client should retry.",
ErrorCodes::DEADLOCK_AVOIDED);
}
}
catch (const Exception & e)
{

View File

@ -91,3 +91,7 @@
# define ASAN_UNPOISON_MEMORY_REGION(a, b)
# define ASAN_POISON_MEMORY_REGION(a, b)
#endif
/// Actually, there may be multiple acquisitions of different locks for a given table within one query.
/// Check with IStorage class for the list of possible locks
#define DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC 120

View File

@ -407,6 +407,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, max_parser_depth, 1000, "Maximum parser depth.", 0) \
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.", 0) \
M(SettingBool, allow_nondeterministic_mutations, false, "Allow non-deterministic functions in ALTER UPDATE/ALTER DELETE statements", 0) \
M(SettingSeconds, lock_acquire_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "How long locking request should wait before failing", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\

View File

@ -25,7 +25,8 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality.
*/
addTableLock(storage->lockStructureForShare(true, context.getInitialQueryId()));
addTableLock(
storage->lockStructureForShare(true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout));
/// If the "root" table deduplicates blocks, there is no need to deduplicate for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
@ -54,7 +55,9 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{
addTableLock(materialized_view->lockStructureForShare(true, context.getInitialQueryId()));
addTableLock(
materialized_view->lockStructureForShare(
true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout));
StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID();

View File

@ -5,6 +5,7 @@
#include <string>
#include <Databases/DatabaseMySQL.h>
#include <Common/parseAddress.h>
#include <Core/SettingsCollection.h>
#include <IO/Operators.h>
#include <Formats/MySQLBlockInputStream.h>
#include <DataTypes/DataTypeString.h>
@ -40,6 +41,7 @@ namespace ErrorCodes
constexpr static const auto suffix = ".remove_flag";
static constexpr const std::chrono::seconds cleaner_sleep_time{30};
static const SettingSeconds lock_acquire_timeout{10};
static String toQueryStringWithQuote(const std::vector<String> & quote_list)
{
@ -358,7 +360,7 @@ void DatabaseMySQL::cleanOutdatedTables()
++iterator;
else
{
const auto table_lock = (*iterator)->lockAlterIntention(RWLockImpl::NO_QUERY);
const auto table_lock = (*iterator)->lockAlterIntention(RWLockImpl::NO_QUERY, lock_acquire_timeout);
(*iterator)->shutdown();
(*iterator)->is_dropped = true;

View File

@ -66,7 +66,8 @@ FunctionBaseImplPtr JoinGetOverloadResolver::build(const ColumnsWithTypeAndName
auto join = storage_join->getJoin();
DataTypes data_types(arguments.size());
auto table_lock = storage_join->lockStructureForShare(false, context.getInitialQueryId());
auto table_lock = storage_join->lockStructureForShare(
false, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
for (size_t i = 0; i < arguments.size(); ++i)
data_types[i] = arguments[i].type;

@ -104,7 +104,7 @@ INSTANTIATE_TEST_SUITE_P(Basic,
DateLUT::instance("Europe/Minsk")
},
{
"When scale is 0, subsecond part is 0 despite beeing present in string.",
"When scale is 0, subsecond part is 0 despite being present in string.",
"2019-09-16 19:20:17.123",
1568650817ULL,
0,

@ -82,7 +82,9 @@ BlockIO InterpreterAlterQuery::execute()
if (!mutation_commands.empty())
{
auto table_lock_holder = table->lockStructureForShare(false /* because mutation is executed asynchronously */, context.getCurrentQueryId());
auto table_lock_holder = table->lockStructureForShare(
false /* because mutation is executed asynchronously */,
context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
MutationsInterpreter(table, mutation_commands, context, false).validate(table_lock_holder);
table->mutate(mutation_commands, context);
}
@ -109,7 +111,8 @@ BlockIO InterpreterAlterQuery::execute()
if (!alter_commands.empty())
{
auto table_lock_holder = table->lockAlterIntention(context.getCurrentQueryId());
auto table_lock_holder = table->lockAlterIntention(
context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(metadata, context);
alter_commands.prepare(metadata);

@ -403,7 +403,8 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, create.as_table});
/// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
as_storage_lock = as_storage->lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
properties.columns = as_storage->getColumns();
/// Secondary indices make sense only for MergeTree family of storage engines.

@ -89,7 +89,8 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
table = DatabaseCatalog::instance().getTable(table_id);
}
auto table_lock = table->lockStructureForShare(false, context.getInitialQueryId());
auto table_lock = table->lockStructureForShare(
false, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
columns = table->getColumns();
}

@ -93,7 +93,7 @@ BlockIO InterpreterDropQuery::executeToTable(
context.checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
table->shutdown();
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
auto table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
/// Drop table from memory, don't touch data and metadata
database->detachTable(table_name);
}
@ -103,7 +103,7 @@ BlockIO InterpreterDropQuery::executeToTable(
table->checkTableCanBeDropped();
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
auto table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
/// Drop table data, don't touch metadata
table->truncate(query_ptr, context, table_lock);
}
@ -115,7 +115,7 @@ BlockIO InterpreterDropQuery::executeToTable(
table->shutdown();
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
auto table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
const std::string metadata_file_without_extension = database->getMetadataPath() + escapeForFileName(table_id.table_name);
const auto prev_metadata_name = metadata_file_without_extension + ".sql";
@ -216,7 +216,8 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name,
if (kind == ASTDropQuery::Kind::Truncate)
{
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
auto table_lock =
table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
/// Drop table data, don't touch metadata
table->truncate(query_ptr, context, table_lock);
}
@ -225,7 +226,8 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name,
context_handle.removeExternalTable(table_name);
table->shutdown();
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
auto table_lock =
table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
/// Delete table data
table->drop(table_lock);
table->is_dropped = true;

@ -109,7 +109,8 @@ BlockIO InterpreterInsertQuery::execute()
BlockIO res;
StoragePtr table = getTable(query);
auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());
auto table_lock = table->lockStructureForShare(
true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto query_sample_block = getSampleBlock(query, table);
if (!query.table_function)

@ -79,7 +79,8 @@ BlockIO InterpreterRenameQuery::execute()
{
database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name));
auto from_table = database_catalog.getTable({elem.from_database_name, elem.from_table_name});
auto from_table_lock = from_table->lockExclusively(context.getCurrentQueryId());
auto from_table_lock =
from_table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
database_catalog.getDatabase(elem.from_database_name)->renameTable(
context,

@ -255,7 +255,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage)
{
table_lock = storage->lockStructureForShare(false, context->getInitialQueryId());
table_lock = storage->lockStructureForShare(
false, context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
table_id = storage->getStorageID();
}

@ -326,7 +326,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
table->shutdown();
{
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
auto table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
create_ast = database->getCreateTableQuery(system_context, replica.table_name);
database->detachTable(replica.table_name);

@ -28,6 +28,7 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
extern const int TABLE_IS_DROPPED;
extern const int NOT_IMPLEMENTED;
extern const int DEADLOCK_AVOIDED;
}
IStorage::IStorage(StorageID storage_id_, ColumnsDescription virtuals_) : storage_id(std::move(storage_id_)), virtuals(std::move(virtuals_))
@ -314,48 +315,64 @@ bool IStorage::isVirtualColumn(const String & column_name) const
return getColumns().get(column_name).is_virtual;
}
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
RWLockImpl::LockHolder IStorage::tryLockTimed(
const RWLock & rwlock, RWLockImpl::Type type, const String & query_id, const SettingSeconds & acquire_timeout)
{
auto lock_holder = rwlock->getLock(type, query_id, std::chrono::milliseconds(acquire_timeout.totalMilliseconds()));
if (!lock_holder)
{
const String type_str = type == RWLockImpl::Type::Read ? "READ" : "WRITE";
throw Exception(
type_str + " locking attempt on \"" + getStorageID().getFullTableName() +
"\" has timed out! (" + toString(acquire_timeout.totalMilliseconds()) + "ms) "
"Possible deadlock avoided. Client should retry.",
ErrorCodes::DEADLOCK_AVOIDED);
}
return lock_holder;
}
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id, const SettingSeconds & acquire_timeout)
{
TableStructureReadLockHolder result;
if (will_add_new_data)
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Read, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Read, query_id);
result.new_data_structure_lock = tryLockTimed(new_data_structure_lock, RWLockImpl::Read, query_id, acquire_timeout);
result.structure_lock = tryLockTimed(structure_lock, RWLockImpl::Read, query_id, acquire_timeout);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return result;
}
TableStructureWriteLockHolder IStorage::lockAlterIntention(const String & query_id)
TableStructureWriteLockHolder IStorage::lockAlterIntention(const String & query_id, const SettingSeconds & acquire_timeout)
{
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
result.alter_intention_lock = tryLockTimed(alter_intention_lock, RWLockImpl::Write, query_id, acquire_timeout);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return result;
}
void IStorage::lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id)
void IStorage::lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id, const SettingSeconds & acquire_timeout)
{
if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getStorageID().getNameForLogs() + " was not taken. This is a bug.", ErrorCodes::LOGICAL_ERROR);
if (!lock_holder.new_data_structure_lock)
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
lock_holder.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
lock_holder.new_data_structure_lock = tryLockTimed(new_data_structure_lock, RWLockImpl::Write, query_id, acquire_timeout);
lock_holder.structure_lock = tryLockTimed(structure_lock, RWLockImpl::Write, query_id, acquire_timeout);
}
TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id)
TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id, const SettingSeconds & acquire_timeout)
{
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
result.alter_intention_lock = tryLockTimed(alter_intention_lock, RWLockImpl::Write, query_id, acquire_timeout);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
result.new_data_structure_lock = tryLockTimed(new_data_structure_lock, RWLockImpl::Write, query_id, acquire_timeout);
result.structure_lock = tryLockTimed(structure_lock, RWLockImpl::Write, query_id, acquire_timeout);
return result;
}
@ -370,7 +387,7 @@ void IStorage::alter(
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
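
Review note: `IStorage::alter` upgrades an already-held alter intention lock to the full exclusive structure lock, and after this change each step is bounded by the timeout. A minimal sketch of that two-stage acquisition follows. `TableLocks`, `WriteLockHolder`, and `lockOrThrow` are hypothetical names, and plain standard-library mutexes stand in for the three `RWLock` members, so reader/writer sharing and per-query re-entrancy are not modelled.

```cpp
#include <chrono>
#include <mutex>
#include <shared_mutex>
#include <stdexcept>

// Hypothetical model of the three table locks named in the diff; not ClickHouse code.
struct TableLocks
{
    std::timed_mutex alter_intention;
    std::shared_timed_mutex new_data_structure;
    std::shared_timed_mutex structure;
};

struct WriteLockHolder
{
    std::unique_lock<std::timed_mutex> alter_intention;
    std::unique_lock<std::shared_timed_mutex> new_data_structure;
    std::unique_lock<std::shared_timed_mutex> structure;
};

// Exclusive acquisition bounded by `timeout`; throws when the deadline passes.
template <typename Mutex>
std::unique_lock<Mutex> lockOrThrow(Mutex & mutex, std::chrono::milliseconds timeout)
{
    std::unique_lock<Mutex> lock(mutex, timeout);
    if (!lock.owns_lock())
        throw std::runtime_error("lock acquisition timed out");
    return lock;
}

// Stage 1: lock out other ALTERs while readers and writers keep running.
WriteLockHolder lockAlterIntention(TableLocks & table, std::chrono::milliseconds timeout)
{
    WriteLockHolder holder;
    holder.alter_intention = lockOrThrow(table.alter_intention, timeout);
    return holder;
}

// Stage 2: upgrade to full exclusivity once the structure is about to change.
void lockStructureExclusively(TableLocks & table, WriteLockHolder & holder, std::chrono::milliseconds timeout)
{
    if (!holder.alter_intention.owns_lock())
        throw std::logic_error("alter intention lock was not taken");
    if (!holder.new_data_structure.owns_lock())
        holder.new_data_structure = lockOrThrow(table.new_data_structure, timeout);
    // Guard added in this sketch only: std mutexes are not re-entrant.
    if (!holder.structure.owns_lock())
        holder.structure = lockOrThrow(table.structure, timeout);
}
```

If stage 2 times out, the holder still owns the intention lock from stage 1; it is released when the holder goes out of scope as the exception propagates.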

@ -195,22 +195,26 @@ private:
IndicesDescription indices;
ConstraintsDescription constraints;
private:
RWLockImpl::LockHolder tryLockTimed(
const RWLock & rwlock, RWLockImpl::Type type, const String & query_id, const SettingSeconds & acquire_timeout);
public:
/// Acquire this lock if you need the table structure to remain constant during the execution of
/// the query. If will_add_new_data is true, this means that the query will add new data to the table
/// (INSERT or a parts merge).
TableStructureReadLockHolder lockStructureForShare(bool will_add_new_data, const String & query_id);
TableStructureReadLockHolder lockStructureForShare(bool will_add_new_data, const String & query_id, const SettingSeconds & acquire_timeout);
/// Acquire this lock at the start of ALTER to lock out other ALTERs and make sure that only you
/// can modify the table structure. It can later be upgraded to the exclusive lock.
TableStructureWriteLockHolder lockAlterIntention(const String & query_id);
TableStructureWriteLockHolder lockAlterIntention(const String & query_id, const SettingSeconds & acquire_timeout);
/// Upgrade alter intention lock to the full exclusive structure lock. This is done by ALTER queries
/// to ensure that no other query uses the table structure and it can be safely changed.
void lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id);
void lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id, const SettingSeconds & acquire_timeout);
/// Acquire the full exclusive lock immediately. No other queries can run concurrently.
TableStructureWriteLockHolder lockExclusively(const String & query_id);
TableStructureWriteLockHolder lockExclusively(const String & query_id, const SettingSeconds & acquire_timeout);
/** Returns stage to which query is going to be processed in read() function.
* (Normally, the function only reads the columns from the list, but in other cases,

@ -519,7 +519,7 @@ void StorageLiveView::drop(TableStructureWriteLockHolder &)
void StorageLiveView::refresh(const Context & context)
{
auto alter_lock = lockAlterIntention(context.getCurrentQueryId());
auto alter_lock = lockAlterIntention(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
{
std::lock_guard lock(mutex);
if (getNewBlocks())

@ -85,7 +85,8 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
try
{
auto storage_lock = data.lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto storage_lock = data.lockStructureForShare(
false, RWLockImpl::NO_QUERY, data.getSettings()->lock_acquire_timeout_for_background_operations);
MergeTreeData::DataPartPtr part = findPart(part_name);

@ -42,6 +42,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingUInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
M(SettingSeconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.", 0) \
M(SettingSeconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.", 0) \
M(SettingSeconds, lock_acquire_timeout_for_background_operations, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "For background operations like merges, mutations etc. How many seconds before failing to acquire table locks.", 0) \
\
/** Inserts settings. */ \
M(SettingUInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \
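
Review note: across this commit, query-driven call sites pass the per-query `lock_acquire_timeout`, while background work that runs with `RWLockImpl::NO_QUERY` passes the table-level `lock_acquire_timeout_for_background_operations` added here. The selection rule can be sketched as follows. The two structs and `chooseLockTimeout` are hypothetical, and the default values are placeholders rather than the real defaults.

```cpp
#include <chrono>
#include <optional>

// Hypothetical settings holders; field names mirror the diff, values are placeholders.
struct QuerySettings
{
    std::chrono::seconds lock_acquire_timeout{10};
};

struct TableSettings
{
    std::chrono::seconds lock_acquire_timeout_for_background_operations{30};
};

// A user query supplies its own timeout; background tasks (no query context)
// fall back to the table-level setting.
std::chrono::seconds chooseLockTimeout(const std::optional<QuerySettings> & query, const TableSettings & table)
{
    if (query)
        return query->lock_acquire_timeout;
    return table.lock_acquire_timeout_for_background_operations;
}
```

Keeping the two values separate lets an operator tune merges and mutations per table without changing what interactive queries see.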

@ -57,7 +57,8 @@ void ReplicatedMergeTreeCleanupThread::iterate()
{
/// TODO: Implement tryLockStructureForShare.
auto lock = storage.lockStructureForShare(false, "");
auto lock = storage.lockStructureForShare(
false, RWLockImpl::NO_QUERY, storage.getSettings()->lock_acquire_timeout_for_background_operations);
storage.clearOldTemporaryDirectories();
}

@ -203,7 +203,9 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
else if (part->name == part_name)
{
auto zookeeper = storage.getZooKeeper();
auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto table_lock = storage.lockStructureForShare(
false, RWLockImpl::NO_QUERY, storage.getSettings()->lock_acquire_timeout_for_background_operations);
auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
part->getColumns(), part->checksums);

@ -1758,7 +1758,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
{
if (out_reason)
*out_reason = "There are " + toString(covered.size()) + " parts (from " + covered.front()
+ " to " + covered.back() + ") that are still not present or beeing processed by "
+ " to " + covered.back() + ") that are still not present or being processed by "
+ " other background process on this replica between " + left->name + " and " + right->name;
return false;
}
@ -1791,7 +1791,7 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
/// the part (checked by querying queue.virtual_parts), we can confidently assign a mutation to
/// version X for this part.
/// We cannot mutate part if it's beeing inserted with quorum and it's not
/// We cannot mutate part if it's being inserted with quorum and it's not
/// already reached.
if (part->name == inprogress_quorum_part)
return {};

@ -168,7 +168,8 @@ Pipes StorageBuffer::read(
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
auto destination_lock = destination->lockStructureForShare(false, context.getCurrentQueryId());
auto destination_lock = destination->lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name)
{
@ -757,7 +758,7 @@ std::optional<UInt64> StorageBuffer::totalBytes() const
void StorageBuffer::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
checkAlterIsPossible(params, context.getSettingsRef());

@ -463,7 +463,7 @@ void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, co
void StorageDistributed::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
checkAlterIsPossible(params, context.getSettingsRef());

@ -185,7 +185,9 @@ Pipes StorageMaterializedView::read(
const unsigned num_streams)
{
auto storage = getTargetTable();
auto lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
auto lock = storage->lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
if (query_info.order_by_optimizer)
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
@ -200,7 +202,8 @@ Pipes StorageMaterializedView::read(
BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const Context & context)
{
auto storage = getTargetTable();
auto lock = storage->lockStructureForShare(true, context.getCurrentQueryId());
auto lock = storage->lockStructureForShare(
true, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto stream = storage->write(query, context);
stream->addTableLock(lock);
return stream;
@ -258,7 +261,7 @@ void StorageMaterializedView::alter(
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);

@ -118,7 +118,8 @@ bool StorageMerge::isRemote() const
bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const
{
/// It's beneficial if it is true for at least one table.
StorageListWithLocks selected_tables = getSelectedTables(query_context.getCurrentQueryId());
StorageListWithLocks selected_tables = getSelectedTables(
query_context.getCurrentQueryId(), query_context.getSettingsRef());
size_t i = 0;
for (const auto & table : selected_tables)
@ -195,7 +196,7 @@ Pipes StorageMerge::read(
* This is necessary to correctly pass the recommended number of threads to each table.
*/
StorageListWithLocks selected_tables = getSelectedTables(
query_info.query, has_table_virtual_column, context.getCurrentQueryId());
query_info.query, has_table_virtual_column, context.getCurrentQueryId(), context.getSettingsRef());
if (selected_tables.empty())
/// FIXME: do we support sampling in this case?
@ -356,7 +357,7 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
}
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String & query_id) const
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String & query_id, const Settings & settings) const
{
StorageListWithLocks selected_tables;
auto iterator = getDatabaseIterator();
@ -365,7 +366,8 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
{
auto & table = iterator->table();
if (table.get() != this)
selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id), iterator->name());
selected_tables.emplace_back(
table, table->lockStructureForShare(false, query_id, settings.lock_acquire_timeout), iterator->name());
iterator->next();
}
@ -374,7 +376,8 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
}
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr & query, bool has_virtual_column, const String & query_id) const
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
const ASTPtr & query, bool has_virtual_column, const String & query_id, const Settings & settings) const
{
StorageListWithLocks selected_tables;
DatabaseTablesIteratorPtr iterator = getDatabaseIterator();
@ -390,7 +393,8 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr
if (storage.get() != this)
{
selected_tables.emplace_back(storage, storage->lockStructureForShare(false, query_id), iterator->name());
selected_tables.emplace_back(
storage, storage->lockStructureForShare(false, query_id, settings.lock_acquire_timeout), iterator->name());
virtual_column->insert(iterator->name());
}
@ -435,7 +439,7 @@ void StorageMerge::checkAlterIsPossible(const AlterCommands & commands, const Se
void StorageMerge::alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
StorageInMemoryMetadata storage_metadata = getInMemoryMetadata();

@ -57,9 +57,10 @@ private:
using StorageWithLockAndName = std::tuple<StoragePtr, TableStructureReadLockHolder, String>;
using StorageListWithLocks = std::list<StorageWithLockAndName>;
StorageListWithLocks getSelectedTables(const String & query_id) const;
StorageListWithLocks getSelectedTables(const String & query_id, const Settings & settings) const;
StorageMerge::StorageListWithLocks getSelectedTables(const ASTPtr & query, bool has_virtual_column, const String & query_id) const;
StorageMerge::StorageListWithLocks getSelectedTables(
const ASTPtr & query, bool has_virtual_column, const String & query_id, const Settings & settings) const;
template <typename F>
StoragePtr getFirstTable(F && predicate) const;

@ -223,7 +223,7 @@ void StorageMergeTree::alter(
/// This alter can be performed at metadata level only
if (commands.isSettingsAlter())
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
changeSettings(metadata.settings_ast, table_lock_holder);
@ -231,7 +231,7 @@ void StorageMergeTree::alter(
}
else
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
changeSettings(metadata.settings_ast, table_lock_holder);
/// Reinitialize primary key because primary key column types might have changed.
@ -541,7 +541,8 @@ bool StorageMergeTree::merge(
bool deduplicate,
String * out_disable_reason)
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
auto table_lock_holder = lockStructureForShare(
true, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
FutureMergedMutatedPart future_part;
@ -659,7 +660,8 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask()
bool StorageMergeTree::tryMutatePart()
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
auto table_lock_holder = lockStructureForShare(
true, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements;
FutureMergedMutatedPart future_part;
@ -790,7 +792,8 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{
{
/// TODO: Implement tryLockStructureForShare.
auto lock_structure = lockStructureForShare(false, "");
auto lock_structure = lockStructureForShare(
false, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
clearOldPartsFromFilesystem();
clearOldTemporaryDirectories();
}
@ -983,14 +986,16 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
auto lock = lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
freezePartition(command.partition, command.with_name, context, lock);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
auto lock = lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
freezeAll(command.with_name, context, lock);
}
break;
@ -1008,7 +1013,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, cons
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.merges_blocker.cancel();
/// Waits for completion of merge and does not start new ones.
auto lock = lockExclusively(context.getCurrentQueryId());
auto lock = lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
String partition_id = getPartitionIDFromQuery(partition, context);
@ -1055,8 +1060,8 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context)
{
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
Stopwatch watch;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
@ -1126,8 +1131,8 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId());
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table);
if (!dest_table_storage)

@ -48,7 +48,7 @@ void StorageNull::checkAlterIsPossible(const AlterCommands & commands, const Set
void StorageNull::alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_id = getStorageID();
StorageInMemoryMetadata metadata = getInMemoryMetadata();

@ -1045,7 +1045,8 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
ttl_infos, time(nullptr), max_volume_index);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(
false, RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type);
if (future_merged_part.name != entry.new_part_name)
@ -1180,7 +1181,8 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// Can throw an exception.
ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->disk);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(
false, RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
MutableDataPartPtr new_part;
Transaction transaction(*this);
@ -1534,7 +1536,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
PartDescriptions parts_to_add;
DataPartsVector parts_to_remove;
auto table_lock_holder_dst_table = lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto table_lock_holder_dst_table = lockStructureForShare(
false, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
for (size_t i = 0; i < entry_replace.new_part_names.size(); ++i)
{
@ -1596,7 +1599,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
return 0;
}
table_lock_holder_src_table = source_table->lockStructureForShare(false, RWLockImpl::NO_QUERY);
table_lock_holder_src_table = source_table->lockStructureForShare(
false, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
DataPartStates valid_states{MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed,
MergeTreeDataPartState::Outdated};
@ -2719,7 +2723,8 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
TableStructureReadLockHolder table_lock_holder;
if (!to_detached)
table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
table_lock_holder = lockStructureForShare(
true, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
/// Logging
Stopwatch stopwatch;
@ -3186,7 +3191,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
{
/// TODO (relax this lock)
auto table_lock = lockExclusively(RWLockImpl::NO_QUERY);
auto table_lock = lockExclusively(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
LOG_INFO(log, "Metadata changed in ZooKeeper. Applying changes locally.");
@ -3213,7 +3218,8 @@ void StorageReplicatedMergeTree::alter(
if (params.isSettingsAlter())
{
lockStructureExclusively(table_lock_holder, query_context.getCurrentQueryId());
lockStructureExclusively(
table_lock_holder, query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
/// We don't replicate storage_settings_ptr ALTER. It's local operation.
/// Also we don't upgrade alter lock to table structure lock.
StorageInMemoryMetadata metadata = getInMemoryMetadata();
@ -3279,7 +3285,8 @@ void StorageReplicatedMergeTree::alter(
if (ast_to_str(current_metadata.settings_ast) != ast_to_str(future_metadata.settings_ast))
{
lockStructureExclusively(table_lock_holder, query_context.getCurrentQueryId());
lockStructureExclusively(
table_lock_holder, query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
/// Just change settings
current_metadata.settings_ast = future_metadata.settings_ast;
changeSettings(current_metadata.settings_ast, table_lock_holder);
@ -3448,14 +3455,16 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
auto lock = lockStructureForShare(
false, query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
freezePartition(command.partition, command.with_name, query_context, lock);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
auto lock = lockStructureForShare(
false, query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
freezeAll(command.with_name, query_context, lock);
}
break;
@ -4463,7 +4472,8 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
/// Critical section is not required (since grabOldParts() returns unique part set on each call)
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(
false, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto zookeeper = getZooKeeper();
DataPartsVector parts = grabOldParts();
@ -4758,8 +4768,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
const Context & context)
{
/// First argument is true, because we possibly will add new data to current table.
auto lock1 = lockStructureForShare(true, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
auto lock1 = lockStructureForShare(true, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
Stopwatch watch;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
@ -4937,8 +4947,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId());
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto dest_table_storage = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(dest_table);
if (!dest_table_storage)

@ -62,12 +62,12 @@ public:
ColumnPtr databases_,
ColumnPtr tables_,
Storages storages_,
const std::shared_ptr<const ContextAccess> & access_,
String query_id_)
const Context & context)
: SourceWithProgress(header_)
, columns_mask(std::move(columns_mask_)), max_block_size(max_block_size_)
, databases(std::move(databases_)), tables(std::move(tables_)), storages(std::move(storages_))
, query_id(std::move(query_id_)), total_tables(tables->size()), access(access_)
, total_tables(tables->size()), access(context.getAccess())
, query_id(context.getCurrentQueryId()), lock_acquire_timeout(context.getSettingsRef().lock_acquire_timeout)
{
}
@ -103,7 +103,7 @@ protected:
try
{
table_lock = storage->lockStructureForShare(false, query_id);
table_lock = storage->lockStructureForShare(false, query_id, lock_acquire_timeout);
}
catch (const Exception & e)
{
@ -227,10 +227,11 @@ private:
ColumnPtr databases;
ColumnPtr tables;
Storages storages;
String query_id;
size_t db_table_num = 0;
size_t total_tables;
std::shared_ptr<const ContextAccess> access;
String query_id;
SettingSeconds lock_acquire_timeout;
};
@ -331,8 +332,8 @@ Pipes StorageSystemColumns::read(
pipes.emplace_back(std::make_shared<ColumnsSource>(
std::move(columns_mask), std::move(header), max_block_size,
std::move(filtered_database_column), std::move(filtered_table_column), std::move(storages),
context.getAccess(), context.getCurrentQueryId()));
std::move(filtered_database_column), std::move(filtered_table_column),
std::move(storages), context));
return pipes;
}

@ -62,7 +62,7 @@ StoragesInfo::getParts(MergeTreeData::DataPartStateVector & state, bool has_stat
}
StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, const Context & context)
: query_id(context.getCurrentQueryId())
: query_id(context.getCurrentQueryId()), settings(context.getSettings())
{
/// Will apply WHERE to subset of columns and then add more columns.
/// This is kind of complicated, but we use WHERE to do less work.
@ -192,7 +192,7 @@ StoragesInfo StoragesInfoStream::next()
try
{
/// For table not to be dropped and set of columns to remain constant.
info.table_lock = info.storage->lockStructureForShare(false, query_id);
info.table_lock = info.storage->lockStructureForShare(false, query_id, settings.lock_acquire_timeout);
}
catch (const Exception & e)
{

@ -36,6 +36,8 @@ public:
private:
String query_id;
Settings settings;
ColumnPtr database_column;
ColumnPtr table_column;

@ -244,7 +244,8 @@ protected:
if (need_lock_structure)
{
table = tables_it->table();
lock = table->lockStructureForShare(false, context.getCurrentQueryId());
lock = table->lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
}
}
catch (const Exception & e)