Merge remote-tracking branch 'upstream/master' into HEAD

This commit is contained in:
Anton Popov 2022-02-01 15:23:07 +03:00
commit 836a348a9c
142 changed files with 1671 additions and 528 deletions

View File

@ -1,26 +1,42 @@
#pragma once
#include <pdqsort.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#include <miniselect/floyd_rivest_select.h>
template <class RandomIt>
template <typename RandomIt>
void nth_element(RandomIt first, RandomIt nth, RandomIt last)
{
::miniselect::floyd_rivest_select(first, nth, last);
}
template <class RandomIt>
template <typename RandomIt>
void partial_sort(RandomIt first, RandomIt middle, RandomIt last)
{
::miniselect::floyd_rivest_partial_sort(first, middle, last);
}
template <class RandomIt, class Compare>
template <typename RandomIt, typename Compare>
void partial_sort(RandomIt first, RandomIt middle, RandomIt last, Compare compare)
{
::miniselect::floyd_rivest_partial_sort(first, middle, last, compare);
}
#pragma GCC diagnostic pop
template <typename RandomIt, typename Compare>
void sort(RandomIt first, RandomIt last, Compare compare)
{
::pdqsort(first, last, compare);
}
template <typename RandomIt>
void sort(RandomIt first, RandomIt last)
{
using value_type = typename std::iterator_traits<RandomIt>::value_type;
using comparator = std::less<value_type>;
::pdqsort(first, last, comparator());
}

View File

@ -125,10 +125,6 @@ For installing CMake and Ninja on Mac OS X first install Homebrew and then insta
Next, check the version of CMake: `cmake --version`. If it is below 3.12, you should install a newer version from the website: https://cmake.org/download/.
## Optional External Libraries {#optional-external-libraries}
ClickHouse uses several external libraries for building. All of them do not need to be installed separately as they are built together with ClickHouse from the sources located in the submodules. You can check the list in `contrib`.
## C++ Compiler {#c-compiler}
Compilers Clang starting from version 11 is supported for building ClickHouse.

View File

@ -97,13 +97,16 @@ Structure of the `patterns` section:
``` text
pattern
rule_type
regexp
function
pattern
rule_type
regexp
age + precision
...
pattern
rule_type
regexp
function
age + precision
@ -127,12 +130,20 @@ When processing a row, ClickHouse checks the rules in the `pattern` sections. Ea
Fields for `pattern` and `default` sections:
- `regexp` A pattern for the metric name.
- `rule_type` - a rule's type. It's applied only to a particular metrics. The engine use it to separate plain and tagged metrics. Optional parameter. Default value: `all`.
It's unnecessary when performance is not critical, or only one metrics type is used, e.g. plain metrics. By default only one type of rules set is created. Otherwise, if any of special types is defined, two different sets are created. One for plain metrics (root.branch.leaf) and one for tagged metrics (root.branch.leaf;tag1=value1).
The default rules are ended up in both sets.
Valid values:
- `all` (default) - a universal rule, used when `rule_type` is omitted.
- `plain` - a rule for plain metrics. The field `regexp` is processed as regular expression.
- `tagged` - a rule for tagged metrics (metrics are stored in DB in the format of `someName?tag1=value1&tag2=value2&tag3=value3`). Regular expression must be sorted by tags' names, first tag must be `__name__` if exists. The field `regexp` is processed as regular expression.
- `tag_list` - a rule for tagged matrics, a simple DSL for easier metric description in graphite format `someName;tag1=value1;tag2=value2`, `someName`, or `tag1=value1;tag2=value2`. The field `regexp` is translated into a `tagged` rule. The sorting by tags' names is unnecessary, ti will be done automatically. A tag's value (but not a name) can be set as a regular expression, e.g. `env=(dev|staging)`.
- `regexp` A pattern for the metric name (a regular or DSL).
- `age` The minimum age of the data in seconds.
- `precision` How precisely to define the age of the data in seconds. Should be a divisor for 86400 (seconds in a day).
- `function` The name of the aggregating function to apply to data whose age falls within the range `[age, age + precision]`. Accepted functions: min / max / any / avg. The average is calculated imprecisely, like the average of the averages.
### Configuration Example {#configuration-example}
### Configuration Example without rules types {#configuration-example}
``` xml
<graphite_rollup>
@ -167,6 +178,81 @@ Fields for `pattern` and `default` sections:
</graphite_rollup>
```
### Configuration Example with rules types {#configuration-typed-example}
``` xml
<graphite_rollup>
<version_column_name>Version</version_column_name>
<pattern>
<rule_type>plain</rule_type>
<regexp>click_cost</regexp>
<function>any</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)min\?</regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[^someName\?(.*&)*tag1=value1(&|$)]]></regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>someName;tag2=value2</regexp>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<default>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup>
```
!!! warning "Warning"
Data rollup is performed during merges. Usually, for old partitions, merges are not started, so for rollup it is necessary to trigger an unscheduled merge using [optimize](../../../sql-reference/statements/optimize.md). Or use additional tools, for example [graphite-ch-optimizer](https://github.com/innogames/graphite-ch-optimizer).

View File

@ -54,10 +54,8 @@ If the set of columns in the Buffer table does not match the set of columns in a
If the types do not match for one of the columns in the Buffer table and a subordinate table, an error message is entered in the server log, and the buffer is cleared.
The same thing happens if the subordinate table does not exist when the buffer is flushed.
If you need to run ALTER for a subordinate table, and the Buffer table, we recommend first deleting the Buffer table, running ALTER for the subordinate table, then creating the Buffer table again.
!!! attention "Attention"
Running ALTER on the Buffer table in releases made before 28 Sep 2020 will cause a `Block structure mismatch` error (see [#15117](https://github.com/ClickHouse/ClickHouse/issues/15117)), so deleting the Buffer table and then recreating is the only option. It is advisable to check that this error is fixed in your release before trying to run ALTER on the Buffer table.
Running ALTER on the Buffer table in releases made before 26 Oct 2021 will cause a `Block structure mismatch` error (see [#15117](https://github.com/ClickHouse/ClickHouse/issues/15117) and [#30565](https://github.com/ClickHouse/ClickHouse/pull/30565)), so deleting the Buffer table and then recreating is the only option. It is advisable to check that this error is fixed in your release before trying to run ALTER on the Buffer table.
If the server is restarted abnormally, the data in the buffer is lost.

View File

@ -25,6 +25,7 @@ Categories:
- **[Operations](../faq/operations/index.md)**
- [Which ClickHouse version to use in production?](../faq/operations/production.md)
- [Is it possible to delete old records from a ClickHouse table?](../faq/operations/delete-old-data.md)
- [Does ClickHouse support multi-region replication?](../faq/operations/multi-region-replication.md)
- **[Integration](../faq/integration/index.md)**
- [How do I export data from ClickHouse to a file?](../faq/integration/file-export.md)
- [What if I have a problem with encodings when connecting to Oracle via ODBC?](../faq/integration/oracle-odbc.md)

View File

@ -23,11 +23,13 @@ Web UI can be accessed here: `http://localhost:8123/play`.
![Web UI](../images/play.png)
In health-check scripts use `GET /ping` request. This handler always returns “Ok.” (with a line feed at the end). Available from version 18.12.13.
In health-check scripts use `GET /ping` request. This handler always returns “Ok.” (with a line feed at the end). Available from version 18.12.13. See also `/replicas_status` to check replica's delay.
``` bash
$ curl 'http://localhost:8123/ping'
Ok.
$ curl 'http://localhost:8123/replicas_status'
Ok.
```
Send the request as a URL query parameter, or as a POST. Or send the beginning of the query in the query parameter, and the rest in the POST (well explain later why this is necessary). The size of the URL is limited to 16 KB, so keep this in mind when sending large queries.

View File

@ -72,7 +72,7 @@ Reloads all [CatBoost](../../guides/apply-catboost-model.md#applying-catboost-mo
**Syntax**
```sql
SYSTEM RELOAD MODELS
SYSTEM RELOAD MODELS [ON CLUSTER cluster_name]
```
## RELOAD MODEL {#query_language-system-reload-model}
@ -82,7 +82,7 @@ Completely reloads a CatBoost model `model_name` if the configuration was update
**Syntax**
```sql
SYSTEM RELOAD MODEL <model_name>
SYSTEM RELOAD MODEL [ON CLUSTER cluster_name] <model_name>
```
## RELOAD FUNCTIONS {#query_language-system-reload-functions}
@ -92,8 +92,8 @@ Reloads all registered [executable user defined functions](../functions/index.md
**Syntax**
```sql
RELOAD FUNCTIONS
RELOAD FUNCTION function_name
RELOAD FUNCTIONS [ON CLUSTER cluster_name]
RELOAD FUNCTION [ON CLUSTER cluster_name] function_name
```
## DROP DNS CACHE {#query_language-system-drop-dns-cache}

View File

@ -99,13 +99,16 @@ patterns
``` text
pattern
rule_type
regexp
function
pattern
rule_type
regexp
age + precision
...
pattern
rule_type
regexp
function
age + precision
@ -129,12 +132,20 @@ default
Поля для разделов `pattern` и `default`:
- `regexp` шаблон имени метрики.
- `rule_type` - тип правила (применяется только к метрикам указанных типов), используется для разделения правил проверки плоских/теггированных метрик. Опциональное поле. Значение по умолчанию: `all`.
Если используются метрики только одного типа или производительность проверки правил некритична, можно не использовать. По умолчанию создается только один тип правил для проверки. Иначе, если хотя бы для одного правила указано отличное от умолчания значение, создаются 2 независимых типа правил - для обычных (классические root.branch.leaf) и теггированных метрик (root.branch.leaf;tag1=value1).
Правила по умолчанию попадают в оба правила обоих типов.
Возможные значения:
- `all` (default) - универсальное правило, назначается также по умолчанию, если поле не задано
- `plain` - правило для плоских метрик (без тегов). Поле `regexp` обрабатывается как регулярное выражение.
- `tagged` - правило для теггированных метрик (метрика хранится в БД в формате `someName?tag1=value1&tag2=value2&tag3=value3`), регулярное выражение должно быть отсортированно по именам тегов, первым - значение тега `__name__`, если есть. Поле `regexp` обрабатывается как регулярное выражение.
- `tag_list` - правило для теггированных метрик, простой DSL для упрощения задания регулярного выражения в формате тегов graphite `someName;tag1=value1;tag2=value2`, `someName` или `tag1=value1;tag2=value2`. Поле `regexp` транслируется в правило `tagged`. Cортировать по именам тегов не обязательно, оно отсортируется автоматически. Значение тега (но не имя) может быть регулярным выражением (например `env=(dev|staging)`).
- `regexp` шаблон имени метрики (регулярное выражение или DSL).
- `age` минимальный возраст данных в секундах.
- `precision` точность определения возраста данных в секундах. Должен быть делителем для 86400 (количество секунд в сутках).
- `function` имя агрегирующей функции, которую следует применить к данным, чей возраст оказался в интервале `[age, age + precision]`. Допустимые функции: min/max/any/avg. Avg вычисляется неточно, как среднее от средних.
### Пример конфигурации {#configuration-example}
### Пример конфигурации без разделения типа правил {#configuration-example}
``` xml
<graphite_rollup>
@ -169,6 +180,80 @@ default
</graphite_rollup>
```
### Пример конфигурации c разделением типа правил {#configuration-typed-example}
``` xml
<graphite_rollup>
<version_column_name>Version</version_column_name>
<pattern>
<rule_type>plain</rule_type>
<regexp>click_cost</regexp>
<function>any</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)min\?</regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[^someName\?(.*&)*tag1=value1(&|$)]]></regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>someName;tag2=value2</regexp>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<default>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup>
```
!!! warning "Внимание"
Прореживание данных производится во время слияний. Обычно для старых партиций слияния не запускаются, поэтому для прореживания надо инициировать незапланированное слияние используя [optimize](../../../sql-reference/statements/optimize.md). Или использовать дополнительные инструменты, например [graphite-ch-optimizer](https://github.com/innogames/graphite-ch-optimizer).

View File

@ -48,10 +48,8 @@ CREATE TABLE merge.hits_buffer AS merge.hits ENGINE = Buffer(merge, hits, 16, 10
Если у одного из столбцов таблицы Buffer и подчинённой таблицы не совпадает тип, то в лог сервера будет записано сообщение об ошибке и буфер будет очищен.
То же самое происходит, если подчинённая таблица не существует в момент сброса буфера.
Если есть необходимость выполнить ALTER для подчинённой таблицы и для таблицы Buffer, то рекомендуется удалить таблицу Buffer, затем выполнить ALTER подчинённой таблицы, а после создать таблицу Buffer заново.
!!! attention "Внимание"
В релизах до 28 сентября 2020 года выполнение ALTER на таблице Buffer ломает структуру блоков и вызывает ошибку (см. [#15117](https://github.com/ClickHouse/ClickHouse/issues/15117)), поэтому удаление буфера и его пересоздание — единственный вариант миграции для данного движка. Перед выполнением ALTER на таблице Buffer убедитесь, что в вашей версии эта ошибка устранена.
В релизах до 26 октября 2021 года выполнение ALTER на таблице Buffer ломает структуру блоков и вызывает ошибку (см. [#15117](https://github.com/ClickHouse/ClickHouse/issues/15117) и [#30565](https://github.com/ClickHouse/ClickHouse/pull/30565)), поэтому удаление буфера и его пересоздание — единственный вариант миграции для данного движка. Перед выполнением ALTER на таблице Buffer убедитесь, что в вашей версии эта ошибка устранена.
При нештатном перезапуске сервера, данные, находящиеся в буфере, будут потеряны.

View File

@ -18,6 +18,17 @@ $ curl 'http://localhost:8123/'
Ok.
```
Web UI 可以通过这个地址访问: `http://localhost:8123/play`.
在运行状况检查脚本中,使用`GET /ping`请求。这个处理方法总是返回 "Ok"。(以换行结尾)。可从18.12.13版获得。请参见' /replicas_status '检查复制集的延迟。
``` bash
$ curl 'http://localhost:8123/ping'
Ok.
$ curl 'http://localhost:8123/replicas_status'
Ok.
```
通过URL中的 `query` 参数来发送请求或者发送POST请求或者将查询的开头部分放在URL的`query`参数中其他部分放在POST中我们会在后面解释为什么这样做是有必要的。URL的大小会限制在16KB所以发送大型查询时要时刻记住这点。
如果请求成功将会收到200的响应状态码和响应主体中的结果。

View File

@ -1 +0,0 @@
../../../en/sql-reference/statements/truncate.md

View File

@ -0,0 +1,31 @@
---
toc_priority: 52
toc_title: TRUNCATE
---
# TRUNCATE 语句 {#truncate-statement}
``` sql
TRUNCATE TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster]
```
删除表中的所有数据。当省略子句 `IF EXISTS` 时,如果表不存在,则查询返回一个错误。
`TRUNCATE` 查询不支持[View](../../engines/table-engines/special/view.md),[File](../../engines/table-engines/special/file.md), [URL](../../engines/table-engines/special/url.md), [Buffer](../../engines/table-engines/special/buffer.md) 和 [Null](../../engines/table-engines/special/null.md)表引擎。
可以使用 replication_alter_partitions_sync 设置在复制集上等待执行的操作。
通过 replication_wait_for_inactive_replica_timeout 设置,可以指定不活动副本执行 `TRUNCATE`查询需要等待多长时间(以秒为单位)。
!!! info "注意"
如果`replication_alter_partitions_sync` 被设置为`2`,并且某些复制集超过 `replication_wait_for_inactive_replica_timeout`设置的时间不激活,那么将抛出一个异常`UNFINISHED`。

View File

@ -194,6 +194,7 @@ namespace
{
void setupTmpPath(Poco::Logger * log, const std::string & path)
try
{
LOG_DEBUG(log, "Setting up {} to store temporary data in it", path);
@ -212,6 +213,15 @@ void setupTmpPath(Poco::Logger * log, const std::string & path)
LOG_DEBUG(log, "Skipped file in temporary path {}", it->path().string());
}
}
catch (...)
{
DB::tryLogCurrentException(
log,
fmt::format(
"Caught exception while setup temporary path: {}. It is ok to skip this exception as cleaning old temporary files is not "
"necessary",
path));
}
int waitServersToFinish(std::vector<DB::ProtocolServerAdapter> & servers, size_t seconds_to_wait)
{

View File

@ -3,6 +3,7 @@
#include <algorithm>
#include <memory>
#include <boost/noncopyable.hpp>
#include <base/sort.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/HashTable/SmallTable.h>
@ -557,7 +558,7 @@ public:
}
if (limit < answer.size())
{
std::nth_element(answer.begin(), answer.begin() + limit, answer.end());
::nth_element(answer.begin(), answer.begin() + limit, answer.end());
answer.resize(limit);
}

View File

@ -1,5 +1,7 @@
#pragma once
#include <base/sort.h>
#include <Common/Arena.h>
#include <Common/NaNUtils.h>
@ -72,7 +74,7 @@ private:
private:
void sort()
{
std::sort(points, points + size,
::sort(points, points + size,
[](const WeightedValue & first, const WeightedValue & second)
{
return first.mean < second.mean;

View File

@ -1,5 +1,7 @@
#pragma once
#include <unordered_set>
#include <AggregateFunctions/AggregateFunctionNull.h>
#include <Columns/ColumnsNumber.h>
@ -7,6 +9,7 @@
#include <Common/ArenaAllocator.h>
#include <Common/assert_cast.h>
#include <base/arithmeticOverflow.h>
#include <base/sort.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypesNumber.h>
@ -14,8 +17,6 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <unordered_set>
namespace DB
{
@ -67,7 +68,7 @@ struct AggregateFunctionIntervalLengthSumData
/// either sort whole container or do so partially merging ranges afterwards
if (!sorted && !other.sorted)
{
std::sort(std::begin(segments), std::end(segments));
::sort(std::begin(segments), std::end(segments));
}
else
{
@ -76,10 +77,10 @@ struct AggregateFunctionIntervalLengthSumData
const auto end = std::end(segments);
if (!sorted)
std::sort(begin, middle);
::sort(begin, middle);
if (!other.sorted)
std::sort(middle, end);
::sort(middle, end);
std::inplace_merge(begin, middle, end);
}
@ -91,7 +92,7 @@ struct AggregateFunctionIntervalLengthSumData
{
if (!sorted)
{
std::sort(std::begin(segments), std::end(segments));
::sort(std::begin(segments), std::end(segments));
sorted = true;
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <unordered_map>
#include <base/sort.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnFixedString.h>
@ -226,7 +227,7 @@ public:
{
keys.push_back(it.first);
}
std::sort(keys.begin(), keys.end());
::sort(keys.begin(), keys.end());
// insert using sorted keys to result column
for (auto & key : keys)

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/logger_useful.h>
#include <base/sort.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
@ -142,7 +143,7 @@ public:
auto & array = this->data(place).value;
/// Sort by position; for equal position, sort by weight to get deterministic result.
std::sort(array.begin(), array.end());
::sort(array.begin(), array.end());
for (const auto & point_weight : array)
{

View File

@ -6,6 +6,7 @@
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>
#include <base/range.h>
#include <base/sort.h>
#include <Common/PODArray.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -76,7 +77,7 @@ struct AggregateFunctionSequenceMatchData final
{
if (!sorted)
{
std::sort(std::begin(events_list), std::end(events_list), Comparator{});
::sort(std::begin(events_list), std::end(events_list), Comparator{});
sorted = true;
}
}

View File

@ -2,6 +2,7 @@
#include <base/types.h>
#include <base/bit_cast.h>
#include <base/sort.h>
#include <Common/HashTable/HashMap.h>
#include <IO/ReadBuffer.h>
@ -134,7 +135,7 @@ private:
++arr_it;
}
std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
Float64 threshold = std::ceil(sum_weight * level);
Float64 accumulated = 0;
@ -175,7 +176,7 @@ private:
++arr_it;
}
std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
size_t level_index = 0;
Float64 accumulated = 0;

View File

@ -88,7 +88,7 @@ struct QuantileExact : QuantileExactBase<Value, QuantileExact<Value>>
if (!array.empty())
{
size_t n = level < 1 ? level * array.size() : (array.size() - 1);
nth_element(array.begin(), array.begin() + n, array.end()); /// NOTE: You can think of the radix-select algorithm.
::nth_element(array.begin(), array.begin() + n, array.end()); /// NOTE: You can think of the radix-select algorithm.
return array[n];
}
@ -107,7 +107,7 @@ struct QuantileExact : QuantileExactBase<Value, QuantileExact<Value>>
auto level = levels[indices[i]];
size_t n = level < 1 ? level * array.size() : (array.size() - 1);
nth_element(array.begin() + prev_n, array.begin() + n, array.end());
::nth_element(array.begin() + prev_n, array.begin() + n, array.end());
result[indices[i]] = array[n];
prev_n = n;
}
@ -143,7 +143,7 @@ struct QuantileExactExclusive : public QuantileExact<Value>
else if (n < 1)
return static_cast<Float64>(array[0]);
nth_element(array.begin(), array.begin() + n - 1, array.end());
::nth_element(array.begin(), array.begin() + n - 1, array.end());
auto nth_elem = std::min_element(array.begin() + n, array.end());
return static_cast<Float64>(array[n - 1]) + (h - n) * static_cast<Float64>(*nth_elem - array[n - 1]);
@ -172,7 +172,7 @@ struct QuantileExactExclusive : public QuantileExact<Value>
result[indices[i]] = static_cast<Float64>(array[0]);
else
{
nth_element(array.begin() + prev_n, array.begin() + n - 1, array.end());
::nth_element(array.begin() + prev_n, array.begin() + n - 1, array.end());
auto nth_elem = std::min_element(array.begin() + n, array.end());
result[indices[i]] = static_cast<Float64>(array[n - 1]) + (h - n) * static_cast<Float64>(*nth_elem - array[n - 1]);
@ -207,7 +207,7 @@ struct QuantileExactInclusive : public QuantileExact<Value>
return static_cast<Float64>(array[array.size() - 1]);
else if (n < 1)
return static_cast<Float64>(array[0]);
nth_element(array.begin(), array.begin() + n - 1, array.end());
::nth_element(array.begin(), array.begin() + n - 1, array.end());
auto nth_elem = std::min_element(array.begin() + n, array.end());
return static_cast<Float64>(array[n - 1]) + (h - n) * static_cast<Float64>(*nth_elem - array[n - 1]);
@ -234,7 +234,7 @@ struct QuantileExactInclusive : public QuantileExact<Value>
result[indices[i]] = static_cast<Float64>(array[0]);
else
{
nth_element(array.begin() + prev_n, array.begin() + n - 1, array.end());
::nth_element(array.begin() + prev_n, array.begin() + n - 1, array.end());
auto nth_elem = std::min_element(array.begin() + n, array.end());
result[indices[i]] = static_cast<Float64>(array[n - 1]) + (h - n) * (static_cast<Float64>(*nth_elem) - array[n - 1]);
@ -263,7 +263,7 @@ struct QuantileExactLow : public QuantileExactBase<Value, QuantileExactLow<Value
if (!array.empty())
{
// sort inputs in ascending order
std::sort(array.begin(), array.end());
::sort(array.begin(), array.end());
// if level is 0.5 then compute the "low" median of the sorted array
// by the method of rounding.
@ -296,7 +296,7 @@ struct QuantileExactLow : public QuantileExactBase<Value, QuantileExactLow<Value
if (!array.empty())
{
// sort inputs in ascending order
std::sort(array.begin(), array.end());
::sort(array.begin(), array.end());
for (size_t i = 0; i < size; ++i)
{
auto level = levels[indices[i]];
@ -345,7 +345,7 @@ struct QuantileExactHigh : public QuantileExactBase<Value, QuantileExactHigh<Val
if (!array.empty())
{
// sort inputs in ascending order
std::sort(array.begin(), array.end());
::sort(array.begin(), array.end());
// if level is 0.5 then compute the "high" median of the sorted array
// by the method of rounding.
@ -370,7 +370,7 @@ struct QuantileExactHigh : public QuantileExactBase<Value, QuantileExactHigh<Val
if (!array.empty())
{
// sort inputs in ascending order
std::sort(array.begin(), array.end());
::sort(array.begin(), array.end());
for (size_t i = 0; i < size; ++i)
{
auto level = levels[indices[i]];

View File

@ -1,5 +1,7 @@
#pragma once
#include <base/sort.h>
#include <Common/HashTable/HashMap.h>
#include <Common/NaNUtils.h>
@ -101,7 +103,7 @@ struct QuantileExactWeighted
++i;
}
std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
Float64 threshold = std::ceil(sum_weight * level);
Float64 accumulated = 0;
@ -151,7 +153,7 @@ struct QuantileExactWeighted
++i;
}
std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
Float64 accumulated = 0;

View File

@ -90,7 +90,7 @@ namespace detail
/** This function must be called before get-functions. */
void prepare() const
{
std::sort(elems, elems + count);
::sort(elems, elems + count);
}
UInt16 get(double level) const
@ -183,7 +183,7 @@ namespace detail
/// Sorting an array will not be considered a violation of constancy.
auto & array = elems;
nth_element(array.begin(), array.begin() + n, array.end());
::nth_element(array.begin(), array.begin() + n, array.end());
quantile = array[n];
}
@ -204,7 +204,7 @@ namespace detail
? level * elems.size()
: (elems.size() - 1);
nth_element(array.begin() + prev_n, array.begin() + n, array.end());
::nth_element(array.begin() + prev_n, array.begin() + n, array.end());
result[level_index] = array[n];
prev_n = n;

View File

@ -2,6 +2,8 @@
#include <vector>
#include <base/sort.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/NaNUtils.h>
@ -64,7 +66,7 @@ struct QuantileLevels
permutation[i] = i;
}
std::sort(permutation.begin(), permutation.end(), [this] (size_t a, size_t b) { return levels[a] < levels[b]; });
::sort(permutation.begin(), permutation.end(), [this] (size_t a, size_t b) { return levels[a] < levels[b]; });
}
};

View File

@ -4,6 +4,7 @@
#include <algorithm>
#include <climits>
#include <base/types.h>
#include <base/sort.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -15,6 +16,7 @@
#include <Poco/Exception.h>
#include <pcg_random.hpp>
namespace DB
{
struct Settings;
@ -249,7 +251,7 @@ private:
if (sorted)
return;
sorted = true;
std::sort(samples.begin(), samples.end(), Comparer());
::sort(samples.begin(), samples.end(), Comparer());
}
template <typename ResultType>

View File

@ -5,6 +5,7 @@
#include <climits>
#include <AggregateFunctions/ReservoirSampler.h>
#include <base/types.h>
#include <base/sort.h>
#include <Common/HashTable/Hash.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
@ -258,7 +259,8 @@ private:
{
if (sorted)
return;
std::sort(samples.begin(), samples.end(), [](const auto & lhs, const auto & rhs) { return lhs.first < rhs.first; });
::sort(samples.begin(), samples.end(), [](const auto & lhs, const auto & rhs) { return lhs.first < rhs.first; });
sorted = true;
}

View File

@ -1,13 +1,17 @@
#pragma once
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/ArenaAllocator.h>
#include <numeric>
#include <algorithm>
#include <utility>
#include <base/sort.h>
#include <Common/ArenaAllocator.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
namespace DB
{
struct Settings;
@ -41,7 +45,7 @@ std::pair<RanksArray, Float64> computeRanksAndTieCorrection(const Values & value
/// Save initial positions, than sort indices according to the values.
std::vector<size_t> indexes(size);
std::iota(indexes.begin(), indexes.end(), 0);
std::sort(indexes.begin(), indexes.end(),
::sort(indexes.begin(), indexes.end(),
[&] (size_t lhs, size_t rhs) { return values[lhs] < values[rhs]; });
size_t left = 0;

View File

@ -15,6 +15,7 @@
#include <Parsers/formatAST.h>
#include <Storages/IStorage.h>
#include <base/insertAtEnd.h>
#include <base/sort.h>
#include <boost/range/adaptor/reversed.hpp>
#include <filesystem>
@ -632,7 +633,7 @@ BackupEntries makeBackupEntries(const Elements & elements, const ContextPtr & co
throw Exception("Backup must not be empty", ErrorCodes::BACKUP_IS_EMPTY);
/// Check that all backup entries are unique.
std::sort(
::sort(
backup_entries.begin(),
backup_entries.end(),
[](const std::pair<String, std::unique_ptr<IBackupEntry>> & lhs, const std::pair<String, std::unique_ptr<IBackupEntry>> & rhs)

View File

@ -17,6 +17,7 @@
#include <Common/Exception.h>
#include <Poco/String.h>
#include <algorithm>
#include <base/sort.h>
namespace DB
@ -74,10 +75,10 @@ AvailableCollationLocales::LocalesVector AvailableCollationLocales::getAvailable
result.push_back(name_and_locale.second);
auto comparator = [] (const LocaleAndLanguage & f, const LocaleAndLanguage & s)
{
return f.locale_name < s.locale_name;
};
std::sort(result.begin(), result.end(), comparator);
{
return f.locale_name < s.locale_name;
};
::sort(result.begin(), result.end(), comparator);
return result;
}

View File

@ -16,6 +16,7 @@
#include <Common/WeakHash.h>
#include <Common/HashTable/Hash.h>
#include <base/unaligned.h>
#include <base/sort.h>
#include <cstring> // memcpy
@ -810,9 +811,9 @@ void ColumnArray::getPermutationImpl(size_t limit, Permutation & res, Comparator
auto less = [&cmp](size_t lhs, size_t rhs){ return cmp(lhs, rhs) < 0; };
if (limit)
partial_sort(res.begin(), res.begin() + limit, res.end(), less);
::partial_sort(res.begin(), res.begin() + limit, res.end(), less);
else
std::sort(res.begin(), res.end(), less);
::sort(res.begin(), res.end(), less);
}
void ColumnArray::getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const

View File

@ -9,7 +9,6 @@
#include <base/sort.h>
#include <base/scope_guard.h>
#include <IO/WriteHelpers.h>
#include <Columns/ColumnsCommon.h>
@ -132,7 +131,7 @@ template <is_decimal T>
void ColumnDecimal<T>::updatePermutation(bool reverse, size_t limit, int, IColumn::Permutation & res, EqualRanges & equal_ranges) const
{
auto equals = [this](size_t lhs, size_t rhs) { return data[lhs] == data[rhs]; };
auto sort = [](auto begin, auto end, auto pred) { std::sort(begin, end, pred); };
auto sort = [](auto begin, auto end, auto pred) { ::sort(begin, end, pred); };
auto partial_sort = [](auto begin, auto mid, auto end, auto pred) { ::partial_sort(begin, mid, end, pred); };
if (reverse)

View File

@ -169,9 +169,9 @@ protected:
sort_end = res.begin() + limit;
if (reverse)
partial_sort(res.begin(), sort_end, res.end(), [this](size_t a, size_t b) { return data[a] > data[b]; });
::partial_sort(res.begin(), sort_end, res.end(), [this](size_t a, size_t b) { return data[a] > data[b]; });
else
partial_sort(res.begin(), sort_end, res.end(), [this](size_t a, size_t b) { return data[a] < data[b]; });
::partial_sort(res.begin(), sort_end, res.end(), [this](size_t a, size_t b) { return data[a] < data[b]; });
}
};

View File

@ -192,9 +192,9 @@ void ColumnFixedString::getPermutation(bool reverse, size_t limit, int /*nan_dir
else
{
if (reverse)
std::sort(res.begin(), res.end(), greater(*this));
::sort(res.begin(), res.end(), greater(*this));
else
std::sort(res.begin(), res.end(), less(*this));
::sort(res.begin(), res.end(), less(*this));
}
}

View File

@ -335,9 +335,9 @@ void ColumnString::getPermutationImpl(size_t limit, Permutation & res, Comparato
auto less = [&cmp](size_t lhs, size_t rhs){ return cmp(lhs, rhs) < 0; };
if (limit)
partial_sort(res.begin(), res.begin() + limit, res.end(), less);
::partial_sort(res.begin(), res.begin() + limit, res.end(), less);
else
std::sort(res.begin(), res.end(), less);
::sort(res.begin(), res.end(), less);
}
void ColumnString::getPermutation(bool reverse, size_t limit, int /*nan_direction_hint*/, Permutation & res) const

View File

@ -1,5 +1,6 @@
#include <Columns/ColumnTuple.h>
#include <base/sort.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnCompressed.h>
#include <Core/Field.h>
@ -384,9 +385,9 @@ void ColumnTuple::getPermutationImpl(size_t limit, Permutation & res, LessOperat
limit = 0;
if (limit)
partial_sort(res.begin(), res.begin() + limit, res.end(), less);
::partial_sort(res.begin(), res.begin() + limit, res.end(), less);
else
std::sort(res.begin(), res.end(), less);
::sort(res.begin(), res.end(), less);
}
void ColumnTuple::updatePermutationImpl(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_ranges, const Collator * collator) const

View File

@ -1,6 +1,5 @@
#include "ColumnVector.h"
#include <pdqsort.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/MaskOperations.h>
@ -118,7 +117,6 @@ struct ColumnVector<T>::equals
bool operator()(size_t lhs, size_t rhs) const { return CompareHelper<T>::equals(parent.data[lhs], parent.data[rhs], nan_direction_hint); }
};
namespace
{
template <typename T>
@ -158,9 +156,9 @@ void ColumnVector<T>::getPermutation(bool reverse, size_t limit, int nan_directi
res[i] = i;
if (reverse)
partial_sort(res.begin(), res.begin() + limit, res.end(), greater(*this, nan_direction_hint));
::partial_sort(res.begin(), res.begin() + limit, res.end(), greater(*this, nan_direction_hint));
else
partial_sort(res.begin(), res.begin() + limit, res.end(), less(*this, nan_direction_hint));
::partial_sort(res.begin(), res.begin() + limit, res.end(), less(*this, nan_direction_hint));
}
else
{
@ -204,16 +202,16 @@ void ColumnVector<T>::getPermutation(bool reverse, size_t limit, int nan_directi
res[i] = i;
if (reverse)
pdqsort(res.begin(), res.end(), greater(*this, nan_direction_hint));
::sort(res.begin(), res.end(), greater(*this, nan_direction_hint));
else
pdqsort(res.begin(), res.end(), less(*this, nan_direction_hint));
::sort(res.begin(), res.end(), less(*this, nan_direction_hint));
}
}
template <typename T>
void ColumnVector<T>::updatePermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_range) const
{
auto sort = [](auto begin, auto end, auto pred) { pdqsort(begin, end, pred); };
auto sort = [](auto begin, auto end, auto pred) { ::sort(begin, end, pred); };
auto partial_sort = [](auto begin, auto mid, auto end, auto pred) { ::partial_sort(begin, mid, end, pred); };
if (reverse)

View File

@ -528,7 +528,7 @@ protected:
template <typename Derived>
void getIndicesOfNonDefaultRowsImpl(Offsets & indices, size_t from, size_t limit) const;
/// Uses std::sort and partial_sort as default algorithms.
/// Uses sort and partial_sort as default algorithms.
/// Implements 'less' and 'equals' via comparator.
/// If 'less' and 'equals' can be implemented more optimal
/// (e.g. with less number of comparisons), you can use

View File

@ -11,6 +11,7 @@
#include <base/sort.h>
#include <algorithm>
namespace DB
{
namespace ErrorCodes
@ -203,7 +204,7 @@ void IColumn::updatePermutationImpl(
limit, res, equal_ranges,
[&cmp](size_t lhs, size_t rhs) { return cmp(lhs, rhs) < 0; },
[&cmp](size_t lhs, size_t rhs) { return cmp(lhs, rhs) == 0; },
[](auto begin, auto end, auto pred) { std::sort(begin, end, pred); },
[](auto begin, auto end, auto pred) { ::sort(begin, end, pred); },
[](auto begin, auto mid, auto end, auto pred) { ::partial_sort(begin, mid, end, pred); });
}

View File

@ -20,6 +20,7 @@
#include <Common/Exception.h>
#include <Common/getResource.h>
#include <base/errnoToString.h>
#include <base/sort.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
@ -105,7 +106,7 @@ static ElementIdentifier getElementIdentifier(Node * element)
std::string value = node->nodeValue();
attrs_kv.push_back(std::make_pair(name, value));
}
std::sort(attrs_kv.begin(), attrs_kv.end());
::sort(attrs_kv.begin(), attrs_kv.end());
ElementIdentifier res;
res.push_back(element->nodeName());
@ -443,7 +444,7 @@ ConfigProcessor::Files ConfigProcessor::getConfigMergeFiles(const std::string &
}
}
std::sort(files.begin(), files.end());
::sort(files.begin(), files.end());
return files;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/defines.h>
#include <base/sort.h>
#include <vector>
#include <utility>
@ -489,14 +490,14 @@ private:
}
}
std::sort(intervals_sorted_by_left_asc.begin(), intervals_sorted_by_left_asc.end(), [](auto & lhs, auto & rhs)
::sort(intervals_sorted_by_left_asc.begin(), intervals_sorted_by_left_asc.end(), [](auto & lhs, auto & rhs)
{
auto & lhs_interval = getInterval(lhs);
auto & rhs_interval = getInterval(rhs);
return lhs_interval.left < rhs_interval.left;
});
std::sort(intervals_sorted_by_right_desc.begin(), intervals_sorted_by_right_desc.end(), [](auto & lhs, auto & rhs)
::sort(intervals_sorted_by_right_desc.begin(), intervals_sorted_by_right_desc.end(), [](auto & lhs, auto & rhs)
{
auto & lhs_interval = getInterval(lhs);
auto & rhs_interval = getInterval(rhs);
@ -681,7 +682,7 @@ private:
size_t size = points.size();
size_t middle_element_index = size / 2;
std::nth_element(points.begin(), points.begin() + middle_element_index, points.end());
::nth_element(points.begin(), points.begin() + middle_element_index, points.end());
/** We should not get median as average of middle_element_index and middle_element_index - 1
* because we want point in node to intersect some interval.

View File

@ -7,6 +7,7 @@
#include <functional>
#include <base/types.h>
#include <base/scope_guard.h>
#include <base/sort.h>
#include <Common/PoolBase.h>
#include <Common/ProfileEvents.h>
#include <Common/NetException.h>
@ -178,7 +179,7 @@ PoolWithFailoverBase<TNestedPool>::getShuffledPools(
shuffled_pools.reserve(nested_pools.size());
for (size_t i = 0; i < nested_pools.size(); ++i)
shuffled_pools.push_back(ShuffledPool{nested_pools[i].get(), &pool_states[i], i, 0});
std::sort(
::sort(
shuffled_pools.begin(), shuffled_pools.end(),
[](const ShuffledPool & lhs, const ShuffledPool & rhs)
{

View File

@ -5,6 +5,8 @@
#include <boost/range/adaptor/reversed.hpp>
#include <base/sort.h>
#include <Common/AllocatorWithMemoryTracking.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/HashTable/Hash.h>
@ -242,7 +244,7 @@ public:
}
}
std::sort(counter_list.begin(), counter_list.end(), [](Counter * l, Counter * r) { return *l > *r; });
::sort(counter_list.begin(), counter_list.end(), [](Counter * l, Counter * r) { return *l > *r; });
if (counter_list.size() > m_capacity)
{

View File

@ -12,6 +12,8 @@
//#include <iostream>
#include <filesystem>
#include <base/sort.h>
/**
ELF object can contain three different places with symbol names and addresses:
@ -498,8 +500,8 @@ void SymbolIndex::update()
{
dl_iterate_phdr(collectSymbols, &data);
std::sort(data.objects.begin(), data.objects.end(), [](const Object & a, const Object & b) { return a.address_begin < b.address_begin; });
std::sort(data.symbols.begin(), data.symbols.end(), [](const Symbol & a, const Symbol & b) { return a.address_begin < b.address_begin; });
::sort(data.objects.begin(), data.objects.end(), [](const Object & a, const Object & b) { return a.address_begin < b.address_begin; });
::sort(data.symbols.begin(), data.symbols.end(), [](const Symbol & a, const Symbol & b) { return a.address_begin < b.address_begin; });
/// We found symbols both from loaded program headers and from ELF symbol tables.
data.symbols.erase(std::unique(data.symbols.begin(), data.symbols.end(), [](const Symbol & a, const Symbol & b)

View File

@ -12,6 +12,7 @@
#include <Columns/ColumnSparse.h>
#include <iterator>
#include <base/sort.h>
namespace DB
@ -538,7 +539,7 @@ Block Block::sortColumns() const
for (auto it = index_by_name.begin(); it != index_by_name.end(); ++it)
sorted_index_by_name[i++] = it;
}
std::sort(sorted_index_by_name.begin(), sorted_index_by_name.end(), [](const auto & lhs, const auto & rhs)
::sort(sorted_index_by_name.begin(), sorted_index_by_name.end(), [](const auto & lhs, const auto & rhs)
{
return lhs->first < rhs->first;
});

View File

@ -1,4 +1,6 @@
#include <Core/NamesAndTypes.h>
#include <base/sort.h>
#include <Common/HashTable/HashMap.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadBuffer.h>
@ -113,7 +115,7 @@ bool NamesAndTypesList::isSubsetOf(const NamesAndTypesList & rhs) const
{
NamesAndTypes vector(rhs.begin(), rhs.end());
vector.insert(vector.end(), begin(), end());
std::sort(vector.begin(), vector.end());
::sort(vector.begin(), vector.end());
return std::unique(vector.begin(), vector.end()) == vector.begin() + rhs.size();
}
@ -121,16 +123,16 @@ size_t NamesAndTypesList::sizeOfDifference(const NamesAndTypesList & rhs) const
{
NamesAndTypes vector(rhs.begin(), rhs.end());
vector.insert(vector.end(), begin(), end());
std::sort(vector.begin(), vector.end());
::sort(vector.begin(), vector.end());
return (std::unique(vector.begin(), vector.end()) - vector.begin()) * 2 - size() - rhs.size();
}
void NamesAndTypesList::getDifference(const NamesAndTypesList & rhs, NamesAndTypesList & deleted, NamesAndTypesList & added) const
{
NamesAndTypes lhs_vector(begin(), end());
std::sort(lhs_vector.begin(), lhs_vector.end());
::sort(lhs_vector.begin(), lhs_vector.end());
NamesAndTypes rhs_vector(rhs.begin(), rhs.end());
std::sort(rhs_vector.begin(), rhs_vector.end());
::sort(rhs_vector.begin(), rhs_vector.end());
std::set_difference(lhs_vector.begin(), lhs_vector.end(), rhs_vector.begin(), rhs_vector.end(),
std::back_inserter(deleted));

View File

@ -1,5 +1,7 @@
#include <DataTypes/EnumValues.h>
#include <boost/algorithm/string.hpp>
#include <base/sort.h>
namespace DB
{
@ -18,7 +20,7 @@ EnumValues<T>::EnumValues(const Values & values_)
if (values.empty())
throw Exception{"DataTypeEnum enumeration cannot be empty", ErrorCodes::EMPTY_DATA_PASSED};
std::sort(std::begin(values), std::end(values), [] (auto & left, auto & right)
::sort(std::begin(values), std::end(values), [] (auto & left, auto & right)
{
return left.second < right.second;
});

View File

@ -12,6 +12,7 @@
#include <base/logger_useful.h>
#include <base/scope_guard_safe.h>
#include <base/sort.h>
#include <iomanip>
#include <filesystem>
@ -151,7 +152,7 @@ DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(ContextPtr, const Filt
if (!filter_by_table_name || filter_by_table_name(table_name))
filtered_tables.push_back(table_name);
}
std::sort(filtered_tables.begin(), filtered_tables.end());
::sort(filtered_tables.begin(), filtered_tables.end());
return std::make_unique<DatabaseLazyIterator>(*this, std::move(filtered_tables));
}

View File

@ -142,7 +142,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
"It's possible if the first replica is not fully created yet "
"or if the last replica was just dropped or due to logical error", database_name);
Int32 cversion = stat.cversion;
std::sort(hosts.begin(), hosts.end());
::sort(hosts.begin(), hosts.end());
std::vector<zkutil::ZooKeeper::FutureGet> futures;
futures.reserve(hosts.size());

View File

@ -13,10 +13,12 @@
#include <base/itoa.h>
#include <base/map.h>
#include <base/range.h>
#include <base/sort.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Functions/FunctionHelpers.h>
namespace DB
{
namespace ErrorCodes
@ -145,7 +147,7 @@ static void validateKeyTypes(const DataTypes & key_types)
template <typename T, typename Comp>
size_t sortAndUnique(std::vector<T> & vec, Comp comp)
{
std::sort(vec.begin(), vec.end(),
::sort(vec.begin(), vec.end(),
[&](const auto & a, const auto & b) { return comp(a, b) < 0; });
auto new_end = std::unique(vec.begin(), vec.end(),

View File

@ -3,6 +3,8 @@
#include <numeric>
#include <cmath>
#include <base/sort.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeArray.h>
@ -250,7 +252,7 @@ void IPolygonDictionary::loadData()
polygon_ids.emplace_back(polygon, i);
}
std::sort(polygon_ids.begin(), polygon_ids.end(), [& areas](const auto & lhs, const auto & rhs)
::sort(polygon_ids.begin(), polygon_ids.end(), [& areas](const auto & lhs, const auto & rhs)
{
return areas[lhs.second] < areas[rhs.second];
});

View File

@ -3,11 +3,13 @@
#include <Common/ThreadPool.h>
#include <base/logger_useful.h>
#include <base/sort.h>
#include <algorithm>
#include <thread>
#include <numeric>
namespace DB
{
@ -87,7 +89,7 @@ std::vector<Coord> SlabsPolygonIndex::uniqueX(const std::vector<Polygon> & polyg
}
/** Making all_x sorted and distinct */
std::sort(all_x.begin(), all_x.end());
::sort(all_x.begin(), all_x.end());
all_x.erase(std::unique(all_x.begin(), all_x.end()), all_x.end());
return all_x;
@ -104,7 +106,7 @@ void SlabsPolygonIndex::indexBuild(const std::vector<Polygon> & polygons)
}
/** Sorting edges of (left_point, right_point, polygon_id) in that order */
std::sort(all_edges.begin(), all_edges.end(), Edge::compareByLeftPoint);
::sort(all_edges.begin(), all_edges.end(), Edge::compareByLeftPoint);
for (size_t i = 0; i != all_edges.size(); ++i)
all_edges[i].edge_id = i;
@ -298,7 +300,7 @@ bool SlabsPolygonIndex::find(const Point & point, size_t & id) const
} while (pos != 0);
/** Sort all ids and find smallest with odd occurrences */
std::sort(intersections.begin(), intersections.end());
::sort(intersections.begin(), intersections.end());
for (size_t i = 0; i < intersections.size(); i += 2)
{
if (i + 1 == intersections.size() || intersections[i] != intersections[i + 1])

View File

@ -12,6 +12,7 @@
#include <absl/container/flat_hash_set.h>
#include <base/unaligned.h>
#include <base/sort.h>
#include <Common/randomSeed.h>
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
@ -24,6 +25,7 @@
#include <Dictionaries/ICacheDictionaryStorage.h>
#include <Dictionaries/DictionaryHelpers.h>
namespace CurrentMetrics
{
extern const Metric Write;
@ -1092,7 +1094,7 @@ private:
}
/// Sort blocks by offset before start async io requests
std::sort(blocks_to_request.begin(), blocks_to_request.end());
::sort(blocks_to_request.begin(), blocks_to_request.end());
file_buffer.fetchBlocks(configuration.read_buffer_blocks_size, blocks_to_request, [&](size_t block_index, char * block_data)
{

View File

@ -11,6 +11,14 @@
#include <fstream>
#include <unistd.h>
#include <Disks/DiskFactory.h>
#include <Disks/DiskMemory.h>
#include <Disks/DiskRestartProxy.h>
#include <Common/randomSeed.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <IO/WriteHelpers.h>
#include <base/logger_useful.h>
namespace CurrentMetrics
{
@ -25,7 +33,7 @@ namespace ErrorCodes
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int PATH_ACCESS_DENIED;
extern const int INCORRECT_DISK_INDEX;
extern const int LOGICAL_ERROR;
extern const int CANNOT_TRUNCATE_FILE;
extern const int CANNOT_UNLINK;
extern const int CANNOT_RMDIR;
@ -61,9 +69,6 @@ static void loadDiskLocalConfig(const String & name,
throw Exception("Disk path must end with /. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (!FS::canRead(path) || !FS::canWrite(path))
throw Exception("There is no RW access to the disk " + name + " (" + path + ")", ErrorCodes::PATH_ACCESS_DENIED);
bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio");
if (config.has(config_prefix + ".keep_free_space_bytes") && has_space_ratio)
@ -113,13 +118,48 @@ public:
UInt64 getSize() const override { return size; }
DiskPtr getDisk(size_t i) const override;
DiskPtr getDisk(size_t i) const override
{
if (i != 0)
throw Exception("Can't use i != 0 with single disk reservation. It's a bug", ErrorCodes::LOGICAL_ERROR);
return disk;
}
Disks getDisks() const override { return {disk}; }
void update(UInt64 new_size) override;
void update(UInt64 new_size) override
{
std::lock_guard lock(DiskLocal::reservation_mutex);
disk->reserved_bytes -= size;
size = new_size;
disk->reserved_bytes += size;
}
~DiskLocalReservation() override;
~DiskLocalReservation() override
{
try
{
std::lock_guard lock(DiskLocal::reservation_mutex);
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName());
}
else
{
disk->reserved_bytes -= size;
}
if (disk->reservation_count == 0)
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName());
else
--disk->reservation_count;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
private:
DiskLocalPtr disk;
@ -188,7 +228,7 @@ bool DiskLocal::tryReserve(UInt64 bytes)
return false;
}
UInt64 DiskLocal::getTotalSpace() const
static UInt64 getTotalSpaceByName(const String & name, const String & disk_path, UInt64 keep_free_space_bytes)
{
struct statvfs fs;
if (name == "default") /// for default disk we get space from path/data/
@ -201,8 +241,17 @@ UInt64 DiskLocal::getTotalSpace() const
return total_size - keep_free_space_bytes;
}
UInt64 DiskLocal::getTotalSpace() const
{
if (broken || readonly)
return 0;
return getTotalSpaceByName(name, disk_path, keep_free_space_bytes);
}
UInt64 DiskLocal::getAvailableSpace() const
{
if (broken || readonly)
return 0;
/// we use f_bavail, because part of b_free space is
/// available for superuser only and for system purposes
struct statvfs fs;
@ -268,7 +317,7 @@ void DiskLocal::moveDirectory(const String & from_path, const String & to_path)
DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)
{
fs::path meta_path = fs::path(disk_path) / path;
if (fs::exists(meta_path) && fs::is_directory(meta_path))
if (!broken && fs::exists(meta_path) && fs::is_directory(meta_path))
return std::make_unique<DiskLocalDirectoryIterator>(disk_path, path);
else
return std::make_unique<DiskLocalDirectoryIterator>();
@ -409,49 +458,191 @@ void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & confi
keep_free_space_bytes = new_keep_free_space_bytes;
}
DiskPtr DiskLocalReservation::getDisk(size_t i) const
DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
: name(name_)
, disk_path(path_)
, keep_free_space_bytes(keep_free_space_bytes_)
, logger(&Poco::Logger::get("DiskLocal"))
{
if (i != 0)
{
throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
}
return disk;
}
void DiskLocalReservation::update(UInt64 new_size)
DiskLocal::DiskLocal(
const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context, UInt64 local_disk_check_period_ms)
: DiskLocal(name_, path_, keep_free_space_bytes_)
{
std::lock_guard lock(DiskLocal::reservation_mutex);
disk->reserved_bytes -= size;
size = new_size;
disk->reserved_bytes += size;
if (local_disk_check_period_ms > 0)
disk_checker = std::make_unique<DiskLocalCheckThread>(this, context, local_disk_check_period_ms);
}
DiskLocalReservation::~DiskLocalReservation()
void DiskLocal::startup()
{
try
{
std::lock_guard lock(DiskLocal::reservation_mutex);
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;
LOG_ERROR(disk->log, "Unbalanced reservations size for disk '{}'.", disk->getName());
}
else
{
disk->reserved_bytes -= size;
}
if (disk->reservation_count == 0)
LOG_ERROR(disk->log, "Unbalanced reservation count for disk '{}'.", disk->getName());
else
--disk->reservation_count;
broken = false;
disk_checker_magic_number = -1;
disk_checker_can_check_read = true;
readonly = !setup();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(logger, fmt::format("Disk {} is marked as broken during startup", name));
broken = true;
/// Disk checker is disabled when failing to start up.
disk_checker_can_check_read = false;
}
if (disk_checker && disk_checker_can_check_read)
disk_checker->startup();
}
void DiskLocal::shutdown()
{
if (disk_checker)
disk_checker->shutdown();
}
std::optional<UInt32> DiskLocal::readDiskCheckerMagicNumber() const noexcept
try
{
ReadSettings read_settings;
/// Proper disk read checking requires direct io
read_settings.direct_io_threshold = 1;
auto buf = readFile(disk_checker_path, read_settings, {}, {});
UInt32 magic_number;
readIntBinary(magic_number, *buf);
if (buf->eof())
return magic_number;
LOG_WARNING(logger, "The size of disk check magic number is more than 4 bytes. Mark it as read failure");
return {};
}
catch (...)
{
tryLogCurrentException(logger, fmt::format("Cannot read correct disk check magic number from from {}{}", disk_path, disk_checker_path));
return {};
}
bool DiskLocal::canRead() const noexcept
try
{
if (FS::canRead(fs::path(disk_path) / disk_checker_path))
{
auto magic_number = readDiskCheckerMagicNumber();
if (magic_number && *magic_number == disk_checker_magic_number)
return true;
}
return false;
}
catch (...)
{
LOG_WARNING(logger, "Cannot achieve read over the disk directory: {}", disk_path);
return false;
}
struct DiskWriteCheckData
{
constexpr static size_t PAGE_SIZE = 4096;
char data[PAGE_SIZE]{};
DiskWriteCheckData()
{
static const char * magic_string = "ClickHouse disk local write check";
static size_t magic_string_len = strlen(magic_string);
memcpy(data, magic_string, magic_string_len);
memcpy(data + PAGE_SIZE - magic_string_len, magic_string, magic_string_len);
}
};
bool DiskLocal::canWrite() const noexcept
try
{
static DiskWriteCheckData data;
String tmp_template = fs::path(disk_path) / "";
{
auto buf = WriteBufferFromTemporaryFile::create(tmp_template);
buf->write(data.data, data.PAGE_SIZE);
buf->sync();
}
return true;
}
catch (...)
{
LOG_WARNING(logger, "Cannot achieve write over the disk directory: {}", disk_path);
return false;
}
bool DiskLocal::setup()
{
try
{
fs::create_directories(disk_path);
}
catch (...)
{
LOG_ERROR(logger, "Cannot create the directory of disk {} ({}).", name, disk_path);
throw;
}
try
{
if (!FS::canRead(disk_path))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "There is no read access to disk {} ({}).", name, disk_path);
}
catch (...)
{
LOG_ERROR(logger, "Cannot gain read access of the disk directory: {}", disk_path);
throw;
}
/// If disk checker is disabled, just assume RW by default.
if (!disk_checker)
return true;
try
{
if (exists(disk_checker_path))
{
auto magic_number = readDiskCheckerMagicNumber();
if (magic_number)
disk_checker_magic_number = *magic_number;
else
{
/// The checker file is incorrect. Mark the magic number to uninitialized and try to generate a new checker file.
disk_checker_magic_number = -1;
}
}
}
catch (...)
{
LOG_ERROR(logger, "We cannot tell if {} exists anymore, or read from it. Most likely disk {} is broken", disk_checker_path, name);
throw;
}
/// Try to create a new checker file. The disk status can be either broken or readonly.
if (disk_checker_magic_number == -1)
try
{
pcg32_fast rng(randomSeed());
UInt32 magic_number = rng();
{
auto buf = writeFile(disk_checker_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
writeIntBinary(magic_number, *buf);
}
disk_checker_magic_number = magic_number;
}
catch (...)
{
LOG_WARNING(
logger,
"Cannot create/write to {0}. Disk {1} is either readonly or broken. Without setting up disk checker file, DiskLocalCheckThread "
"will not be started. Disk is assumed to be RW. Try manually fix the disk and do `SYSTEM RESTART DISK {1}`",
disk_checker_path,
name);
disk_checker_can_check_read = false;
return true;
}
if (disk_checker_magic_number == -1)
throw Exception("disk_checker_magic_number is not initialized. It's a bug", ErrorCodes::LOGICAL_ERROR);
return true;
}
void registerDiskLocal(DiskFactory & factory)
{
@ -459,17 +650,20 @@ void registerDiskLocal(DiskFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & map) -> DiskPtr {
const DisksMap & map) -> DiskPtr
{
String path;
UInt64 keep_free_space_bytes;
loadDiskLocalConfig(name, config, config_prefix, context, path, keep_free_space_bytes);
for (const auto & [disk_name, disk_ptr] : map)
{
if (path == disk_ptr->getPath())
throw Exception("Disk " + name + " and Disk " + disk_name + " cannot have the same path" + " (" + path + ")", ErrorCodes::BAD_ARGUMENTS);
}
return std::make_shared<DiskLocal>(name, path, keep_free_space_bytes);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk {} and disk {} cannot have the same path ({})", name, disk_name, path);
std::shared_ptr<IDisk> disk
= std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0));
disk->startup();
return std::make_shared<DiskRestartProxy>(disk);
};
factory.registerDiskType("local", creator);
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/logger_useful.h>
#include <Disks/DiskLocalCheckThread.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileBase.h>
@ -10,24 +11,22 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class DiskLocalReservation;
class DiskLocal : public IDisk
{
public:
friend class DiskLocalCheckThread;
friend class DiskLocalReservation;
DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
: name(name_), disk_path(path_), keep_free_space_bytes(keep_free_space_bytes_)
{
if (disk_path.back() != '/')
throw Exception("Disk path must end with '/', but '" + disk_path + "' doesn't.", ErrorCodes::LOGICAL_ERROR);
}
DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_);
DiskLocal(
const String & name_,
const String & path_,
UInt64 keep_free_space_bytes_,
ContextPtr context,
UInt64 local_disk_check_period_ms);
const String & getName() const override { return name; }
@ -106,13 +105,33 @@ public:
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &) override;
bool isBroken() const override { return broken; }
void startup() override;
void shutdown() override;
/// Check if the disk is OK to proceed read/write operations. Currently the check is
/// rudimentary. The more advanced choice would be using
/// https://github.com/smartmontools/smartmontools. However, it's good enough for now.
bool canRead() const noexcept;
bool canWrite() const noexcept;
private:
bool tryReserve(UInt64 bytes);
private:
/// Setup disk for healthy check. Returns true if it's read-write, false if read-only.
/// Throw exception if it's not possible to setup necessary files and directories.
bool setup();
/// Read magic number from disk checker file. Return std::nullopt if exception happens.
std::optional<UInt32> readDiskCheckerMagicNumber() const noexcept;
const String name;
const String disk_path;
const String disk_checker_path = ".disk_checker_file";
std::atomic<UInt64> keep_free_space_bytes;
Poco::Logger * logger;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
@ -120,6 +139,14 @@ private:
static std::mutex reservation_mutex;
Poco::Logger * log = &Poco::Logger::get("DiskLocal");
std::atomic<bool> broken{false};
std::atomic<bool> readonly{false};
std::unique_ptr<DiskLocalCheckThread> disk_checker;
/// A magic number to vaguely check if reading operation generates correct result.
/// -1 means there is no available disk_checker_file yet.
Int64 disk_checker_magic_number = -1;
bool disk_checker_can_check_read = true;
};

View File

@ -0,0 +1,70 @@
#include <Disks/DiskLocalCheckThread.h>
#include <Disks/DiskLocal.h>
#include <Interpreters/Context.h>
#include <base/logger_useful.h>
namespace DB
{
static const auto DISK_CHECK_ERROR_SLEEP_MS = 1000;
static const auto DISK_CHECK_ERROR_RETRY_TIME = 3;
DiskLocalCheckThread::DiskLocalCheckThread(DiskLocal * disk_, ContextPtr context_, UInt64 local_disk_check_period_ms)
: WithContext(context_)
, disk(std::move(disk_))
, check_period_ms(local_disk_check_period_ms)
, log(&Poco::Logger::get(fmt::format("DiskLocalCheckThread({})", disk->getName())))
{
task = getContext()->getSchedulePool().createTask(log->name(), [this] { run(); });
}
void DiskLocalCheckThread::startup()
{
need_stop = false;
retry = 0;
task->activateAndSchedule();
}
void DiskLocalCheckThread::run()
{
if (need_stop)
return;
bool can_read = disk->canRead();
bool can_write = disk->canWrite();
if (can_read)
{
if (disk->broken)
LOG_INFO(log, "Disk {0} seems to be fine. It can be recovered using `SYSTEM RESTART DISK {0}`", disk->getName());
retry = 0;
if (can_write)
disk->readonly = false;
else
{
disk->readonly = true;
LOG_INFO(log, "Disk {} is readonly", disk->getName());
}
task->scheduleAfter(check_period_ms);
}
else if (!disk->broken && retry < DISK_CHECK_ERROR_RETRY_TIME)
{
++retry;
task->scheduleAfter(DISK_CHECK_ERROR_SLEEP_MS);
}
else
{
retry = 0;
disk->broken = true;
LOG_INFO(log, "Disk {} is broken", disk->getName());
task->scheduleAfter(check_period_ms);
}
}
void DiskLocalCheckThread::shutdown()
{
need_stop = true;
task->deactivate();
LOG_TRACE(log, "DiskLocalCheck thread finished");
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context_fwd.h>
namespace Poco
{
class Logger;
}
namespace DB
{
class DiskLocal;
class DiskLocalCheckThread : WithContext
{
public:
friend class DiskLocal;
DiskLocalCheckThread(DiskLocal * disk_, ContextPtr context_, UInt64 local_disk_check_period_ms);
void startup();
void shutdown();
private:
bool check();
void run();
DiskLocal * disk;
size_t check_period_ms;
Poco::Logger * log;
std::atomic<bool> need_stop{false};
BackgroundSchedulePool::TaskHolder task;
size_t retry{};
};
}

View File

@ -40,7 +40,12 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context, disks));
}
if (!has_default_disk)
disks.emplace(default_disk_name, std::make_shared<DiskLocal>(default_disk_name, context->getPath(), 0));
{
disks.emplace(
default_disk_name,
std::make_shared<DiskLocal>(
default_disk_name, context->getPath(), 0, context, config.getUInt("local_disk_check_period_ms", 0)));
}
}

View File

@ -37,6 +37,12 @@ public:
disks.emplace(name, disk);
}
void shutdown()
{
for (auto & e : disks)
e.second->shutdown();
}
private:
DisksMap disks;
};

View File

@ -224,6 +224,9 @@ public:
virtual bool isReadOnly() const { return false; }
/// Check if disk is broken. Broken disks will have 0 space and not be used.
virtual bool isBroken() const { return false; }
/// Invoked when Global Context is shutdown.
virtual void shutdown() {}

View File

@ -60,6 +60,7 @@ public:
DiskPtr getDisk() const { return getDisk(0); }
virtual DiskPtr getDisk(size_t i) const { return disks[i]; }
Disks & getDisks() { return disks; }
const Disks & getDisks() const { return disks; }
/// Returns effective value of whether merges are allowed on this volume (true) or not (false).

View File

@ -164,10 +164,18 @@ DiskPtr StoragePolicy::getAnyDisk() const
if (volumes.empty())
throw Exception("Storage policy " + backQuote(name) + " has no volumes. It's a bug.", ErrorCodes::LOGICAL_ERROR);
if (volumes[0]->getDisks().empty())
throw Exception("Volume " + backQuote(name) + "." + backQuote(volumes[0]->getName()) + " has no disks. It's a bug.", ErrorCodes::LOGICAL_ERROR);
for (const auto & volume : volumes)
{
if (volume->getDisks().empty())
throw Exception("Volume '" + volume->getName() + "' has no disks. It's a bug", ErrorCodes::LOGICAL_ERROR);
for (const auto & disk : volume->getDisks())
{
if (!disk->isBroken())
return disk;
}
}
return volumes[0]->getDisks()[0];
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "All disks in storage policy {} are broken", name);
}
@ -233,6 +241,10 @@ ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const
}
}
}
if (!max_disk)
throw Exception(
"There is no space on any disk in storage policy: " + name + ". It's likely all disks are broken",
ErrorCodes::NOT_ENOUGH_SPACE);
auto reservation = max_disk->reserve(0);
if (!reservation)
{

View File

@ -36,6 +36,7 @@
# include <IO/WriteBufferFromString.h>
# include <IO/WriteHelpers.h>
# include <base/range.h>
# include <base/sort.h>
# include <google/protobuf/descriptor.h>
# include <google/protobuf/descriptor.pb.h>
# include <boost/algorithm/string.hpp>
@ -2163,7 +2164,7 @@ namespace
for (auto & desc : field_descs_)
field_infos.emplace_back(std::move(desc.column_indices), *desc.field_descriptor, std::move(desc.field_serializer));
std::sort(field_infos.begin(), field_infos.end(),
::sort(field_infos.begin(), field_infos.end(),
[](const FieldInfo & lhs, const FieldInfo & rhs) { return lhs.field_tag < rhs.field_tag; });
for (size_t i : collections::range(field_infos.size()))
@ -2643,7 +2644,7 @@ namespace
missing_column_indices.clear();
missing_column_indices.reserve(column_names.size() - used_column_indices.size());
auto used_column_indices_sorted = std::move(used_column_indices);
std::sort(used_column_indices_sorted.begin(), used_column_indices_sorted.end());
::sort(used_column_indices_sorted.begin(), used_column_indices_sorted.end());
boost::range::set_difference(collections::range(column_names.size()), used_column_indices_sorted,
std::back_inserter(missing_column_indices));
@ -2755,7 +2756,7 @@ namespace
}
/// Shorter suffixes first.
std::sort(out_field_descriptors_with_suffixes.begin(), out_field_descriptors_with_suffixes.end(),
::sort(out_field_descriptors_with_suffixes.begin(), out_field_descriptors_with_suffixes.end(),
[](const std::pair<const FieldDescriptor *, std::string_view /* suffix */> & f1,
const std::pair<const FieldDescriptor *, std::string_view /* suffix */> & f2)
{

View File

@ -17,6 +17,7 @@
#include <type_traits>
#include <array>
#include <base/bit_cast.h>
#include <base/sort.h>
#include <algorithm>
#ifdef __SSE4_1__
@ -738,7 +739,7 @@ private:
for (size_t i = 0; i < boundaries.size(); ++i)
boundary_values[i] = boundaries[i].get<ValueType>();
std::sort(boundary_values.begin(), boundary_values.end());
::sort(boundary_values.begin(), boundary_values.end());
boundary_values.erase(std::unique(boundary_values.begin(), boundary_values.end()), boundary_values.end());
size_t size = src.size();

View File

@ -1,5 +1,6 @@
#include <algorithm>
#include <vector>
#include <base/sort.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include "arrayScalarProduct.h"
@ -112,7 +113,7 @@ public:
sorted_labels[i].label = label;
}
std::sort(sorted_labels.begin(), sorted_labels.end(), [](const auto & lhs, const auto & rhs) { return lhs.score > rhs.score; });
::sort(sorted_labels.begin(), sorted_labels.end(), [](const auto & lhs, const auto & rhs) { return lhs.score > rhs.score; });
/// We will first calculate non-normalized area.

View File

@ -1,4 +1,5 @@
#include "FunctionArrayMapped.h"
#include <base/sort.h>
#include <Functions/FunctionFactory.h>
@ -49,7 +50,7 @@ struct ArraySortImpl
for (size_t i = 0; i < size; ++i)
{
auto next_offset = offsets[i];
std::sort(&permutation[current_offset], &permutation[next_offset], Less(*mapped));
::sort(&permutation[current_offset], &permutation[next_offset], Less(*mapped));
current_offset = next_offset;
}

View File

@ -1,3 +1,4 @@
#include <base/sort.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
@ -53,7 +54,7 @@ namespace
}
/// We sort the names because the result of the function should not depend on the order of UUIDs.
std::sort(role_names.begin(), role_names.end());
::sort(role_names.begin(), role_names.end());
}
size_t getNumberOfArguments() const override { return 0; }

View File

@ -537,7 +537,7 @@ class UpdatableSession : public UpdatableSessionBase<HTTPSessionPtr>
using Parent = UpdatableSessionBase<HTTPSessionPtr>;
public:
explicit UpdatableSession(
UpdatableSession(
const Poco::URI uri,
const ConnectionTimeouts & timeouts_,
const UInt64 max_redirects_)
@ -557,7 +557,7 @@ class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::
using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>;
public:
explicit ReadWriteBufferFromHTTP(
ReadWriteBufferFromHTTP(
Poco::URI uri_,
const std::string & method_,
OutStreamCallback out_stream_callback_,

View File

@ -13,6 +13,7 @@
#include <IO/Operators.h>
#include <stack>
#include <base/sort.h>
#include <Common/JSONBuilder.h>
namespace DB
@ -676,7 +677,7 @@ NameSet ActionsDAG::foldActionsByProjection(
void ActionsDAG::reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map)
{
std::sort(index.begin(), index.end(), [&key_names_pos_map](const Node * lhs, const Node * rhs)
::sort(index.begin(), index.end(), [&key_names_pos_map](const Node * lhs, const Node * rhs)
{
return key_names_pos_map.find(lhs->result_name)->second < key_names_pos_map.find(rhs->result_name)->second;
});

View File

@ -1,6 +1,7 @@
#include <future>
#include <Poco/Util/Application.h>
#include <base/sort.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/formatReadable.h>
@ -2167,7 +2168,7 @@ ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedData
if (non_empty_data.size() > 1)
{
/// Sort the states in descending order so that the merge is more efficient (since all states are merged into the first).
std::sort(non_empty_data.begin(), non_empty_data.end(),
::sort(non_empty_data.begin(), non_empty_data.end(),
[](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs)
{
return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow();

View File

@ -12,8 +12,10 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <base/range.h>
#include <base/sort.h>
#include <boost/range/algorithm_ext/erase.hpp>
namespace DB
{
@ -305,11 +307,11 @@ void Clusters::updateClusters(const Poco::Util::AbstractConfiguration & new_conf
Poco::Util::AbstractConfiguration::Keys deleted_keys;
if (old_config)
{
std::sort(new_config_keys.begin(), new_config_keys.end());
::sort(new_config_keys.begin(), new_config_keys.end());
Poco::Util::AbstractConfiguration::Keys old_config_keys;
old_config->keys(config_prefix, old_config_keys);
std::sort(old_config_keys.begin(), old_config_keys.end());
::sort(old_config_keys.begin(), old_config_keys.end());
std::set_difference(
old_config_keys.begin(), old_config_keys.end(), new_config_keys.begin(), new_config_keys.end(), std::back_inserter(deleted_keys));

View File

@ -2650,6 +2650,19 @@ void Context::shutdown()
}
}
// Special volumes might also use disks that require shutdown.
for (const auto & volume : {shared->tmp_volume, shared->backups_volume})
{
if (volume)
{
auto & disks = volume->getDisks();
for (auto & disk : disks)
{
disk->shutdown();
}
}
}
shared->shutdown();
}

View File

@ -1,4 +1,5 @@
#include <Interpreters/DDLTask.h>
#include <base/sort.h>
#include <Common/DNSResolver.h>
#include <Common/isLocalAddress.h>
#include <IO/WriteHelpers.h>
@ -14,6 +15,7 @@
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Databases/DatabaseReplicated.h>
namespace DB
{
@ -324,7 +326,7 @@ String DDLTask::getShardID() const
Strings replica_names;
for (const Cluster::Address & address : shard_addresses)
replica_names.emplace_back(address.readableString());
std::sort(replica_names.begin(), replica_names.end());
::sort(replica_names.begin(), replica_names.end());
String res;
for (auto it = replica_names.begin(); it != replica_names.end(); ++it)

View File

@ -28,6 +28,7 @@
#include <base/sleep.h>
#include <base/getFQDNOrHostName.h>
#include <base/logger_useful.h>
#include <base/sort.h>
#include <random>
#include <pcg_random.hpp>
#include <base/scope_guard_safe.h>
@ -221,7 +222,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
static void filterAndSortQueueNodes(Strings & all_nodes)
{
all_nodes.erase(std::remove_if(all_nodes.begin(), all_nodes.end(), [] (const String & s) { return !startsWith(s, "query-"); }), all_nodes.end());
std::sort(all_nodes.begin(), all_nodes.end());
::sort(all_nodes.begin(), all_nodes.end());
}
void DDLWorker::scheduleTasks(bool reinitialized)

View File

@ -16,6 +16,7 @@
#include <Columns/ColumnSet.h>
#include <queue>
#include <stack>
#include <base/sort.h>
#include <Common/JSONBuilder.h>
#include <Core/SettingsEnums.h>
@ -735,7 +736,7 @@ void ExpressionActions::execute(Block & block, size_t & num_rows, bool dry_run)
}
else
{
std::sort(execution_context.inputs_pos.rbegin(), execution_context.inputs_pos.rend());
::sort(execution_context.inputs_pos.rbegin(), execution_context.inputs_pos.rend());
for (auto input : execution_context.inputs_pos)
if (input >= 0)
block.erase(input);

View File

@ -6,6 +6,7 @@
#include <stack>
#include <base/logger_useful.h>
#include <base/sort.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnVector.h>
@ -576,7 +577,10 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression, const
/** Sort nodes before compilation using their children size to avoid compiling subexpression before compile parent expression.
* This is needed to avoid compiling expression more than once with different names because of compilation order.
*/
std::sort(nodes_to_compile.begin(), nodes_to_compile.end(), [&](const Node * lhs, const Node * rhs) { return node_to_data[lhs].children_size > node_to_data[rhs].children_size; });
::sort(nodes_to_compile.begin(), nodes_to_compile.end(), [&](const Node * lhs, const Node * rhs)
{
return node_to_data[lhs].children_size > node_to_data[rhs].children_size;
});
for (auto & node : nodes_to_compile)
{

View File

@ -72,6 +72,7 @@
#include <Core/Field.h>
#include <Core/ProtocolDefines.h>
#include <base/types.h>
#include <base/sort.h>
#include <Columns/Collator.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Common/FieldVisitorToString.h>
@ -2269,7 +2270,7 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
for (const auto & [_, w] : query_analyzer->windowDescriptions())
windows_sorted.push_back(&w);
std::sort(windows_sorted.begin(), windows_sorted.end(), windowDescriptionComparator);
::sort(windows_sorted.begin(), windows_sorted.end(), windowDescriptionComparator);
const Settings & settings = context->getSettingsRef();
for (size_t i = 0; i < windows_sorted.size(); ++i)

View File

@ -9,6 +9,8 @@
#include <deque>
#include <base/sort.h>
namespace DB
{
@ -180,7 +182,7 @@ void LogicalExpressionsOptimizer::collectDisjunctiveEqualityChains()
{
auto & equalities = chain.second;
auto & equality_functions = equalities.functions;
std::sort(equality_functions.begin(), equality_functions.end());
::sort(equality_functions.begin(), equality_functions.end());
}
}
@ -237,7 +239,7 @@ void LogicalExpressionsOptimizer::addInExpression(const DisjunctiveEqualityChain
}
/// Sort the literals so that they are specified in the same order in the IN expression.
std::sort(tuple.begin(), tuple.end());
::sort(tuple.begin(), tuple.end());
/// Get the expression `expr` from the chain `expr = x1 OR ... OR expr = xN`
ASTPtr equals_expr_lhs;

View File

@ -1,15 +1,18 @@
#pragma once
#include <Common/Arena.h>
#include <Columns/IColumn.h>
#include <Interpreters/asof.h>
#include <optional>
#include <variant>
#include <list>
#include <mutex>
#include <algorithm>
#include <base/sort.h>
#include <Common/Arena.h>
#include <Columns/IColumn.h>
#include <Interpreters/asof.h>
namespace DB
{
@ -199,7 +202,7 @@ private:
if (!sorted.load(std::memory_order_relaxed))
{
if (!array.empty())
std::sort(array.begin(), array.end(), (ascending ? less : greater));
::sort(array.begin(), array.end(), (ascending ? less : greater));
sorted.store(true, std::memory_order_release);
}

View File

@ -25,8 +25,10 @@
#include <Storages/MergeTree/KeyCondition.h>
#include <base/range.h>
#include <base/sort.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
@ -405,7 +407,7 @@ void Set::checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) c
MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && indexes_mapping_)
: has_all_keys(set_elements.size() == indexes_mapping_.size()), indexes_mapping(std::move(indexes_mapping_))
{
std::sort(indexes_mapping.begin(), indexes_mapping.end(),
::sort(indexes_mapping.begin(), indexes_mapping.end(),
[](const KeyTuplePositionMapping & l, const KeyTuplePositionMapping & r)
{
return std::tie(l.key_index, l.tuple_index) < std::tie(r.key_index, r.tuple_index);

View File

@ -18,6 +18,7 @@
#include <Processors/Sinks/EmptySink.h>
#include <QueryPipeline/Pipe.h>
#include <filesystem>
#include <base/sort.h>
namespace fs = std::filesystem;
@ -124,7 +125,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
use_local_default_database = true;
}
}
std::sort(shard_default_databases.begin(), shard_default_databases.end());
::sort(shard_default_databases.begin(), shard_default_databases.end());
shard_default_databases.erase(std::unique(shard_default_databases.begin(), shard_default_databases.end()), shard_default_databases.end());
assert(use_local_default_database || !shard_default_databases.empty());

View File

@ -48,7 +48,7 @@ static bool parseQueryWithOnClusterAndMaybeTable(std::shared_ptr<ASTSystemQuery>
parsed_table = parseDatabaseAndTableAsAST(pos, expected, res->database, res->table);
if (!parsed_table && require_table)
return false;
return false;
if (!parsed_on_cluster && ParserKeyword{"ON"}.ignore(pos, expected))
if (!ASTQueryWithOnCluster::parse(pos, cluster, expected))
@ -64,6 +64,20 @@ static bool parseQueryWithOnClusterAndMaybeTable(std::shared_ptr<ASTSystemQuery>
return true;
}
static bool parseQueryWithOnCluster(std::shared_ptr<ASTSystemQuery> & res, IParser::Pos & pos,
Expected & expected)
{
String cluster_str;
if (ParserKeyword{"ON"}.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
res->cluster = cluster_str;
return true;
}
bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
if (!ParserKeyword{"SYSTEM"}.ignore(pos, expected))
@ -98,13 +112,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
}
case Type::RELOAD_MODEL:
{
String cluster_str;
if (ParserKeyword{"ON"}.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
res->cluster = cluster_str;
parseQueryWithOnCluster(res, pos, expected);
ASTPtr ast;
if (ParserStringLiteral{}.parse(pos, ast, expected))
{
@ -127,13 +136,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
}
case Type::RELOAD_FUNCTION:
{
String cluster_str;
if (ParserKeyword{"ON"}.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
res->cluster = cluster_str;
parseQueryWithOnCluster(res, pos, expected);
ASTPtr ast;
if (ParserStringLiteral{}.parse(pos, ast, expected))
{
@ -156,6 +160,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
}
case Type::DROP_REPLICA:
{
parseQueryWithOnCluster(res, pos, expected);
ASTPtr ast;
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
@ -196,12 +202,17 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::RESTART_REPLICA:
case Type::SYNC_REPLICA:
{
parseQueryWithOnCluster(res, pos, expected);
if (!parseDatabaseAndTableAsAST(pos, expected, res->database, res->table))
return false;
break;
}
case Type::RESTART_DISK:
{
parseQueryWithOnCluster(res, pos, expected);
ASTPtr ast;
if (ParserIdentifier{}.parse(pos, ast, expected))
res->disk = ast->as<ASTIdentifier &>().name();
@ -235,7 +246,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
String storage_policy_str;
String volume_str;
if (ParserKeyword{"ON VOLUME"}.ignore(pos, expected))
auto parse_on_volume = [&]() -> bool
{
ASTPtr ast;
if (ParserIdentifier{}.parse(pos, ast, expected))
@ -250,7 +261,25 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
volume_str = ast->as<ASTIdentifier &>().name();
else
return false;
return true;
};
if (ParserKeyword{"ON VOLUME"}.ignore(pos, expected))
{
if (!parse_on_volume())
return false;
}
else
{
parseQueryWithOnCluster(res, pos, expected);
if (ParserKeyword{"ON VOLUME"}.ignore(pos, expected))
{
if (!parse_on_volume())
return false;
}
}
res->storage_policy = storage_policy_str;
res->volume = volume_str;
if (res->volume.empty() && res->storage_policy.empty())
@ -268,11 +297,14 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::START_REPLICATED_SENDS:
case Type::STOP_REPLICATION_QUEUES:
case Type::START_REPLICATION_QUEUES:
parseQueryWithOnCluster(res, pos, expected);
parseDatabaseAndTableAsAST(pos, expected, res->database, res->table);
break;
case Type::SUSPEND:
{
parseQueryWithOnCluster(res, pos, expected);
ASTPtr seconds;
if (!(ParserKeyword{"FOR"}.ignore(pos, expected)
&& ParserUnsignedInteger().parse(pos, seconds, expected)
@ -286,8 +318,10 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
}
default:
/// There are no [db.table] after COMMAND NAME
{
parseQueryWithOnCluster(res, pos, expected);
break;
}
}
if (res->database)

View File

@ -1,5 +1,8 @@
#include <Processors/DelayedPortsProcessor.h>
#include <base/sort.h>
namespace DB
{
@ -18,7 +21,7 @@ InputPorts createInputPorts(
return InputPorts(num_ports, header);
InputPorts res;
std::sort(delayed_ports.begin(), delayed_ports.end());
::sort(delayed_ports.begin(), delayed_ports.end());
size_t next_delayed_port = 0;
for (size_t i = 0; i < num_ports; ++i)
{

View File

@ -28,6 +28,7 @@
#include <Processors/Formats/Impl/ConstantExpressionTemplate.h>
#include <Parsers/ExpressionElementParsers.h>
#include <boost/functional/hash.hpp>
#include <base/sort.h>
namespace DB
@ -299,7 +300,7 @@ ConstantExpressionTemplate::TemplateStructure::TemplateStructure(LiteralsInfo &
{
null_as_default = null_as_default_;
std::sort(replaced_literals.begin(), replaced_literals.end(), [](const LiteralInfo & a, const LiteralInfo & b)
::sort(replaced_literals.begin(), replaced_literals.end(), [](const LiteralInfo & a, const LiteralInfo & b)
{
return a.literal->begin.value() < b.literal->begin.value();
});

View File

@ -196,7 +196,19 @@ private:
size_t segmentator_ticket_number{0};
size_t reader_ticket_number{0};
/// Mutex for internal synchronization between threads
std::mutex mutex;
/// finishAndWait can be called concurrently from
/// multiple threads. Atomic flag is not enough
/// because if finishAndWait called before destructor it can check the flag
/// and destroy object immediately.
std::mutex finish_and_wait_mutex;
/// We don't use parsing_finished flag because it can be setup from multiple
/// place in code. For example in case of bad data. It doesn't mean that we
/// don't need to finishAndWait our class.
bool finish_and_wait_called = false;
std::condition_variable reader_condvar;
std::condition_variable segmentator_condvar;
@ -227,7 +239,7 @@ private:
struct ProcessingUnit
{
explicit ProcessingUnit()
ProcessingUnit()
: status(ProcessingUnitStatus::READY_TO_INSERT)
{
}
@ -263,10 +275,21 @@ private:
void finishAndWait()
{
/// Defending concurrent segmentator thread join
std::lock_guard finish_and_wait_lock(finish_and_wait_mutex);
/// We shouldn't execute this logic twice
if (finish_and_wait_called)
return;
finish_and_wait_called = true;
/// Signal background threads to finish
parsing_finished = true;
{
std::unique_lock<std::mutex> lock(mutex);
/// Additionally notify condvars
std::lock_guard<std::mutex> lock(mutex);
segmentator_condvar.notify_all();
reader_condvar.notify_all();
}

View File

@ -45,7 +45,7 @@ bool RegexpFieldExtractor::parseRow(PeekableReadBuffer & buf)
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
bool match = RE2::FullMatchN(re2::StringPiece(buf.position(), line_size), regexp, re2_arguments_ptrs.data(), re2_arguments_ptrs.size());
bool match = re2_st::RE2::FullMatchN(re2_st::StringPiece(buf.position(), line_size), regexp, re2_arguments_ptrs.data(), re2_arguments_ptrs.size());
if (!match && !skip_unmatched)
throw Exception("Line \"" + std::string(buf.position(), line_size) + "\" doesn't match the regexp.", ErrorCodes::INCORRECT_DATA);

View File

@ -1,7 +1,7 @@
#pragma once
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <re2_st/re2.h>
#include <re2_st/stringpiece.h>
#include <string>
#include <vector>
#include <Core/Block.h>
@ -12,6 +12,7 @@
#include <IO/PeekableReadBuffer.h>
#include <Formats/ParsedTemplateFormatString.h>
namespace DB
{
@ -26,17 +27,17 @@ public:
/// Return true if row was successfully parsed and row fields were extracted.
bool parseRow(PeekableReadBuffer & buf);
re2::StringPiece getField(size_t index) { return matched_fields[index]; }
re2_st::StringPiece getField(size_t index) { return matched_fields[index]; }
size_t getMatchedFieldsSize() const { return matched_fields.size(); }
size_t getNumberOfGroups() const { return regexp.NumberOfCapturingGroups(); }
private:
const RE2 regexp;
const re2_st::RE2 regexp;
// The vector of fields extracted from line using regexp.
std::vector<re2::StringPiece> matched_fields;
std::vector<re2_st::StringPiece> matched_fields;
// These two vectors are needed to use RE2::FullMatchN (function for extracting fields).
std::vector<RE2::Arg> re2_arguments;
std::vector<RE2::Arg *> re2_arguments_ptrs;
std::vector<re2_st::RE2::Arg> re2_arguments;
std::vector<re2_st::RE2::Arg *> re2_arguments_ptrs;
bool skip_unmatched;
};

View File

@ -10,9 +10,11 @@
#include <unordered_map>
#include <fmt/format.h>
#include <base/sort.h>
#include <Poco/Util/AbstractConfiguration.h>
using namespace std::literals;
namespace DB::ErrorCodes
@ -286,7 +288,7 @@ std::string buildTaggedRegex(std::string regexp_str)
else
regexp_str = "[\\?&]";
std::sort(std::begin(tags), std::end(tags)); /* sorted tag keys */
::sort(std::begin(tags), std::end(tags)); /* sorted tag keys */
regexp_str += fmt::format(
"{}{}",
fmt::join(tags, "&(.*&)?"),
@ -419,7 +421,7 @@ appendGraphitePattern(
/// retention should be in descending order of age.
if (pattern.type & pattern.TypeRetention) /// TypeRetention or TypeAll
std::sort(pattern.retentions.begin(), pattern.retentions.end(), compareRetentions);
::sort(pattern.retentions.begin(), pattern.retentions.end(), compareRetentions);
patterns.emplace_back(pattern);
return patterns.back();

View File

@ -26,6 +26,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <base/logger_useful.h>
#include <base/sort.h>
#include <Common/JSONBuilder.h>
namespace ProfileEvents
@ -1014,7 +1015,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
/// Skip this if final was used, because such columns were already added from PK.
std::vector<String> add_columns = result.sampling.filter_expression->getRequiredColumns().getNames();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
std::sort(column_names_to_read.begin(), column_names_to_read.end());
::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()),
column_names_to_read.end());
}
@ -1038,7 +1039,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
if (!data.merging_params.version_column.empty())
column_names_to_read.push_back(data.merging_params.version_column);
std::sort(column_names_to_read.begin(), column_names_to_read.end());
::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
pipe = spreadMarkRangesAmongStreamsFinal(

View File

@ -64,6 +64,7 @@ namespace ErrorCodes
extern const int NETWORK_ERROR;
extern const int NO_DATA_TO_INSERT;
extern const int SUPPORT_IS_DISABLED;
extern const int BAD_REQUEST_PARAMETER;
}
namespace
@ -295,6 +296,8 @@ namespace
setResultCompression(convertCompressionAlgorithm(compression.algorithm()), convertCompressionLevel(compression.level()));
}
grpc::ServerContext grpc_context;
protected:
CompletionCallback * getCallbackPtr(const CompletionCallback & callback)
{
@ -317,8 +320,6 @@ namespace
return &callback_in_map;
}
grpc::ServerContext grpc_context;
private:
grpc::ServerAsyncReaderWriter<GRPCResult, GRPCQueryInfo> reader_writer{&grpc_context};
std::unordered_map<size_t, CompletionCallback> callbacks;
@ -751,6 +752,35 @@ namespace
session->authenticate(user, password, user_address);
session->getClientInfo().quota_key = quota_key;
// Parse the OpenTelemetry traceparent header.
ClientInfo client_info = session->getClientInfo();
auto & client_metadata = responder->grpc_context.client_metadata();
auto traceparent = client_metadata.find("traceparent");
if (traceparent != client_metadata.end())
{
grpc::string_ref parent_ref = traceparent->second;
std::string opentelemetry_traceparent(parent_ref.data(), parent_ref.length());
std::string error;
if (!client_info.client_trace_context.parseTraceparentHeader(
opentelemetry_traceparent, error))
{
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER,
"Failed to parse OpenTelemetry traceparent header '{}': {}",
opentelemetry_traceparent, error);
}
auto tracestate = client_metadata.find("tracestate");
if (tracestate != client_metadata.end())
{
grpc::string_ref state_ref = tracestate->second;
client_info.client_trace_context.tracestate =
std::string(state_ref.data(), state_ref.length());
}
else
{
client_info.client_trace_context.tracestate = "";
}
}
/// The user could specify session identifier and session timeout.
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
if (!query_info.session_id().empty())
@ -759,7 +789,7 @@ namespace
query_info.session_id(), getSessionTimeout(query_info, iserver.config()), query_info.session_check());
}
query_context = session->makeQueryContext();
query_context = session->makeQueryContext(std::move(client_info));
/// Prepare settings.
SettingsChanges settings_changes;

View File

@ -319,6 +319,7 @@ HDFSSource::HDFSSource(
void HDFSSource::onCancel()
{
std::lock_guard lock(reader_mutex);
if (reader)
reader->cancel();
}
@ -392,12 +393,15 @@ Chunk HDFSSource::generate()
return Chunk(std::move(columns), num_rows);
}
reader.reset();
pipeline.reset();
read_buf.reset();
{
std::lock_guard lock(reader_mutex);
reader.reset();
pipeline.reset();
read_buf.reset();
if (!initialize())
return {};
if (!initialize())
return {};
}
return generate();
}

View File

@ -150,6 +150,8 @@ private:
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
/// onCancel and generate can be called concurrently
std::mutex reader_mutex;
String current_path;
/// Recreate ReadBuffer and PullingPipelineExecutor for each file.

View File

@ -19,6 +19,8 @@
#include <boost/algorithm/string/join.hpp>
#include <iterator>
#include <regex>
#include <base/sort.h>
namespace fs = std::filesystem;
@ -425,7 +427,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
}
if (!capability.empty())
{
std::sort(capability.begin(), capability.end());
::sort(capability.begin(), capability.end());
capability.erase(std::unique(capability.begin(), capability.end()), capability.end());
const String & remote_fs_metadata = boost::algorithm::join(capability, ", ");
uri.addQueryParameter("remote_fs_metadata", remote_fs_metadata);

View File

@ -1118,6 +1118,7 @@ UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const DiskPtr & disk_, const
void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const
try
{
assertOnDisk();
@ -1154,6 +1155,18 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
storage.lockSharedData(*this);
}
catch (...)
{
if (startsWith(new_relative_path, "detached/"))
{
// Don't throw when the destination is to the detached folder. It might be able to
// recover in some cases, such as fetching parts into multi-disks while some of the
// disks are broken.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
else
throw;
}
void IMergeTreeDataPart::cleanupOldName(const String & old_part_name) const
{

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/logger_useful.h>
#include <base/sort.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Core/BackgroundSchedulePool.h>
@ -48,7 +49,7 @@ void checkNoOldLeaders(Poco::Logger * log, ZooKeeper & zookeeper, const String p
}
else
{
std::sort(potential_leaders.begin(), potential_leaders.end());
::sort(potential_leaders.begin(), potential_leaders.end());
if (potential_leaders.front() == persistent_multiple_leaders)
return;

View File

@ -70,6 +70,7 @@
#include <boost/algorithm/string/replace.hpp>
#include <base/insertAtEnd.h>
#include <base/sort.h>
#include <algorithm>
#include <iomanip>
@ -266,9 +267,13 @@ MergeTreeData::MergeTreeData(
/// Creating directories, if not exist.
for (const auto & disk : getDisks())
{
if (disk->isBroken())
continue;
disk->createDirectories(relative_data_path);
disk->createDirectories(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
String current_version_file_path = fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME;
if (disk->exists(current_version_file_path))
{
if (!version_file.first.empty())
@ -1197,6 +1202,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
if (disk->isBroken())
continue;
if (defined_disk_names.count(disk_name) == 0 && disk->exists(relative_data_path))
{
for (const auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
@ -1217,6 +1225,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
std::mutex wal_init_lock;
for (const auto & disk_ptr : disks)
{
if (disk_ptr->isBroken())
continue;
auto & disk_parts = disk_part_map[disk_ptr->getName()];
auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
@ -1289,6 +1300,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (!parts_from_wal.empty())
loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock);
for (auto & part : duplicate_parts_to_remove)
part->remove();
for (auto & part : broken_parts_to_detach)
part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
@ -1383,6 +1397,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMuta
/// Delete temporary directories older than a day.
for (const auto & disk : getDisks())
{
if (disk->isBroken())
continue;
for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
const std::string & basename = it->name();
@ -1625,7 +1642,7 @@ size_t MergeTreeData::clearOldWriteAheadLogs()
if (all_block_numbers_on_disk.empty())
return 0;
std::sort(all_block_numbers_on_disk.begin(), all_block_numbers_on_disk.end());
::sort(all_block_numbers_on_disk.begin(), all_block_numbers_on_disk.end());
block_numbers_on_disk.push_back(all_block_numbers_on_disk[0]);
for (size_t i = 1; i < all_block_numbers_on_disk.size(); ++i)
{
@ -1656,6 +1673,8 @@ size_t MergeTreeData::clearOldWriteAheadLogs()
for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
{
auto disk_ptr = *disk_it;
if (disk_ptr->isBroken())
continue;
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
auto min_max_block_number = MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(it->name());
@ -1738,6 +1757,9 @@ void MergeTreeData::dropAllData()
for (const auto & disk : getDisks())
{
if (disk->isBroken())
continue;
try
{
disk->removeRecursive(relative_data_path);
@ -1772,6 +1794,8 @@ void MergeTreeData::dropIfEmpty()
{
for (const auto & disk : getDisks())
{
if (disk->isBroken())
continue;
/// Non recursive, exception is thrown if there are more files.
disk->removeFileIfExists(fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME);
disk->removeDirectory(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
@ -5153,6 +5177,23 @@ Strings MergeTreeData::getDataPaths() const
}
void MergeTreeData::reportBrokenPart(MergeTreeData::DataPartPtr & data_part) const
{
if (data_part->volume && data_part->volume->getDisk()->isBroken())
{
auto disk = data_part->volume->getDisk();
auto parts = getDataParts();
LOG_WARNING(log, "Scanning parts to recover on broken disk {}.", disk->getName() + "@" + disk->getPath());
for (const auto & part : parts)
{
if (part->volume && part->volume->getDisk()->getName() == disk->getName())
broken_part_callback(part->name);
}
}
else
broken_part_callback(data_part->name);
}
MergeTreeData::MatcherFn MergeTreeData::getPartitionMatcher(const ASTPtr & partition_ast, ContextPtr local_context) const
{
bool prefixed = false;

View File

@ -612,6 +612,10 @@ public:
broken_part_callback(name);
}
/// Same as above but has the ability to check all other parts
/// which reside on the same disk of the suspicious part.
void reportBrokenPart(MergeTreeData::DataPartPtr & data_part) const;
/// TODO (alesap) Duplicate method required for compatibility.
/// Must be removed.
static ASTPtr extractKeyExpressionList(const ASTPtr & node)

View File

@ -1527,10 +1527,19 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
size_t final_mark = part->index_granularity.hasFinalMark();
size_t index_marks_count = (marks_count - final_mark + index_granularity - 1) / index_granularity;
MarkRanges index_ranges;
for (const auto & range : ranges)
{
MarkRange index_range(
range.begin / index_granularity,
(range.end + index_granularity - 1) / index_granularity);
index_ranges.push_back(index_range);
}
MergeTreeIndexReader reader(
index_helper, part,
index_marks_count,
ranges,
index_ranges,
mark_cache,
uncompressed_cache,
reader_settings);
@ -1541,11 +1550,9 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
/// this variable is stored to avoid reading the same granule twice.
MergeTreeIndexGranulePtr granule = nullptr;
size_t last_index_mark = 0;
for (const auto & range : ranges)
for (size_t i = 0; i < ranges.size(); ++i)
{
MarkRange index_range(
range.begin / index_granularity,
(range.end + index_granularity - 1) / index_granularity);
const MarkRange & index_range = index_ranges[i];
if (last_index_mark != index_range.begin || !granule)
reader.seek(index_range.begin);
@ -1558,8 +1565,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
granule = reader.read();
MarkRange data_range(
std::max(range.begin, index_mark * index_granularity),
std::min(range.end, (index_mark + 1) * index_granularity));
std::max(ranges[i].begin, index_mark * index_granularity),
std::min(ranges[i].end, (index_mark + 1) * index_granularity));
if (!condition->mayBeTrueOnGranule(granule))
{

View File

@ -113,7 +113,7 @@ bool MergeTreePartsMover::selectPartsForMove(
UInt64 required_maximum_available_space = disk->getTotalSpace() * policy->getMoveFactor();
UInt64 unreserved_space = disk->getUnreservedSpace();
if (unreserved_space < required_maximum_available_space)
if (unreserved_space < required_maximum_available_space && !disk->isBroken())
need_to_move.emplace(disk, required_maximum_available_space - unreserved_space);
}
}

View File

@ -118,7 +118,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
}
catch (...)
{
storage.reportBrokenPart(data_part->name);
storage.reportBrokenPart(data_part);
throw;
}
}
@ -175,7 +175,7 @@ size_t MergeTreeReaderCompact::readRows(
catch (Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name);
storage.reportBrokenPart(data_part);
/// Better diagnostics.
e.addMessage("(while reading column " + column_from_part.name + ")");
@ -183,7 +183,7 @@ size_t MergeTreeReaderCompact::readRows(
}
catch (...)
{
storage.reportBrokenPart(data_part->name);
storage.reportBrokenPart(data_part);
throw;
}
}
@ -277,8 +277,11 @@ void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
void MergeTreeReaderCompact::adjustUpperBound(size_t last_mark)
{
auto right_offset = marks_loader.getMark(last_mark).offset_in_compressed_file;
if (!right_offset)
size_t right_offset = 0;
if (last_mark < data_part->getMarksCount()) /// Otherwise read until the end of file
right_offset = marks_loader.getMark(last_mark).offset_in_compressed_file;
if (right_offset == 0)
{
/// If already reading till the end of file.
if (last_right_offset && *last_right_offset == 0)

View File

@ -42,7 +42,7 @@ MergeTreeReaderStream::MergeTreeReaderStream(
{
size_t left_mark = mark_range.begin;
size_t right_mark = mark_range.end;
auto [right_offset, mark_range_bytes] = getRightOffsetAndBytesRange(left_mark, right_mark);
auto [_, mark_range_bytes] = getRightOffsetAndBytesRange(left_mark, right_mark);
max_mark_range_bytes = std::max(max_mark_range_bytes, mark_range_bytes);
sum_mark_range_bytes += mark_range_bytes;
@ -106,42 +106,88 @@ MergeTreeReaderStream::MergeTreeReaderStream(
}
std::pair<size_t, size_t> MergeTreeReaderStream::getRightOffsetAndBytesRange(size_t left_mark, size_t right_mark)
std::pair<size_t, size_t> MergeTreeReaderStream::getRightOffsetAndBytesRange(size_t left_mark, size_t right_mark_non_included)
{
/// NOTE: if we are reading the whole file, then right_mark == marks_count
/// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.
/// If the end of range is inside the block, we will need to read it too.
size_t result_right_mark = right_mark;
if (right_mark < marks_count && marks_loader.getMark(right_mark).offset_in_decompressed_block > 0)
/// Special case, can happen in Collapsing/Replacing engines
if (marks_count == 0)
return std::make_pair(0, 0);
assert(left_mark < marks_count);
assert(right_mark_non_included <= marks_count);
assert(left_mark <= right_mark_non_included);
size_t result_right_offset;
if (0 < right_mark_non_included && right_mark_non_included < marks_count)
{
auto indices = collections::range(right_mark, marks_count);
auto it = std::upper_bound(indices.begin(), indices.end(), right_mark, [this](size_t i, size_t j)
auto right_mark = marks_loader.getMark(right_mark_non_included);
result_right_offset = right_mark.offset_in_compressed_file;
bool need_to_check_marks_from_the_right = false;
/// If the end of range is inside the block, we will need to read it too.
if (right_mark.offset_in_decompressed_block > 0)
{
return marks_loader.getMark(i).offset_in_compressed_file < marks_loader.getMark(j).offset_in_compressed_file;
});
need_to_check_marks_from_the_right = true;
}
else
{
size_t right_mark_included = right_mark_non_included - 1;
const MarkInCompressedFile & right_mark_included_in_file = marks_loader.getMark(right_mark_included);
result_right_mark = (it == indices.end() ? marks_count : *it);
/// Also, in LowCardinality dictionary several consecutive marks can point to
/// the same offset. So to get true bytes offset we have to get first
/// non-equal mark.
/// Example:
/// Mark 186, points to [2003111, 0]
/// Mark 187, points to [2003111, 0]
/// Mark 188, points to [2003111, 0] <--- for example need to read until 188
/// Mark 189, points to [2003111, 0] <--- not suitable, because have same offset
/// Mark 190, points to [2003111, 0]
/// Mark 191, points to [2003111, 0]
/// Mark 192, points to [2081424, 0] <--- what we are looking for
/// Mark 193, points to [2081424, 0]
/// Mark 194, points to [2081424, 0]
if (right_mark_included_in_file.offset_in_compressed_file == result_right_offset)
need_to_check_marks_from_the_right = true;
}
/// Let's go to the right and find mark with bigger offset in compressed file
if (need_to_check_marks_from_the_right)
{
bool found_bigger_mark = false;
for (size_t i = right_mark_non_included + 1; i < marks_count; ++i)
{
const auto & candidate_mark = marks_loader.getMark(i);
if (result_right_offset < candidate_mark.offset_in_compressed_file)
{
result_right_offset = candidate_mark.offset_in_compressed_file;
found_bigger_mark = true;
break;
}
}
if (!found_bigger_mark)
{
/// If there are no marks after the end of range, just use file size
result_right_offset = file_size;
}
}
}
size_t right_offset;
size_t mark_range_bytes;
/// If there are no marks after the end of range, just use file size
if (result_right_mark >= marks_count
|| (result_right_mark + 1 == marks_count
&& marks_loader.getMark(result_right_mark).offset_in_compressed_file == marks_loader.getMark(right_mark).offset_in_compressed_file))
else if (right_mark_non_included == 0)
{
right_offset = file_size;
mark_range_bytes = right_offset - (left_mark < marks_count ? marks_loader.getMark(left_mark).offset_in_compressed_file : 0);
result_right_offset = marks_loader.getMark(right_mark_non_included).offset_in_compressed_file;
}
else
{
right_offset = marks_loader.getMark(result_right_mark).offset_in_compressed_file;
mark_range_bytes = right_offset - marks_loader.getMark(left_mark).offset_in_compressed_file;
result_right_offset = file_size;
}
return std::make_pair(right_offset, mark_range_bytes);
size_t mark_range_bytes = result_right_offset - (left_mark < marks_count ? marks_loader.getMark(left_mark).offset_in_compressed_file : 0);
return std::make_pair(result_right_offset, mark_range_bytes);
}
@ -197,7 +243,7 @@ void MergeTreeReaderStream::adjustForRange(MarkRange range)
* read from stream, but we must update last_right_offset only if it is bigger than
* the last one to avoid redundantly cancelling prefetches.
*/
auto [right_offset, mark_range_bytes] = getRightOffsetAndBytesRange(range.begin, range.end);
auto [right_offset, _] = getRightOffsetAndBytesRange(range.begin, range.end);
if (!right_offset)
{
if (last_right_offset && *last_right_offset == 0)

Some files were not shown because too many files have changed in this diff Show More